当前位置: 首页 > news >正文

RabbitMQ高级特性——TTL、死信队列、延迟队列、事务、消息分发

目录

一、TTL

1.1设置消息的TTL

1.2设置队列的TTL

1.3两者之间的区别

二、死信队列

2.1死信的概念

2.2死信产生的条件:

2.3死信队列的实现

死信队列的工作原理

2.4常⻅⾯试题

三、延迟队列

3.1概念

3.2应用场景

3.3RabbitMQ 实现延迟队列的核心原理

1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)

2.使用RabbitMQ延迟插件 

1. 安装插件

2. 配置队列和交换机(Spring Boot)

3. 发送延迟消息

4. 消费延迟消息

5. 测试

注意事项

四、事务

五、消息分发

5.1 介绍

5.2 限流

5.3负载均衡


一、TTL

TTL(Time to Live,过期时间)即过期时间,RabbitMQ可以对消息和队列设置TTL过期时间

当消息到达存活的时间之后,还没有被消费就自动清除

类似于 购物订单超时了没有付款,订单被自动取消

1.1设置消息的TTL

目前有两种方法可以设置消息的TTL

  1. 设置队列的TTL,该队列中的所有消息均为相同的过期时间
  2. 针对消息本身设置的TTL,每条消息TTL可以不同

但是如果两种方法一起使用,则会根据两种方法的较小的数据为准


针对消息本身设置的TTL方法,是在发送消息的方针中加入expiration的属性参数(单位为毫秒)

完整代码:

    //TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";@Configuration
public class TTLRabbitMQ {@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //单位: 毫秒, 过期时间为30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10sreturn message;});return "消息发送成功";}
}

运行程序,观察结果

1.可以发现当发送四条消息,两个队列的Ready消息均为4,

2.10秒钟之后, 刷新⻚⾯, 发现ttl2队列的消息已被删除(再过20秒ttl队列消息也会被删除)

  • 如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表⽰除⾮此时可以直接将消息投递到 消费者,否则该消息会被⽴即丢弃

1.2设置队列的TTL

设置队列TTL的方法是在创建队列时,加入x-messahe-ttl参数实现(单位是毫米)

完整代码:

    //TTLpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl2.queue";public static final String TTL_EXCHANGE = "ttl.exchange";//设置ttl@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//发送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息发送成功";}
}

启动程序,验证结果:

运⾏之后发现,新增了⼀个队列, 队列有⼀个TTL标识

1.发送消息后, 可以看到, Ready消息为1
  • 采⽤发布订阅模式, 所有与该交换机绑定的队列(ttl_queue和ttl_queue2)都会收到消息

2.20秒钟之后, 刷新⻚⾯, 发现消息已被删除

  • 由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除

1.3两者之间的区别

  • 设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除
  • 设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的.
为什么这两种⽅法处理的⽅式不⼀样?
  1. 因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否 有过期的消息即可.
  2. ⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可

二、死信队列

2.1死信的概念

  • 死信简单理解就是因为种种原因,无法被消费的消息,就是死信
  • 有死信,自然就有死信队列.当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定DLX的队列,就称为死信队列(Dead LetterQueue,简称DLQ).
  • RabbitMQ 的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于存储无法被正常消费的消息(即 "死信")。当消息满足特定条件时,会被从原队列转发到死信队列,这有助于消息的可靠性处理和问题排查。

2.2死信产生的条件:

消息变成死信一般是由一下几种情况:

  1. 消息被拒绝(Basic.Reject / Basic.Nack),并且设置了requeue 参数为false
  2. 消息过期
  3. 队列达到最大长度

2.3死信队列的实现

死信队列的工作原理

  1. 为普通队列设置死信交换机(Dead Letter Exchange,DLX)
  2. 当消息成为死信时,RabbitMQ 会自动将其发送到设置的死信交换机
  3. 死信交换机通过绑定关系将消息路由到死信队列
  4. 可以专门创建消费者处理死信队列中的消息

注意:死信队列和死信交换机 与 普通队列和普通交换机没有区别。

实现代码:

    //死信public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE= "dl.exchange";//死信相关配置@Configuration
public class DLConfig {//演示TTL+死信队列模拟的延迟队列存在问题//制造死信产生的条件@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE) //绑定普通队列.deadLetterExchange(Constants.DL_EXCHANGE) //绑定死信交换机.deadLetterRoutingKey("dlx")       //绑定死信路由键//制造死信条件如下两种.ttl(10000) //设置TTL 10秒.maxLength(10L) //设置队列最大长度.build();}/*** 创建正常队列* 设置死信交换机和死信路由键* @return Queue*/
//    @Bean("normalQueue")
//    public Queue normalQueue(){
//        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
//                .deadLetterExchange(Constants.DL_EXCHANGE)
//                .deadLetterRoutingKey("dlx")
//                .build();
//    }/*** 创建正常交换机* @return DirectExchange*/@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}/*** 绑定正常队列到正常交换机* @param queue 正常队列* @param exchange 正常交换机* @return Binding*/@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交换机和队列/*** 创建死信队列* @return Queue*/@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}/*** 创建死信交换机* @return DirectExchange*/@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}/*** 绑定死信队列到死信交换机* @param queue 死信队列* @param exchange 死信交换机* @return Binding*/@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}
}

