当前位置: 首页 > news >正文

Spring Boot+RabbitMQ 实战:4 种交换机模式(Work/Fanout/Direct/Topic)保姆级实现

RabbitMQ 的实现

Spring AMQP

Spring 提供了RabbitMQ 开发的封装,Spring AMQP通过集成Spring生态,大幅简化了消息队列

引入依赖

<!--Spring MVC相关依赖--> 
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
<!--RabbitMQ相关依赖--> 
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置 Constants

public class Constants {public static final String WORK_QUEUE = "WORK_QUEUE";public static final String FANOUT_QUEUE1 = "Fanout_QUEUE1";public static final String FANOUT_QUEUE2 = "Fanout_QUEUE2";public static final String FANOUT_EXCHANGE = "Fanout_EXCHANGE";public static final String DIRECT_QUEUE1 = "DIRECT_QUEUE1";public static final String DIRECT_QUEUE2 = "DIRECT_QUEUE2";public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";public static final String TOPIC_QUEUE1  = "TOPIC_QUEUE1";public static final String TOPIC_QUEUE2 = "TOPIC_QUEUE2";public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";}

配置 Config

@Configuration
public class RabbitMQConfig {//work_queue@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(Constants.WORK_QUEUE).build();}//fanout_queue@Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();}@Bean("bindingFanoutQueue1")public Binding bindingFanoutQueue1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue1")  Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("bindingFanoutQueue2")public Binding bindingFanoutQueue2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2")  Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}// direct_queue@Bean("directQueue1")public Queue directQueue1(){return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2(){return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();}@Bean("bindingDirectQueue1")public Binding bindingDirectQueue1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("a");}@Bean("bindingDirectQueue2")public Binding bindingDirectQueue2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("b");}@Bean("bindingDirectQueue3")public Binding bindingDirectQueue3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("c");}@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();}@Bean("bindingTopicQueue1")public Binding bindingTopicQueue1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.x.*");}@Bean("bindingTopicQueue2")public Binding bindingTopicQueue2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.y");}@Bean("bindingTopicQueue3")public Binding bindingTopicQueue3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("xy'.#");}}

可以通过配置 Config 一次性大量的声明,队列、交换机、绑定关系等,大幅度缩减了频繁创建文件的次数

ProducerController

@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){for (int i = 0; i <10 ; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"hello work queue" + i);}return "发送成功";}@RequestMapping("/fanout")public String fanout(){for (int i = 0; i <10 ; i++) {rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello fanout queue" + i);}return "发送成功";}@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello direct this is routingKey " + routingKey);return "发送成功";}@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello topic " + routingKey);return "发送成功";}
}

这是一个基于Spring Boot的RabbitMQ消息生产者控制器(ProducerController),用于向RabbitMQ消息队列发送消息。它实现了四种常见的消息队列模式,通过HTTP接口触发消息发送

WorkListener

@Component
public class WorkListener {@RabbitListener( queues = Constants.WORK_QUEUE )public void workListener1(String message) {System.out.println("队列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);}@RabbitListener( queues = Constants.WORK_QUEUE )public void workListener2(String message) {System.out.println("队列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);}
}

@RabbitListener 不仅可用于方法,还可用于类级别。当标注在处理方法时如上图代码所示,当标注在类时,需要搭配 @RabbitHandler 使用,将 @RabbitHandler 标注在类的方法上

FanOutListener

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" + msg);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" + msg);}
}

outListener 是一个基于 Spring AMQP 的消息消费者组件,专门用于处理 ​Fanout 类型交换机的消息。它通过@RabbitListener注解监听两个不同的队列,实现 ​广播模式​ 的消息消费

DirectListener

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" + msg);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" + msg);}
}

DirectListener专门用于处理 ​Direct 类型交换机的消息,实现​路由键精准匹配​的消息分发模式

TopicsListener

public class TopicsListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void topicListener1(String message){System.out.println( "队列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" + message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void topicListener2(String message){System.out.println( "队列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" + message);}
}

TopicsListener专门用于处理 ​Topic 类型交换机的消息,实现 ​通配符路由​ 的灵活消息分发模式

希望这份博客能够帮助到你。如果有其他需要修改或添加的地方,请随时告诉我。

http://www.dtcms.com/a/523786.html

相关文章:

  • 【2026计算机毕业设计】基于Django的智慧办公hr招聘辅助管理系统
  • NBIOT (1) : 当世界开始“低语“
  • 酒店网站制作公司有谁做分销网站
  • Git 服务器搭建
  • Ubuntu24安装MongoDB7
  • Ubuntu 自动挂载移动硬盘
  • 如何使用Postman做接口自动化测试及完美的可视化报告?
  • 配置Centos7.6 yum镜像源
  • Flink非对齐checkpoint踩坑记
  • 使用 WebSocket 实现手机控制端和电脑展示端的实时通信,包含断线重连功能。
  • 服装网站建设怎么写wordpress strip_tags
  • 一文讲清:数据清洗、数据中台、数据仓库、数据治理
  • 【C++ STL】探索STL的奥秘——vector底层的深度剖析和模拟实现!
  • STM32CUBEMX安装离线库
  • 体验 Suno v5:最新的 Suno AI 音乐模型
  • 2.4 欧拉集群安装Nova计算服务
  • 贵港网站建设兼职免费广告设计网站
  • Cell Mol Biol Lett|Runx2诱导超级沉默子形成下调Lpl表达:重塑雪旺细胞脂质代谢的新机制
  • 国自然·医工交叉热点|泛癌组织学重建AI模型
  • Python依赖管理与环境迁移实战:Poetry+Docker构建高效开发体系
  • 山西网站建设推荐景区网站如何建设
  • Flutter---CupertinoPicker滚动选择器
  • 全面掌握PostgreSQL关系型数据库,备份和恢复,笔记46和笔记47
  • Python SQLAlchemy模块:从入门到实战的数据库操作指南
  • 天津哪里有做网站的jquery wordpress
  • 流媒体网站建设规划亚马逊网站建设案例
  • PHP 异步IO扩展包 AsyncIO v2.0.0 发布
  • 《信息系统项目管理师》案例分析题及解析模拟题5
  • Jenkins上实现CI集成软件信息Teams群通知案例实现。
  • ZYNQ平台中断服务函数中的变量不加volatile修饰导致的奇怪问题解决