Spring Boot 整合 RabbitMQ
1.生产者工程
-
pom.xml里引入依赖
<!-- Spring Boot starter for AMQP messaging -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
-
application.yml里配置基本信息
# RabbitMQ connection settings.
# The username/password values are masked placeholders: replace them with real
# credentials. They are quoted because an unquoted leading '*' is parsed as a
# YAML alias.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: "******"
    password: "******"
    virtual-host: /test
-
在配置类里创建交换机,队列,绑定交换机和队列
package com.min.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 创建交换机 @Bean("topicExchange") public Exchange exchange() { return ExchangeBuilder.topicExchange("springboot-topic-exchange").durable(true).build(); } // 创建队列 @Bean("queue1") public Queue queue1() { return QueueBuilder.durable("springboot-queue1").build(); } @Bean("queue2") public Queue queue2() { return QueueBuilder.durable("springboot-queue2").build(); } // 绑定队列和交换机 @Bean public Binding BindQueue1Exchange(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("#.error").noargs(); } @Bean public Binding BindQueue2Exchange(@Qualifier("queue2") Queue queue, @Qualifier("topicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("order.*").noargs(); } }
-
注入RabbitTemplate发消息
package com.min;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Producer-side tests: each test publishes one message to the topic exchange
 * with a different routing key.
 */
@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {

    /** Name of the exchange every test publishes to. */
    private static final String EXCHANGE_NAME = "springboot-topic-exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Publishes a message with routing key {@code boot.error}. */
    @Test
    void testSend1() {
        String routingKey = "boot.error";
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, "boot.error的key发送的消息");
    }

    /** Publishes a message with routing key {@code order.error}. */
    @Test
    void testSend2() {
        String routingKey = "order.error";
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, "order.error的key发送的消息");
    }

    /** Publishes a message with routing key {@code order.insert}. */
    @Test
    void testSend3() {
        String routingKey = "order.insert";
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, "order.insert的key发送的消息");
    }
}
2.消费者工程
-
引入依赖并且配置基本信息,和生产者一样
-
创建两个监听器,分别监听两个队列
package com.min.listener;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listens on {@code springboot-queue1} and prints the body of every message it receives.
 */
@Component
public class RabbitMQListener1 implements MessageListener {

    /**
     * Prints the received message body to standard output.
     *
     * @param message the message delivered from {@code springboot-queue1}
     */
    @Override
    @RabbitListener(queues = "springboot-queue1")
    public void onMessage(Message message) {
        // Decode explicitly as UTF-8 instead of the platform default charset, so the
        // non-ASCII payload is not garbled on JVMs whose default charset differs.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("springboot-queue1接收到消息:" + body);
    }
}
package com.min.listener;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listens on {@code springboot-queue2} and prints the body of every message it receives.
 */
@Component
public class RabbitMQListener2 implements MessageListener {

    /**
     * Prints the received message body to standard output.
     *
     * @param message the message delivered from {@code springboot-queue2}
     */
    @Override
    @RabbitListener(queues = "springboot-queue2")
    public void onMessage(Message message) {
        // Decode explicitly as UTF-8 instead of the platform default charset, so the
        // non-ASCII payload is not garbled on JVMs whose default charset differs.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("springboot-queue2接收到消息:" + body);
    }
}
-
启动消费者工程,然后分别运行生产者工程里的三个测试方法,结果如下
springboot-queue1接收到消息:boot.error的key发送的消息
springboot-queue1接收到消息:order.error的key发送的消息 springboot-queue2接收到消息:order.error的key发送的消息
springboot-queue2接收到消息:order.insert的key发送的消息
从以上结果可以看出：routing key 为 boot.error 的消息只能被 springboot-queue1 接收到（只匹配绑定键 #.error）；routing key 为 order.insert 的消息只能被 springboot-queue2 接收到（只匹配绑定键 order.*）；而 routing key 为 order.error 的消息同时匹配这两个绑定键，因此两个队列都能接收到。（注意：本案例使用的是 RabbitMQ 的 topic 工作模式，绑定键中 * 匹配恰好一个单词，# 匹配零个或多个单词。）