发布消息:


@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;/*** 测试死信* @return*/@RequestMapping("/dl")public String dl() {System.out.println("dl...");//发送普通消息
//        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test...");System.out.printf("%tc 消息发送成功 \n", new Date());
//        //测试队列长度for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "dl test..."+i);}return "消息发送成功";}
}

测试结果:

  • 已知普通队列长度为10TTL为10秒,当发送20条消息后,观察队列的情况
  • 当启动生产者时,可以看到两个队列均有十条消息,但因为超出普通队列长度,导致后十条消息会直接到达死信队列,等待10后再次观察,发现原有的消息也会给死信队列

2.4常⻅⾯试题

死信队列作为RabbitMQ的⾼级特性,也是⾯试的⼀⼤重点.
1.死信队列的概念
  • 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息,在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息
2.死信的来源
  • 消息过期: 消息在队列中存活的时间超过了设定的TTL
  • 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
  • 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
3.死信队列的应⽤场景
  • 消息重试机制:对于处理失败的消息,可以在死信队列中进行重试
  • 延迟任务:利用消息过期时间,实现延迟任务(如订单超时取消)
  • 异常监控:通过监控死信队列,及时发现系统问题
  • 数据恢复:死信队列可作为消息的备份,便于数据恢复

三、延迟队列

3.1概念

在 RabbitMQ 中,延迟队列(Delay Queue) 是一种特殊的消息队列,用于存储需要在指定时间后才被消费的消息。它的核心作用是实现消息的 “延时投递”,即消息发送后不会立即被消费者处理,而是等待预设的延迟时间后才进入消费流程。

3.2应用场景

  1. 订单超时取消:用户下单后,若 30 分钟内未支付,自动取消订单并释放库存。
  2. 定时任务触发:例如每天凌晨 2 点执行数据备份、定时发送提醒消息等。
  3. 失败重试机制:当某个操作失败时,延迟一段时间后重试(如接口调用失败后,5 分钟后再次尝试)。
  4. 消息通知延迟:如用户注册成功后,1 小时后发送新手引导邮件。

3.3RabbitMQ 实现延迟队列的核心原理

RabbitMQ 本身没有直接提供 “延迟队列” 的功能,但可以通过以下两种方式间接实现:

1. 基于 TTL(Time-To-Live)+ 死信队列(Dead Letter Queue)

这是最常用的实现方式,利用了 RabbitMQ 的两个特性:

  • TTL(消息存活时间):设置消息的过期时间,当消息超过该时间未被消费时,会变成 “死信”(Dead Letter)。
  • 死信队列(DLQ):为队列配置 “死信交换机(Dead Letter Exchange)”,当消息成为死信后,会被自动路由到死信交换机绑定的队列(即死信队列),消费者从死信队列中获取消息,实现延迟效果。
@Configuration
public class DelayConfig {//延迟队列public static final String DELAY_QUEUE = "delay.queue";public static final String DELAY_EXCHANGE = "delay.exchange";@Bean("delayQueue")public Queue delayQueue(){return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange(){return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;* 延迟队列测试 - 使用 TTL 实现延迟效果* 通过设置消息的过期时间来模拟延迟队列的效果* @return 发送结果提示信息*/@RequestMapping("/delay")public String delay() {System.out.println("delay...");rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //单位: 毫秒, 过期时间为30sreturn message;});System.out.printf("%tc 消息发送成功 \n", new Date());return "消息发送成功";}/*** 延迟队列测试 - 使用 RabbitMQ 延迟插件实现延迟效果* 利用 RabbitMQ 的延迟插件来精确控制消息的延迟时间* @return 发送结果提示信息*/@RequestMapping("/delay2")public String delay2() {System.out.println("delay2...");rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 30s...", message -> {message.getMessageProperties().setDelayLong(30000L);  //单位: 毫秒, 延迟时间为30sreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test 10s...", message -> {message.getMessageProperties().setDelayLong(10000L);  //单位: 毫秒, 延迟时间为10sreturn message;});System.out.printf("%tc 消息发送成功 \n", new Date());return "消息发送成功";}}

2.使用RabbitMQ延迟插件 

RabbitMQ 官方提供了一个插件 rabbitmq_delayed_message_exchange,专门用于实现延迟队列,功能更强大且灵活。

特点

