当前位置: 首页 > news >正文

springboot整合rabbitMQ的示例

RabbitMQ细分有多种工作模式,发布订阅、工作队列最为常见。
本次简单介绍发布订阅,着重介绍工作队列
环境:springboot2.7.18,RabbitMQ4.0.2(docker)

docker run -id --name=rabbitmq -v /usr/local/docker/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management

发布订阅模式

特点:一端发送,多端消费

  • pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • yml
spring.rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: admin
  • 创建队列配置FanoutQueue
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutQueue {public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";/*** 声明交换机*/@Beanpublic FanoutExchange exchange() {return new FanoutExchange(FANOUT_EXCHANGE, true, false);}/*** 声明队列1*/@Beanpublic Queue queue1() {return new Queue(FANOUT_QUEUE1, true, false, false);}/*** 声明队列2*/@Beanpublic Queue queue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}/*** 绑定队列1关系*/@Beanpublic Binding queueBinding1(FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue1()).to(fanoutExchange);}/*** 绑定队列2关系*/@Beanpublic Binding queueBinding2(FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue2()).to(fanoutExchange);}}
  • 创建消费者Consumer
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
public class Consumer {@Componentstatic class Consumer1{@RabbitListener(queues = FanoutQueue.FANOUT_QUEUE1)public void dealQueue1(Message message) {System.out.println(FanoutQueue.FANOUT_QUEUE1 + "收到的消息:" + new String(message.getBody()));}}@Componentstatic class Consumer2{@RabbitListener(queues = FanoutQueue.FANOUT_QUEUE2)public void dealQueue2(Message message) {System.out.println(FanoutQueue.FANOUT_QUEUE2 + "收到的消息:" + new String(message.getBody()));}}
}
  • 创建生产者Controller
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class Controller {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("fb")public void testSendMessage() {String message = "我是忘崽大乔,你们好吗";rabbitTemplate.convertAndSend(FanoutQueue.FANOUT_EXCHANGE, "", message);}
}
  • 测试:调用fb接口,打印出如下信息,测试成功

fanout.queue2收到的消息:我是忘崽大乔,你们好吗
fanout.queue1收到的消息:我是忘崽大乔,你们好吗

简单模式:pull消费

特点:无需声明交换机,直接操作队列,手动发送,手动拉取

  • pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • yml
spring.rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: admin
  • 创建MyQueue
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyQueue {public static final String TEST_QUEUE = "test.queue";//创建一个简单的队列@Beanpublic Queue originalQueue() {return QueueBuilder.durable(TEST_QUEUE).build();}
}
  • 创建MyController
 	@Autowiredprivate RabbitTemplate rabbitTemplate;// 发送10条消息@GetMapping("/put")public void put() {for (int i = 1; i <= 10; i++) {rabbitTemplate.convertAndSend("", MyQueue.TEST_QUEUE, "你好"+i);}   }// 拉取消息@GetMapping("/get")public void get() {Message message = rabbitTemplate.receive(MyQueue.TEST_QUEUE);if (message != null) {String msgBody = new String(message.getBody());System.out.println("📥 拉取到消息: " + msgBody);} else {System.out.println("📭 队列为空,没有可拉取的消息");}}
  • 测试 调用put接口,再调用2次get接口,打印如下信息
拉取到消息: 你好1
拉取到消息: 你好2

工作队列模式:push消费-自动确认机制

特点:无需声明交换机,直接操作队列,手动发送,主动推送,自动确认

  • 创建MyCustomer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
@Slf4j
public class MyCustomer {//消费者1@RabbitListener(queues = MyQueue.TEST_QUEUE)public void auto1(Message message, Channel channel) throws Exception {String msg= new String(message.getBody(), StandardCharsets.UTF_8);log.info("监听1获取信息为:{}", msg);Thread.sleep(1000); // 模拟处理耗时}//消费者2@RabbitListener(queues = MyQueue.TEST_QUEUE)public void auto2(Message message, Channel channel) throws Exception {String msg= new String(message.getBody(), StandardCharsets.UTF_8);log.info("监听2获取信息为:{}", msg);Thread.sleep(1000); // 模拟处理耗时}
}
  • 测试:调用put接口,打印出如下消息,说明测试成功
监听2获取信息为:你好2
监听1获取信息为:你好1
监听2获取信息为:你好4
监听1获取信息为:你好3
监听1获取信息为:你好5
监听2获取信息为:你好6
监听2获取信息为:你好8
监听1获取信息为:你好7
监听1获取信息为:你好9
监听2获取信息为:你好10

此模式下如果方法异常,消息会自动回到队列中

工作队列模式:push消费-手动确认机制

特点:无需声明交换机,直接操作队列,手动发送,主动推送,手动确认

  • yml添加参数
spring.rabbitmq:listener:simple:acknowledge-mode: manual	# 手动确认,默认是自动
  • 修改MyCustomer(主要测试确认机制,所以只添加1个消费者吧)
@Component
@Slf4j
public class MyCustomer {//手动消费@RabbitListener(queues = MyQueue.TEST_QUEUE)public void hand(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("获取信息为:{}", msg );Thread.sleep(1000);// 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
  • 测试:put10条,可以看到消息全部打印出来

获取信息为:你好1
获取信息为:你好2
获取信息为:你好3
获取信息为:你好4
获取信息为:你好5
获取信息为:你好6
获取信息为:你好7
获取信息为:你好8
获取信息为:你好9
获取信息为:你好10

此模式下如果抛异常会发生什么?我们修改代码,加入异常

    //手动消费@RabbitListener(queues = MyQueue.TEST_QUEUE)public void hand(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("获取信息为:{}", msg );Thread.sleep(1000);if("你好5".equals(msg) || "你好6".equals(msg) || "你好7".equals(msg) || "你好8".equals(msg)){int x = 1/0;}channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
  • 测试:put10条,查看日志

1-10全部打印,但5-8有如下报错:Execution of Rabbit message listener failed.
因为5-8没有手动确认,rabbitmq收不到反馈,只能呆住。直到重试机制(默认30min)断开channel,再次消费5-8,再次异常…
可是既然呆住,为什么6、7、8、9、10依然会打印出来呢
因为mq有一个默认预取条数 prefetch,默认是250。
我这里取了10条,也就是10条都在ack,也可以理解为并行,我自己理解为一次性ack条数
我们登录mq面板查看此队列,发现有4条ack状态,这四条就是5-8
在这里插入图片描述

如果prefetch改为1条,会发生什么

  • 修改yml:
  listener:simple:prefetch: 1	# 消费者预取1条数据到内存,默认为250条acknowledge-mode: manual   # 确定机制
  • 测试:停止程序,清除队列消息(purge Messages按钮),重启程序,put10条

发现到5就停止了,6-10未被消费。 此时查看mq面板,发现ack是1条,待消费是5条,符合预期
在这里插入图片描述

不想卡在某一条,可以把异常消息重回队列吗?有,nack

  • 修改代码
	//手动消费@RabbitListener(queues = MyQueue.TEST_QUEUE)public void hand(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("获取信息为:{}", message);Thread.sleep(1000);if("你好5".equals(msg ) || "你好6".equals(msg ) || "你好7".equals(msg ) || "你好8".equals(msg )){//重回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}else{channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
  • 测试:清空消息,重新put。

发现消息5一直在重复打印,后面的消息无法消费
因为basicNack策略把消息5放回队头,紧接着又取出,再放回队头,再取出。。。,导致后面消息堆积
拒绝策略自行查阅资料,不在此叙述(官方解释是放到队尾,可测试明明是放在头部了,不知道为啥)

有解决消息堆积的办法吗?最直接的方法增加一个消费者(直接复制一个就行,本次就不展示了)
还有解决消息堆积的办法吗?设置concurrency线程数为5,或者prefetch=5(5>异常消息数)

  • yml修改参数
  listener:simple:concurrency: 5prefetch: 1acknowledge-mode: manual
  • 测试:清空队列,重新put

当concurrency=5,prefetch=1时,2秒钟消费完毕。但不保证顺序消费
当concurrency=1,prefetch=5时,10秒就消费完毕。保证顺序
发现区别点就是:concurrency缩短了消费时间,prefetch保证了消费顺序
concurrency=5:就像5个人干活,每人干1份工,省时但无序
prefetch=5:就像1个人干活,自己干5份工,不省时但有序
当然concurrency和prefetch都不是解决堆积问题的根本方法,只适合异常消息很少的情况下可取

曲线救国的方法,就是放入新消息,丢弃老消息,但也不是最优解,可了解一下

  • 修改代码:
    @RabbitListener(queues = MyQueue.TEST_QUEUE)public void hand(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("获取信息为:{}", msg);Thread.sleep(1000);if("你好5".equals(message) || "你好6".equals(message) || "你好7".equals(message) || "你好8".equals(message)){//channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);//重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBody());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else{channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
  • 测试:

发现1-10被顺序消费,堆积问题得以解决。
但是5-8不断被投递、消费,导致cpu飙升,并且业务逻辑也不希望这样消费,怎么改?

引入死信队列,有问题的直接扔小黑屋。个人认为这是解决我的项目中消息堆积最合适的

  • 修改MyQueue,创建死信交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MyQueue {public static final String TEST_QUEUE = "test.queue";public static final String TEST_EXCHANGE = "test.exchange";public static final String TEST_ROUTING_KEY = "original.routing.key";public static final String DEAD_EXCHANGE = "dead.exchange";public static final String DEAD_QUEUE = "dead.queue";public static final String DEAD_ROUTING_KEY = "dead.routing.key";// 1. 声明死信交换机(持久化)@Beanpublic DirectExchange deadExchange() {return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();}// 2. 声明死信队列(持久化)@Beanpublic Queue deadQueue() {return QueueBuilder.durable(DEAD_QUEUE).build();}// 3. 绑定死信队列到交换机@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTING_KEY);}// 4. 声明原始交换机@Beanpublic DirectExchange originalExchange() {return ExchangeBuilder.directExchange(TEST_EXCHANGE).durable(true).build();}// 5. 声明原始队列(绑定死信参数)@Beanpublic Queue originalQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);//args.put("x-message-ttl", 30000); // 可选:队列级 TTLreturn QueueBuilder.durable(TEST_QUEUE).withArguments(args).build();}// 6. 绑定原始队列到交换机@Beanpublic Binding originalBinding() {return BindingBuilder.bind(originalQueue()).to(originalExchange()).with(TEST_ROUTING_KEY);}//    @Bean
//    public Queue originalQueue() {
//        return QueueBuilder.durable(TEST_QUEUE).build();
//    }}
  • 修改MyCustomer
    @RabbitListener(queues = MyQueue.TEST_QUEUE)public void hand(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("获取信息为:{}", msg);Thread.sleep(1000);if("你好5".equals(msg) || "你好6".equals(msg) || "你好7".equals(msg) || "你好8".equals(msg)){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}else{channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
  • 修改MyController
 	// 发送消息@GetMapping("/put")public void put() {for (int i = 1; i <= 10; i++) {rabbitTemplate.convertAndSend(MyQueue.TEST_EXCHANGE, MyQueue.TEST_ROUTING_KEY, "你好" + i);}}
  • 测试 :启动(先删除原始队列,否则会报错),重新put

发现1-10均被消费,并且5-8进入了死信队列中
可以进一步配置死信队列重试机制,此处就不再深究了

整体来说,增加消费者,增加线程数,增加预取条数,引入死信队列,都可以解决堆积的问题,没有最好的,只有最合适的

以上是浅浅尝试springboot与rabbitMQ的结合使用,并不做专业配置
ps:为什么拒绝策略会放在队头呢?有知道的小伙伴欢迎留言

http://www.dtcms.com/a/321191.html

相关文章:

  • Elasticsearch:在向量搜索中使用 Direct IO
  • 解码华为云安全“铁三角”:用“分层防御”化解安全挑战
  • 微软披露Exchange Server漏洞:攻击者可静默获取混合部署环境云访问权限
  • 企业AI的双层技术栈架构:融合社区创新与企业级管控的设计蓝图
  • Git 使用场景笔记
  • DuoPlus支持导入文件批量配置云手机参数,还优化了批量操作和搜索功能!
  • 数据结构--哈希表
  • QAGenerationChain从知识库生成大模型应用测试的问题对
  • LeetCode算法日记 - Day 5: 长度最小的子数组、无重复字符的最长子串
  • 【uni-app】解决在 h5 环境下会出现双标题问题
  • 内核的调试和优化
  • Netty-Rest搭建笔记
  • 微算法科技(NASDAQ:MLGO)使用循环QSC和QKD的量子区块链架构,提高交易安全性和透明度
  • 降低程序运行时CPU和GPU峰值占用的技术方案
  • 基于深度学习的鸟类检测识别系统(yolo11、yolov8、yolov5+UI界面+Python项目源码+模型+标注好的数据集)
  • ROHM推出适用于Zone-ECU的高性能智能高边开关!
  • 【unitrix数间混合计算】2.3 标准化处理系统(src/number/normalize/mod.rs)
  • Alkimi 与 Sui 合作,修复「破碎」的广告生态
  • HarmonyOS多设备资源文件管理以及resources资源引用方式
  • 交换机100G模块远距离连接踩坑记录
  • 强制用户更改WordPress密码的重要性及实现方法
  • Pinterest视觉营销自动化:亚矩阵云手机实例与多分辨率适配技术
  • 在 Elasticsearch/Kibana (ELK Stack) 中搜索包含竖线 (|)​​ 这类特殊字符的日志消息 (msg 字段) ​确实需要转义
  • proteus实现简易DS18B20温度计(stm32)
  • python学智能算法(三十五)|SVM-软边界拉格朗日方程乘子非负性理解
  • 阿里云服务linux安装单机版
  • Java 之 设计模式
  • Scratch编程:枪战游戏(附源码)
  • C++信息学奥赛一本通-第一部分-基础一-第3章-第1节
  • 【深度学习新浪潮】近三年高精度大规模三维实景重建研究进展(2022-2025)