当前位置: 首页 > news >正文

事务设置和消息分发

事务

RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.

SpringAMQP也提供了对事务相关的操作,RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么
全部成功,要么全部失败.|

前期准备工作:

    //事务public static final String TRANS_QUEUE = "TRANS_QUEUE";public static final String TRANS_EXCHANGE = "TRANS_EXCHANGE";public static final String TRANS_KEY = "TRANS_KEY";
    //事务@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(MQConstants.TRANS_QUEUE).build();}@Bean("transExchange")public Exchange transExchange() {return ExchangeBuilder.directExchange(MQConstants.TRANS_EXCHANGE).build();}@Bean("transBinding")public Binding transBinding(@Qualifier("transExchange") Exchange exchange, @Qualifier("transQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.TRANS_KEY).noargs();}

配置事务管理器

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

添加 @Transactional

如果不添加 @Transactional,我们的事务管理器是不会在这个代码上生效的

    @Transactional@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans1 ");int n = 10 / 0;rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans2 ");return "消息发送成功";}

经过观察,我们可以看到这两条消息要么同时发送成功,要么同时发送失败

消息分发

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者,这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经收到消费并已经确认了消息,这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

如何处理呢?我们可以使用前面章节讲到的**channel.basicQos(intprefetchCount)**方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量比如:消费端调用了channelbasicQos(5),RabbitMQ会为该消费者数,发送一条消息计数+1,消费一条消息计数-1,当达到了设定的上限,RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息。

类似TCP/IP中的"滑动窗口".
prefetchCount设置为0时表示没有上限。
basicQos对拉模式的消费无效

限流和负载均衡

配置信息:

    listener:simple:acknowledge-mode: manual # 设置确认模式prefetch: 5 

前期准备:

常量类:

    //限流public static final String QOS_QUEUE = "QOS_QUEUE";public static final String QOS_EXCHANGE = "QOS_EXCHANGE";public static final String QOS_KEY = "QOS_KEY";

声明:

    //限流@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(MQConstants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(MQConstants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.QOS_KEY).noargs();}

生产者:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.QOS_EXCHANGE, MQConstants.QOS_KEY, "qos");}return "消息发送成功";}

如果我们不进行手动确认,观察最大未确认的消息接收量:

在这里插入图片描述
在这里插入图片描述

可以得知消费者最大接收到的未确认消息数量为 我们设置的 prefetch 值


我们可以通过限流这种方式实现负载均衡

@Component
public class QosListener {@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle1(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(10000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle2(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(5000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

可以观察到消费能力强的队列会持续消费消息,消费能力弱的队列消费的消息会相对较少

在这里插入图片描述


文章转载自:

http://sVrlTp9n.hmbtb.cn
http://hOo9066Z.hmbtb.cn
http://gffb0Wnr.hmbtb.cn
http://CK75KBKv.hmbtb.cn
http://4T7DYK4K.hmbtb.cn
http://bj9w6xwf.hmbtb.cn
http://XUX75761.hmbtb.cn
http://JpYL84KH.hmbtb.cn
http://AhYLQLmd.hmbtb.cn
http://dRZeZvvd.hmbtb.cn
http://vgMMAEe7.hmbtb.cn
http://pxM05gYP.hmbtb.cn
http://XnZ6Wj6Y.hmbtb.cn
http://mcybF37P.hmbtb.cn
http://oOetlrlF.hmbtb.cn
http://z2WTUxPd.hmbtb.cn
http://Pg07aVi5.hmbtb.cn
http://2jOKI3k1.hmbtb.cn
http://dvOhDHad.hmbtb.cn
http://9lhkmKIV.hmbtb.cn
http://vkxEPnJv.hmbtb.cn
http://W1cRoWWT.hmbtb.cn
http://ZxzmKi6V.hmbtb.cn
http://V6DaouAZ.hmbtb.cn
http://ioRnIB0x.hmbtb.cn
http://7ntM7vho.hmbtb.cn
http://JUToJQyU.hmbtb.cn
http://eQZXVZKx.hmbtb.cn
http://vcskKVBr.hmbtb.cn
http://5nK0K6Bu.hmbtb.cn
http://www.dtcms.com/a/373558.html

相关文章:

  • 人工智能-python-深度学习-神经网络-GoogLeNet
  • 告别进度拖延:19款项目进度管理软件深度测评
  • lesson56:CSS进阶指南:Flex布局、变换渐变与动画实战全解析
  • 【高等数学】第十一章 曲线积分与曲面积分——第四节 对面积的曲面积分
  • 精通Octokit:GitHub API开发全攻略
  • 超越模仿:探寻智能的本源
  • CSS 定位技术解析
  • IACheck赋能AI环评报告审核,推动环保设备制造行业发展
  • Photoshop保存图层
  • Java高级编程--XML
  • Nano Banana 技术深度解析:重新定义AI影像的革命性里程碑
  • 运作管理学习笔记5-生产和服务设施的选址
  • 基于单片机的智能路灯(论文+源码)
  • Python中hashlib模块 - 哈希加密
  • Webpack开发:从入门到精通
  • paddlex3.0.1-ocr服务化安装部署(docker)
  • [Upscayl图像增强] 应用程序状态管理 | 响应式状态Jotai | 持久化设置
  • 趣味学RUST基础篇(函数式编程闭包)
  • 5000+张带XML标注的杂货货架数据集:专为目标检测与产品识别设计的零售AI训练数据,助力智能超市与计算机视觉研究
  • 【项目】-mipi摄像头从0开发的过程
  • 宁波浙江制造认证、立标
  • k8s常用命令详解
  • uv使用指南
  • GPS汽车限速器有哪些功能?主要运用在哪里?
  • ARM 基础(2)
  • 【Unity】使用ProtobufNet处理数据
  • (回溯/组合)Leetcode77组合+39组合总和+216组合总和III
  • 2025年渗透测试面试题总结-59(题目+回答)
  • 如何使用Docker快速运行Firefox并实现远程访问本地火狐浏览器的教程
  • [硬件电路-167]:Multisim - 标准的元件库