  • 支持为每个消息单独设置延迟时间(无需依赖死信队列)。
  • 延迟精度更高,避免了 TTL + 死信队列方式中 “消息堆积导致的延迟偏差” 问题。

实现步骤

  • 安装插件:在 RabbitMQ 服务器上安装 rabbitmq_delayed_message_exchange 插件(需重启 RabbitMQ 生效)。
  • 声明一个类型为 x-delayed-message 的交换机,并指定延迟类型(如 x-delayed-type: direct)。
  • 发送消息时,通过 x-delay 头部字段设置延迟时间(单位:毫秒)。
  • 交换机在延迟时间到达后,会自动将消息路由到绑定的队列,消费者直接从队列中获取消息。

两种实现方式的对比

实现方式优点缺点适用场景
TTL + 死信队列无需额外插件,依赖 RabbitMQ 原生特性1. 队列级 TTL 不支持单消息单独设置延迟;
2. 消息堆积可能导致延迟时间不准
延迟时间固定、精度要求不高的场景
延迟插件(推荐)支持单消息单独设置延迟,精度高需要额外安装插件延迟时间灵活、精度要求高的场景

代码示例(基于 Spring Boot + 延迟插件)

以下是使用 rabbitmq_delayed_message_exchange 插件实现延迟队列的简单示例:

1. 安装插件
# 下载插件(版本需与RabbitMQ匹配)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez# 复制到RabbitMQ插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 重启RabbitMQ
systemctl restart rabbitmq-server
2. 配置队列和交换机(Spring Boot)
@Configuration
public class DelayQueueConfig {// 延迟交换机名称public static final String DELAY_EXCHANGE_NAME = "delay.exchange";// 延迟队列名称public static final String DELAY_QUEUE_NAME = "delay.queue";// 路由键public static final String DELAY_ROUTING_KEY = "delay.routing.key";// 声明延迟交换机(类型为x-delayed-message)@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 指定延迟交换机的底层类型(如direct、topic等)return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 声明延迟队列@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE_NAME).build();}// 绑定交换机和队列@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();}
}
3. 发送延迟消息
@Service
public class DelayMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(String message, long delayTime) {// 发送消息时,通过x-delay头设置延迟时间(毫秒)rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE_NAME,DelayQueueConfig.DELAY_ROUTING_KEY,message,correlationData -> {correlationData.getMessageProperties().setHeader("x-delay", delayTime);return correlationData;});System.out.println("发送延迟消息:" + message + ",延迟时间:" + delayTime + "ms");}
}
4. 消费延迟消息
@Service
public class DelayMessageReceiver {@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_NAME)public void receiveDelayMessage(String message) {System.out.println("收到延迟消息:" + message + ",时间:" + LocalDateTime.now());}
}
5. 测试
@SpringBootTest
public class DelayQueueTest {@Autowiredprivate DelayMessageSender sender;@Testpublic void testDelayMessage() {// 发送一条延迟5秒的消息sender.sendDelayMessage("订单超时取消提醒", 5000);}
}

输出结果

发送延迟消息:订单超时取消提醒,延迟时间:5000ms
(5秒后)
收到延迟消息:订单超时取消提醒,时间:2025-08-18T15:30:05

注意事项

  1. 延迟精度:延迟插件的精度较高,但受 RabbitMQ 服务器负载影响,可能存在毫秒级偏差。
  2. 消息持久化:若需保证消息不丢失,需将队列、交换机和消息都设置为持久化(durable)。
  3. 插件版本兼容:延迟插件版本需与 RabbitMQ 版本匹配,否则可能无法正常工作。
  4. 避免消息堆积:延迟队列中的消息在延迟时间到达前会暂存在内存或磁盘中,需合理设置队列容量,避免堆积过多消息影响性能。

四、事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败

rabbitTemplate.setChannelTransacted(true); 是 Spring AMQP 中用于开启 RabbitMQ 事务模式的配置,它会为 RabbitTemplate 操作的信道(Channel)启用事务支持。

相关配置

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();}}

生产者代码

@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send(){rabbitTemplate.convertAndSend("","transQueue", "trans teat 1......");int a = 5/0;rabbitTemplate.convertAndSend("","transQueue", "trans teat 2......");return "发送成功";}
}
  1. 不加 @Transactional , 会发现消息1发送成功
  2. 添加 @Transactional , 消息1和消息2全部发送失败

五、消息分发

 消息分发机制主要有两种应用场景:

  1. 限流
  2. 负载均衡

5.1 介绍

