当前位置: 首页 > news >正文

RabbitMQ 高级特性之消息分发

1. 为什么要消息分发

当 broker 拥有多个消费者时,就会将消息分发给不同的消费者,消费者之间的消息不会重复,RabbitMQ 默认的消息分发机制是轮询,但会无论消费者是否发送了 ack,broker 都会继续发送消息至消费者,这就会造成消费者压力增大。于是,可以限制消费者每一次接收到的消息的数量,当消息达到该数量时,broker 就不会继续给这个消费者发送消息,而是会给其他的消费者派送消息。

所消费者消费了一条消息,并且给 broker 发送了 ack,那么此时消费者所未消耗的消息就没有达到最大消息数量,于是 broker 就会继续给消费者分配消息,直到消费者未消费的消息数量达到上限。

对于这种特性,有下面两个应用场景:

1.1 限流

在某些秒杀场景中,每一秒消费者接收到的消息都会非常大,那么就会造成消费者压力过大。于是我们就可以限制消费者所能接收的最大消息数量。

配置代码如下:

spring:rabbitmq:listener:simple:acknowledge-mode: manualprefetch: 5 #每个队列最多接收五条消息

在配置中,prefetch 表示队列中的消息数量上限为 5 条,若队列中未确认的消息数量达到 5 条,此时 broker 就不会继续给该队列分配消息,而是给其它的未达到上限的队列分配消息。

并且此处要设置为手动确认,若使用 auto 或 none,可能业务逻辑还没有开始消息就已经被签收,这就无法发挥限流的作用。

队列、交换机声明代码如下:

    @Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qoeExchange")public DirectExchange qoeExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBind")public Binding qosBind(@Qualifier("qoeExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.QOS_ROUTINGKEY);}

生产者代码如下:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {String messageInfo = "qos... " + i;rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, Constants.QOS_ROUTINGKEY, messageInfo);}return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);}}

此处,消费者没有给 broker 发送 ack,那么队列中的消息就会一直存在。

代码运行结果如下:

这里我们可以看到,一共有 5 条未确认的消息, 已经达到了上限,于是就不会继续向消费者发送消息。

1.2 负载均衡

使用消息分发,也可以实现负载均衡。

现有两个消费者 A、B,A 处理消息的速度慢,B处理消息的速度快。若不设置负载均衡,那么就会出现 A 积压的消息过多,而 B 几乎没有什么消息挤压,这就没有充分地利用资源。

于是我们可以将 prefetch 设置为 1,那么每次消费者只会接收到一条消息,当 A、B 接收到消息后,在 B 处理完成时 A 还没有处理完,于是 broker 就会给 B 继续推送消息,直到 A 处理完成后才会继续给 A 推送消息。

消费者代码如下:

@Component
@Slf4j
public class QosListener {/*** 消费者1* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消费者 1 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(2000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 消费者2* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener2(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消费者 2 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(1000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

上述代码,将消费者 2 的处理速度是消费者 1 的两倍,代码运行结果如下:

可以看出,消费者  2 每消耗 2 条数据,消费者 1 才消耗 1 条数据。,也就达到了负载均衡的作用。

http://www.dtcms.com/a/272754.html

相关文章:

  • 【Fargo】发送一个rtp包的过程1:怎么统一加twcc序号
  • 华锐云空间展销编辑器:开启数字化展示新时代​
  • U-Boot 2025.07 引入的 “uthreads” 优势介绍
  • 什么是主链
  • 【会员专享数据】2013-2024年我国省市县三级逐月SO₂数值数据(Shp/Excel格式)
  • 使用EasyExcel动态合并单元格(模板方法)
  • RK3568项目(八)--linux驱动开发之基础外设(上)
  • 亚马逊运营中出单词反查
  • 机器学习:反向神经元传播公式推导
  • 记录今天学习Comfyui的感受
  • python正则表达式(小白五分钟从入门到精通)
  • 智能化时代下的门店运营:AI的深刻影响
  • 2025年第十五届APMCM亚太地区大学生数学建模竞赛(中文赛项)
  • 【C++】红黑树的底层思想 and 大厂面试常问
  • BootStrap
  • 售前:该站高位思考还是站低位思考
  • Codeforces Round 1034 (Div. 3) G题题解记录
  • 创建本地软件仓库(rhel7与rhel9)
  • HighReport报表工具开始支持BS报表设计器
  • SW-CA(多平台产品上架系统)
  • uni-app 途径站点组件开发与实现分享
  • 体积超过2MB?uniapp小程序分包上传
  • [论文阅读]Text Compression for Efficient Language Generation
  • Go语言包管理完全指南:从基础到最佳实践
  • BM12 单链表的排序
  • 东土科技智能塔机系统亮相南京,助力智能建造高质量发展
  • HOOK专题
  • web前端面试笔记
  • 北京一家IPO业绩持续性存疑,关联交易频繁独立性堪忧
  • 24、企业设备清单管理(Equipment)详解:从分类到管理,设备全生命周期把控