Spring Boot+RabbitMQ 实战:4 种交换机模式(Work/Fanout/Direct/Topic)保姆级实现
RabbitMQ 的实现
Spring AMQP
Spring 提供了对 RabbitMQ 开发的封装——Spring AMQP。它与 Spring 生态深度集成,大幅简化了消息队列的开发。
引入依赖
<!-- Spring MVC dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- RabbitMQ dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 Constants
/**
 * Names of the queues and exchanges used by the work-queue, fanout, direct
 * and topic examples.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Constants {

    // Work queue example
    public static final String WORK_QUEUE = "WORK_QUEUE";

    // Fanout example
    public static final String FANOUT_QUEUE1 = "Fanout_QUEUE1";
    public static final String FANOUT_QUEUE2 = "Fanout_QUEUE2";
    public static final String FANOUT_EXCHANGE = "Fanout_EXCHANGE";

    // Direct example
    public static final String DIRECT_QUEUE1 = "DIRECT_QUEUE1";
    public static final String DIRECT_QUEUE2 = "DIRECT_QUEUE2";
    public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";

    // Topic example
    public static final String TOPIC_QUEUE1 = "TOPIC_QUEUE1";
    public static final String TOPIC_QUEUE2 = "TOPIC_QUEUE2";
    public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";

    private Constants() {
        // Not meant to be instantiated.
    }
}
配置 Config
/**
 * Declares every queue, exchange and binding used by the examples as Spring beans.
 *
 * <p>All queues are built with {@code QueueBuilder.durable(...)}. Each binding method
 * receives its exchange and queue through {@code @Qualifier}-selected parameters and
 * binds exactly those two beans together.
 */
@Configuration
public class RabbitMQConfig {

    // ---- work queue ----

    /** Queue used by the work-queue example. */
    @Bean("workQueue")
    public Queue workQueue() {
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }

    // ---- fanout ----

    /** First queue bound to the fanout exchange. */
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }

    /** Second queue bound to the fanout exchange. */
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }

    /** Fanout exchange of the fanout example. */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();
    }

    /** Binds {@code fanoutQueue1} to the fanout exchange (no routing key). */
    @Bean("bindingFanoutQueue1")
    public Binding bindingFanoutQueue1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                       @Qualifier("fanoutQueue1") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /** Binds {@code fanoutQueue2} to the fanout exchange (no routing key). */
    @Bean("bindingFanoutQueue2")
    public Binding bindingFanoutQueue2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                       @Qualifier("fanoutQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    // ---- direct ----

    /** First queue of the direct example. */
    @Bean("directQueue1")
    public Queue directQueue1() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }

    /** Second queue of the direct example. */
    @Bean("directQueue2")
    public Queue directQueue2() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }

    /** Direct exchange of the direct example. */
    @Bean("directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();
    }

    /** Binds {@code directQueue1} to the direct exchange with routing key {@code "a"}. */
    @Bean("bindingDirectQueue1")
    public Binding bindingDirectQueue1(@Qualifier("directExchange") DirectExchange directExchange,
                                       @Qualifier("directQueue1") Queue queue) {
        // Use the injected exchange, the same way the fanout and topic bindings do.
        return BindingBuilder.bind(queue).to(directExchange).with("a");
    }

    /** Binds {@code directQueue2} to the direct exchange with routing key {@code "b"}. */
    @Bean("bindingDirectQueue2")
    public Binding bindingDirectQueue2(@Qualifier("directExchange") DirectExchange directExchange,
                                       @Qualifier("directQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("b");
    }

    /** Binds {@code directQueue2} to the direct exchange a second time, with routing key {@code "c"}. */
    @Bean("bindingDirectQueue3")
    public Binding bindingDirectQueue3(@Qualifier("directExchange") DirectExchange directExchange,
                                       @Qualifier("directQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("c");
    }

    // ---- topic ----

    /** First queue of the topic example. */
    @Bean("topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }

    /** Second queue of the topic example. */
    @Bean("topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }

    /** Topic exchange of the topic example. */
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();
    }

    /** Binds {@code topicQueue1} to the topic exchange with pattern {@code "*.x.*"}. */
    @Bean("bindingTopicQueue1")
    public Binding bindingTopicQueue1(@Qualifier("topicExchange") TopicExchange topicExchange,
                                      @Qualifier("topicQueue1") Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with("*.x.*");
    }

    /** Binds {@code topicQueue2} to the topic exchange with pattern {@code "*.*.y"}. */
    @Bean("bindingTopicQueue2")
    public Binding bindingTopicQueue2(@Qualifier("topicExchange") TopicExchange topicExchange,
                                      @Qualifier("topicQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with("*.*.y");
    }

    /** Binds {@code topicQueue2} to the topic exchange a second time, with pattern {@code "xy.#"}. */
    @Bean("bindingTopicQueue3")
    public Binding bindingTopicQueue3(@Qualifier("topicExchange") TopicExchange topicExchange,
                                      @Qualifier("topicQueue2") Queue queue) {
        // NOTE(review): the pattern previously contained a stray apostrophe ("xy'.#"),
        // presumably a typo; confirm "xy.#" is the intended binding key.
        return BindingBuilder.bind(queue).to(topicExchange).with("xy.#");
    }
}
可以通过配置 Config 一次性大量的声明,队列、交换机、绑定关系等,大幅度缩减了频繁创建文件的次数
ProducerController
/**
 * HTTP endpoints under {@code /producer} that publish demo messages through
 * {@link RabbitTemplate}, one endpoint per example (work queue, fanout, direct, topic).
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends ten messages with an empty exchange name and the work queue's name
     * as the routing key.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/work")
    public String work() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello work queue" + i);
        }
        return "发送成功";
    }

    /**
     * Sends ten messages to the fanout exchange with an empty routing key.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/fanout")
    public String fanout() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello fanout queue" + i);
        }
        return "发送成功";
    }

    /**
     * Sends one message to the direct exchange.
     *
     * @param routingKey routing key taken from the request path
     * @return a fixed confirmation string
     */
    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey) {
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,
                "hello direct this is routingKey " + routingKey);
        return "发送成功";
    }

    /**
     * Sends one message to the topic exchange.
     *
     * @param routingKey routing key taken from the request path
     * @return a fixed confirmation string
     */
    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey) {
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic " + routingKey);
        return "发送成功";
    }
}
这是一个基于Spring Boot的RabbitMQ消息生产者控制器(ProducerController),用于向RabbitMQ消息队列发送消息。它实现了四种常见的消息队列模式,通过HTTP接口触发消息发送
WorkListener
/**
 * Two listener methods that both consume from {@code Constants.WORK_QUEUE}
 * and print every message they receive.
 */