当队列拥有多个消费者时,RabbitMQ默认会通过轮询的方式将消息平均的分发给每个消费者,但是没有可能其中一部分消费者消费消息的速度很快,另一部分消费者消费很慢呢?其实是有可能的,那么这就有可能导致这个系统的吞吐量下降,那如何分发消息才是合理的?在前面学习RabbitMQ JDK Client 时,我们可以通过 channel.basicQos(int prefetchCount) 来设置当前信道的消费者所能拥有的最大未确认消息数量,在Spring AMQP中我们可以通过配置 prefetch 来达到同样的效果,使用消息分发机制时消息确认机制必须为手动确认。
 

5.2 限流

        在秒杀场景下,假设订单系统每秒能处理的订单数是10000,但是秒杀场景下可能某一瞬间会有50000订单数,这就会导致订单系统处理不过来而压垮。可以利用basicQos()来进行限流:

  1.   SpringBoot配置文件用prefetch控制限流数,对应channel.basicQos(int prefetchCount)的prefetchCount。
  2.    开启消息确认机制的手动确认模式manual。未手动确认的消息都视为未消费完的消费,prefetchCount并不会-1。

配置信息:

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量retry:enabled: true           # 启用重试机制initial-interval: 5000ms # 初始重试间隔5秒max-attempts: 5         # 最多重试5次

 声明队列和交换机:

public class RabbitMQConnection {public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";}@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}

生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");}return "发送成功";}}

消费者代码:

@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
//            System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliveryTag, false, true);}}}

5.3负载均衡

 负载均衡主要是根据不同消费者消费消息的速度来协调它们的压力,比如一个消费者处理消息快,另一个消费者处理消息满,那么就可以配置 prefetch(如配置prefetch为1),就可以使这些消费者还未处理完当前消息,不允许处理下一条,这样就可以使处理消息满的消费者可以慢慢处理一条消息,而处理消息快的消费者,可以在处理完一条消息后,继续处理下一条
 

代码示例:

一、修改 prefetch 配置

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@8.140.60.17:5672/xibbeilistener:simple:acknowledge-mode: manual  # 手动确认模式prefetch: 1               # 每次只预取1条消息

二、修改消费者代码(取消手动确认的注释并新增一个消费者)

@Component
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("111接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(2000);
//            System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliveryTag, false, true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("222接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());Thread.sleep(1000);
//            System.out.println("业务处理完成");//肯定确认channel.basicAck(deliveryTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliveryTag, false, true);}}
}

三、测试

接收到消息: qos test...1, deliveryTag: 1
消费者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消费者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消费者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消费者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16
deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的
deliveryTag 是独⽴计数的
http://www.dtcms.com/a/337821.html

相关文章:

  • 【展厅多媒体】互动地砖屏怎么提升展厅互动感的?
  • python基于机器学习进行数据处理与预测(火灾的三因素回归问题)
  • 探索机器学习:从核心概念到实战应用
  • 精通sqlmap tamper:WAF绕过实战技巧剖析
  • 磁流变液迟滞性能的机器学习软件设计
  • MySQL实战优化高手教程 – 从架构原理到生产调优
  • 突破成长瓶颈:产品运营能力体系化提升技巧
  • 大数据毕业设计选题推荐:基于Hadoop+Spark的城镇居民食品消费分析系统源码
  • 28、企业安防管理(Security)体系构建:从生产安全到日常安保的全方位防护
  • 【秋招笔试】2025.08.16科大讯飞秋招机考真题
  • 从虚拟到现实:数字孪生赋能智能制造
  • 跨设备文件共享优化:cpolar 提升 PicoShare 访问速度方案
  • Nextcloud 私有云部署:cpolar 内网穿透服务实现安全远程文件访问
  • 知识点 | 麒麟OS环境中curl -4回显真实IP的原因
  • Nextcloud容器化部署革新:Docker+Cpolar构建高效私有云远程访问新架构
  • Harmonyos之字体设置功能
  • 什么是Hystrix?实现原理是什么?
  • Hadoop - 1:Hadoop 技术解析;Hadoop是什么;Hadoop优势;Hadoop组成;HDFS、YARN、MapReduce 三者关系
  • docker——docker执行roslaunch显示错误
  • listagg 多了空格 Oracle数据库
  • 【嵌入式人工智能产品开发实战】(二十四)—— 政安晨:解释一下小智AI项目中析构函数的应用
  • McCabe 环形复杂度
  • Owen大规模文本嵌入生成
  • PMP-项目管理-十大知识领域:风险管理-识别、评估、应对项目风险
  • nsfp-
  • 《Image Classification with Classic and Deep Learning Techniques》复现
  • 地图导航怎么测?
  • 深入浅出决策树
  • 决策树总结
  • 视觉语言导航(9)——位置编码 VLNBERT与HAMT 记忆模块 3.3后半段