@Component
public class WorkListener {

    /** First consumer on the work queue. */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void workListener1(String message) {
        report(message);
    }

    /** Second consumer on the same work queue. */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void workListener2(String message) {
        report(message);
    }

    /** Prints the received message prefixed with the work queue's name. */
    private static void report(String payload) {
        String prefix = "队列[" + Constants.WORK_QUEUE + "] 接收到消息:";
        System.out.println(prefix + payload);
    }
}
@RabbitListener 不仅可用于方法,还可用于类级别。当标注在处理方法时如上图代码所示,当标注在类时,需要搭配 @RabbitHandler 使用,将 @RabbitHandler 标注在类的方法上
FanOutListener
/**
 * Consumes the two queues bound to the fanout exchange
 * ({@code Constants.FANOUT_QUEUE1} and {@code Constants.FANOUT_QUEUE2})
 * and prints every message received.
 */
@Component
public class FanOutListener {

    /** Handles messages arriving on the first fanout queue. */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void queueListener1(String msg) {
        System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + msg);
    }

    /** Handles messages arriving on the second fanout queue. */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void queueListener2(String msg) {
        System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + msg);
    }
}
FanOutListener 是一个基于 Spring AMQP 的消息消费者组件,专门用于处理 Fanout 类型交换机的消息。它通过@RabbitListener注解监听两个不同的队列,实现 广播模式 的消息消费
DirectListener
/**
 * Consumes {@code Constants.DIRECT_QUEUE1} and {@code Constants.DIRECT_QUEUE2}
 * and prints every message received.
 */
@Component
public class DirectListener {

    /** Handles messages arriving on the first direct queue. */
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void queueListener1(String msg) throws InterruptedException {
        announce(Constants.DIRECT_QUEUE1, msg);
    }

    /** Handles messages arriving on the second direct queue. */
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void queueListener2(String msg) throws InterruptedException {
        announce(Constants.DIRECT_QUEUE2, msg);
    }

    /** Prints the payload prefixed with the name of the queue it came from. */
    private static void announce(String queueName, String payload) {
        System.out.println(String.format("队列[%s] 接收到消息:%s", queueName, payload));
    }
}
DirectListener专门用于处理 Direct 类型交换机的消息,实现路由键精准匹配的消息分发模式
TopicsListener
/**
 * Consumes {@code Constants.TOPIC_QUEUE1} and {@code Constants.TOPIC_QUEUE2}
 * and prints every message received.
 *
 * <p>Annotated with {@code @Component} like the other listeners, so that Spring
 * manages the class and its {@code @RabbitListener} methods are registered.
 */
@Component
public class TopicsListener {

    /** Handles messages arriving on the first topic queue. */
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void topicListener1(String message) {
        System.out.println("队列[" + Constants.TOPIC_QUEUE1 + "] 接收到消息:" + message);
    }

    /** Handles messages arriving on the second topic queue. */
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void topicListener2(String message) {
        System.out.println("队列[" + Constants.TOPIC_QUEUE2 + "] 接收到消息:" + message);
    }
}
TopicsListener专门用于处理 Topic 类型交换机的消息,实现 通配符路由 的灵活消息分发模式
希望这篇博客能够帮助到你。如果文中有疏漏或需要补充的地方,欢迎在评论区指出。

