当前位置: 首页 > news >正文

RabbitMQ -- 高级特性

RabbitMQ高级特性有:

  • 发送⽅消息确认
  • 持久化
  • 消费端消息确认
  • 重试机制
  • TTL
  • 死信队列
  • 延迟队列
  • 事务
  • 消息分发

其中,发送⽅消息确认,持久化,消费端消息确认这三个上一篇已经讲过了,这三个特性是用来保证消息的可靠传输。

重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可⽤, 资源不⾜等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送.但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数。

配置重试机制

spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:#acknowledge-mode: manualacknowledge-mode: auto  #消息接收确认#acknowledge-mode: noneretry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时⻓为5秒max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)

配置交换机&队列

public class Constants {//重试机制public static final String RETRY_QUEUE = "retry.queue";public static final String RETRY_EXCHANGE = "retry.exchange";
}@Configuration
public class RabbitMQConfig {//重试机制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("retry");}}

发送消息

		@RequestMapping("/retry")public String retry(){System.out.println("retry...");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test ...");return "消息发送成功";}

消费消息

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s,deliveryTag: %s \\n", new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("业务处理完成");}
}

运⾏程序, 观察结果

retry...
[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
2025-10-20T00:45:00.555+08:00  WARN 1146 --- [ntContainer#1-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'retry test ...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=retry.exchange, receivedRoutingKey=retry, deliveryTag=1, consumerTag=amq.ctag-xEKmLExm_5vjI5e-DRu5qw, consumerQueue=retry.queue])org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.riggie.extension.listener.RetryListener.handlerMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.UnsupportedEncodingException' threw exception
......

如果对异常进⾏捕获, 那么就不会进⾏重试 代码修改如下:(记得要把 acknowledge-mode 改为*manual ,不然还会报错*)

		@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s,deliveryTag: %s \\n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;System.out.println("业务处理完成");channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}

重新运⾏程序, 结果如下:

[retry.queue]接收到消息: retry test ...,deliveryTag: 1 
[retry.queue]接收到消息: retry test ...,deliveryTag: 2 
[retry.queue]接收到消息: retry test ...,deliveryTag: 3 
[retry.queue]接收到消息: retry test ...,deliveryTag: 4 
[retry.queue]接收到消息: retry test ...,deliveryTag: 5 
[retry.queue]接收到消息: retry test ...,deliveryTag: 6 
[retry.queue]接收到消息: retry test ...,deliveryTag: 7 
[retry.queue]接收到消息: retry test ...,deliveryTag: 8 
[retry.queue]接收到消息: retry test ...,deliveryTag: 9 
[retry.queue]接收到消息: retry test ...,deliveryTag: 10 
[retry.queue]接收到消息: retry test ...,deliveryTag: 11 
...........

他就会一直重新入对,再发送给消费者,因为 basicNack(deliveryTag,false,true); 具体请看上一篇 “保障消息可靠性”。

可以看到, ⼿动确认模式时, 重试次数的限制不会像在⾃动确认模式下那样直接⽣效, 因为是否重试以及何时重试更多地取决于应⽤程序的逻辑和消费者的实现.

⾃动确认模式下, RabbitMQ 会在消息被投递给消费者后⾃动确认消息. 如果消费者处理消息时抛出异常, RabbitMQ 根据配置的重试参数⾃动将消息重新⼊队, 从⽽实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执⾏这些重试策略.

⼿动确认模式下, 消费者需要显式地对消息进⾏确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应⽤程序本⾝, ⽽不是RabbitMQ的内部机制. 应⽤程序可以通过⾃⼰的逻辑和利⽤RabbitMQ的⾼级特性来实现有效的重试策略

<aside> 💡

使⽤重试机制时需要注意:

  1. ⾃动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被⾃动确认, 那么消息就丢失了
  2. ⼿动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, ⽆法被确认, 就⼀直是unacked的状态, 导致消息积压。 </aside>

TTL

TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除.

设置消息的TTL

⽬前有两种⽅法可以设置消息的TTL.

  1. 设置队列的TTL, 队列中所有消息都有相同的过期时间.
  2. 对消息本⾝进⾏单独设置, 每条消息的TTL可以不同. 如果两种⽅法⼀起使⽤, 则消息的TTL以两者之间较⼩的那个数值为准.

先看针对每条消息设置TTL.

针对每条消息设置TTL的⽅法是在发送消息的⽅法中加⼊expiration的属性参数,单位为毫秒.

配置交换机&队列

public class Constants {//ttlpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl.queue2";public static final String TTL_EXCHANGE = "ttl.exchange";
}@Configuration
public class RabbitMQConfig {//ttl//未设置ttl@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}

发送消息

		@RequestMapping("/ttl")public String ttl(){System.out.println("ttl");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test 10s ...", message -> {message.getMessageProperties().setExpiration("10000");//10sreturn message;});return "消息发送成功";}

运⾏程序, 观察结果

http://127.0.0.1:8080/product/ttl

发送消息后, 可以看到, Ready消息为2

10秒钟之后, 刷新⻚⾯, 发现消息已被删除

如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表⽰除⾮此时可以直接将消息投递到消费者,否则该消息会被⽴即丢弃

  • 这有一个非常有意思的小实验:

    		@RequestMapping("/ttl")public String ttl(){System.out.println("ttl");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test 30s ...", message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test 10s ...", message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";}
    
    		@RequestMapping("/ttl")public String ttl(){System.out.println("ttl");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test 10s ...", message -> {message.getMessageProperties().setExpiration("10000");return message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test 30s ...", message -> {message.getMessageProperties().setExpiration("30000");return message;});return "消息发送成功";}
    

    上面这两个代码,有什么区别,也就是在队列中会发生什么样的效果,假如访问5次地址,也就是会产生 10 条消息,是 5条消息10s后消失,另外5条消息30s后消失,还是30s后消息全都消失呢。咋们来看看。

    先看第一种情况:发送10条消息后:

    10s后:

    30s后:

    再来看看第二种情况:发送10条消息后:

    10s后:

    30s后:

为什么第二种会出现这样的情况呢?

这是因为对于设置消息TTL的⽅式,每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除。

对于第二种情况来说,这10条消息的过期时间分别是 10s ,30s,10s ,30s,10s ,30s,10s ,30s,10s ,30s。10s之后第一条消息过期,扔出队列,队列中消息总数来到九,来到第二条消息,过期时间是30s,先检测是否到达过期时间,没有过期,那30s之后,第二条消息过期,队列中消息总数来到八,当来到第三条消息的时候,检测过期时间,发现这条消息已经超过过期时间,直接扔掉,后面第四条第五条都是如此,所以才会出现,10 → 9 → 0 这种情况。

那么第一种情况一下子全都消息,从 10 → 0 的情况大家也都应该能想清楚了吧。10条消息的过期时间分别是30s,10s ,30s,10s ,30s,10s ,30s,10s ,30s,10s。30s后第一条消息过期,从队列中移除,检测第二条消息的时候,发现已经过期,就移除队列,后面第三条第四条都一样,只是这进行的非常快,导致会出现一下子全都一出队列的感觉。

设置队列的TTL

设置队列TTL的⽅法是在创建队列时, 加⼊ x-message-ttl 参数实现的, 单位是毫秒。

配置队列和绑定关系

public class Constants {//ttlpublic static final String TTL_QUEUE = "ttl.queue";public static final String TTL_QUEUE2 = "ttl.queue2";public static final String TTL_EXCHANGE = "ttl.exchange";
}Configuration
public class RabbitMQConfig {//设置ttl//方式一@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build(); //设置队列的ttl为20s}//方式二@Bean("ttlQueue3")public Queue ttlQueue3(){Map<String,Object> map = new HashMap<>();map.put("x-message-ttl",20000);return QueueBuilder.durable(Constants.TTL_QUEUE2).withArguments(map).build(); //设置队列的ttl为20s}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}

发送消息

		@RequestMapping("/ttl2")public String ttl2(){System.out.println("ttl2");//发送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test ...");return "ttl2消息发送成功";}

运⾏程序, 观察结果

运⾏之后发现,新增了⼀个队列, 队列Features有⼀个TTL标识

调⽤接⼝, 发送消息:

http://127.0.0.1:8080/product/ttl2

20s之后,刷新页面,发现消息已经被清除。

由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除。

两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除

设置消息TTL的⽅法, 即使消息过期, 也不会⻢上从队列中删除, ⽽是在即将投递到消费者之前进⾏判定的.

为什么这两种⽅法处理的⽅式不⼀样?

因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.⽽设置消息TTL的⽅式, 每条消息的过期时间不同, 如果要删

除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可。

死信队列

要讲死信队列之前就要理解什么是死信。

什么是死信

死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信.

有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(DeadLetter Queue,简称DLQ).

消息变成死信⼀般是由于以下⼏种情况:

  1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
  2. 消息过期.
  3. 队列达到最⼤⻓度

声明队列和交换机

包含两部分:

  • 声明正常的队列和正常的交换机
  • 声明死信队列和死信交换机

死信交换机和死信队列和普通的交换机, 队列没有区别。

正常队列绑定死信交换机

当这个队列中存在死信时, RabbitMQ会⾃动的把这个消息重新发布到设置的DLX上, 进⽽被路由到另⼀个队列, 即死信队列.可以监听这个死信队列中的消息以进⾏相应的处理

public class Constants {//dlpublic static final String NORMAL_QUEUE = "normal.queue";public static final String NORAML_EXCHANGE = "normal.exchange";public static final String DL_QUEUE = "dl.queue";public static final String DL_EXCHANGE = "dl.exchange";
}@Configuration
public class DLConfig {@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //正常队列绑定死信交换机.deadLetterRoutingKey("dlx")//设置发送给死信队列的RoutingKey.ttl(10000)//过期时间.maxLength(10) //队列最大长度.build();}//也可以这么写//@Bean("normalQueue")//public Queue normalQueue() {//		 Map<String, Object> arguments = new HashMap<>();//		 arguments.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE_NAME);//绑定死信队列//		 arguments.put("x-dead-letter-routing-key","dlx");//设置发送给死信队列的RoutingKey//		 return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();//}@Bean("noramlExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORAML_EXCHANGE).build();}@Bean("normalBingding")public Binding normalBinding(@Qualifier("noramlExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交换机和死信队列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}}

过期时间和队列最大长度都是制造死信产⽣的条件。

发送消息

		@RequestMapping("/dl")public String dl(){System.out.println("dl ....");
//        //发送普通消息rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","dl test..");System.out.printf("%tc 消息发送成功 \\n", new Date());
//        //测试队列长度
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","dl test.." + i);
//        }//测试消息拒收//rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","dl test..");return "dl消息发送成功。。";}

测试死信

观察队列

队列Features说明:

D: durable的缩写, 设置持久化

TTL: Time to Live, 队列设置了TTL

Lim: 队列设置了⻓度(x-max-length)

DLX: 队列设置了死信交换机(x-dead-letter-exchange)

DLK: 队列设置了死信RoutingKey(x-dead-letter-routing-key)

测试过期时间, 到达过期时间之后, 进⼊死信队列

http://127.0.0.1:8080/product/dl

10s之后,消息就会进入到死信队列。

⽣产者⾸先发送⼀条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中.由于队列normal_queue设置了过期时间为10s, 在这10s内没有消费者消费这条消息, 那么判定这条消息过期. 由于设置了DLX, 过期之时, 消息会被丢给交换器(dlx_exchange)中, 这时根据RoutingKey匹配,找到匹配的队列(dlx_queue), 最后消息被存储在queue.dlx这个死信队列中.

测试达到队列⻓度, 消息进⼊死信队列

http://127.0.0.1:8080/product/dl

队列⻓度设置为10, 我们发送20条数据, 那么就会有10条数据直接进⼊到死信队列

10s之后,剩下的10条消息也会进入到死信队列里面。

测试消息拒收

写消费者代码, 并强制异常, 测试拒绝签收

编写消费者代码:

@Component
public class DLListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handMessage(Message message, Channel channel) throws IOException, InterruptedException {long deliberytag = message.getMessageProperties().getDeliveryTag();try{//消费者逻辑System.out.printf("[normal.queue]接收到消息:%s, deliberytag: %d \\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");//模拟处理失败int num = 3 / 0;System.out.println("业务处理完成");//肯定确认channel.basicAck(deliberytag,false);} catch (Exception e){Thread.sleep(1000);//否定确认,拒绝接受//第二个false,代表不进入队列。那么此时就会进入死信队列channel.basicNack(deliberytag,false,false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws IOException {//消费者逻辑System.out.printf("[dl.queue] %tc 接收到消息: %s, deliberytag: %d \\n", new Date(),new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());}
}

再次访问 http://127.0.0.1:8080/product/dl

查看控制台打印的日志信息

.....[normal.queue]接收到消息:dl test.., deliberytag: 1 
业务逻辑处理
[dl.queue] 周五 10月 24 00:42:03 CST 2025 接收到消息: dl test.., deliberytag: 22 .....
  • 常⻅⾯试题

    死信队列作为RabbitMQ的⾼级特性,也是⾯试的⼀⼤重点.

    1. 死信队列的概念 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些⽆法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列⽤于存储这些死信消息

    2. 死信的来源

      1. 消息过期: 消息在队列中存活的时间超过了设定的TTL
      2. 消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新⼊队(requeue=false), 消息也会成为死信.
      3. 队列满了: 当队列达到最⼤⻓度, ⽆法再容纳新的消息时, 新来的消息会被处理为死信.
    3. 死信队列的应⽤场景 对于RabbitMQ来说, 死信队列是⼀个⾮常有⽤的特性. 它可以处理异常情况下,消息不能够被消费者正确消费⽽被置⼊死信队列中的情况, 应⽤程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况, 进⽽可以改善和优化系统.

      ⽐如: ⽤⼾⽀付订单之后, ⽀付系统会给订单系统返回当前订单的⽀付状态为了保证⽀付信息不丢失, 需要使⽤到死信队列机制. 当消息消费异常时, 将消息投⼊到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进⾏处理(⽐如发送⼯单等,进⾏⼈⼯确认).

      场景的应⽤场景还有:

      • 消息重试:将死信消息重新发送到原队列或另⼀个队列进⾏重试处理.
      • 消息丢弃:直接丢弃这些⽆法处理的消息,以避免它们占⽤系统资源.
      • ⽇志收集:将死信消息作为⽇志收集起来,⽤于后续分析和问题定位.

延迟队列

概念

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.

应⽤场景

延迟队列的使⽤场景有很多, ⽐如:

  1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾ 指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
  2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
  3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等
  4. ......

RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.

所以死信队列章节展⽰的也是延迟队列的使⽤.

TTL+死信队列实现

代码实现

先看TTL+死信队列实现延迟队列,继续沿⽤死信队列的代码即可.

@Configuration
public class DLConfig {@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //正常队列绑定死信交换机.deadLetterRoutingKey("dlx")//设置发送给死信队列的RoutingKey.build();}@Bean("noramlExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORAML_EXCHANGE).build();}@Bean("normalBingding")public Binding normalBinding(@Qualifier("noramlExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}//死信交换机和死信队列@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}}

⽣产者:

		@RequestMapping("delay")public String delay(){System.out.println("delay ....");rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","delay test ...",message -> {message.getMessageProperties().setExpiration("10000"); //单位:毫秒,过期时间为10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","delay test ...",message -> {message.getMessageProperties().setExpiration("30000"); //单位:毫秒,过期时间为10sreturn message;});return "delay发送成功";}

消费者:

@Component
public class DLListener {@RabbitListener(queues = Constants.DL_QUEUE)public void dlHandMessage(Message message, Channel channel) throws IOException {//消费者逻辑System.out.printf("[dl.queue] %tc 接收到消息: %s, deliberytag: %d \\n", new Date(),new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());}
}

调⽤接⼝, 发送数据: http://127.0.0.1:8080/product/delay 通过控制台观察死信队列消费情况:

delay ....
[dl.queue] 周五 10月 24 23:30:21 CST 2025 接收到消息: delay test ..., deliberytag: 1 
[dl.queue] 周五 10月 24 23:30:41 CST 2025 接收到消息: delay test ..., deliberytag: 2 

可以看到, 两条消息按照过期时间依次进⼊了死信队列.

延迟队列, 就是希望等待特定的时间之后, 消费者才能拿到这个消息. TTL刚好可以让消息延迟⼀段时间成为死信, 成为死信的消息会被投递到死信队列⾥, 这样消费者⼀直消费死信队列⾥的消息就可以了.

存在问题

接下来把⽣产消息的顺序修改⼀下,先发送20s过期数据, 再发送10s过期数据

		@RequestMapping("delay2")public String delay2(){System.out.println("delay ....");rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","delay test ...",message -> {message.getMessageProperties().setExpiration("30000"); //单位:毫秒,过期时间为10sreturn message;});rabbitTemplate.convertAndSend(Constants.NORAML_EXCHANGE,"normal","delay test ...",message -> {message.getMessageProperties().setExpiration("10000"); //单位:毫秒,过期时间为10sreturn message;});return "delay2发送成功";}

调⽤接⼝, 发送数据: http://127.0.0.1:8080/product/delay2 通过控制台观察死信队列消费情况:

delay ....
[dl.queue] 周五 10月 24 23:39:29 CST 2025 接收到消息: delay test ..., deliberytag: 1 
[dl.queue] 周五 10月 24 23:39:29 CST 2025 接收到消息: delay test ..., deliberytag: 2 

这时会发现: 10s过期的消息, 也是在20s后才进⼊到死信队列.

消息过期之后, 不⼀定会被⻢上丢弃. 因为RabbitMQ只会检查队⾸消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很⻓, 第⼆个消息的延时时间很短, 那第⼆个消息并不会优先得到执⾏.

所以在考虑使⽤TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列.

延迟队列插件

RabbitMQ官⽅也提供了⼀个延迟的插件来实现延迟的功能 参考: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

具体的参考官网介绍,或者在网上搜索一下,这里就不详细介绍了。

常⻅⾯试题

延迟队列作为RabbitMQ的⾼级特性,也是⾯试的⼀⼤重点. 介绍下RabbitMQ的延迟队列 延迟队列是⼀个特殊的队列, 消息发送之后, 并不⽴即给消费者, ⽽是等待特定的时间, 才发送给消费者. 延迟队列的应⽤场景有很多, ⽐如:

  1. 订单在⼗分钟内未⽀付⾃动取消
  2. ⽤⼾注册成功后, 3天后发调查问卷
  3. ⽤⼾发起退款, 24⼩时后商家未处理, 则默认同意, ⾃动退款
  4. ......

但RabbitMQ本⾝并没直接实现延迟队列, 通常有两种⽅法:

  1. TTL+死信队列组合的⽅式
  2. 使⽤官⽅提供的延迟插件实现延迟功能

⼆者对⽐:

  1. 基于死信实现的延迟队列 a. 优点: 1) 灵活不需要额外的插件⽀持 b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性
  2. 基于插件实现的延迟队列 a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题 b. 缺点: 1) 需要依赖特定的插件, 有运维⼯作 2) 只适⽤特定版本

事务

RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. SpringAMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败.

  • 不采用事务
public class Constants {// 事务public static final String TRANS_QUEUE = "trans.queue";
}
		@RequestMapping("/trans")public String trans(){System.out.println("trans test ...");rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 1 ...");int num  = 5 / 0;rabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 2 ...");return "消息发送成功...";}

访问 http://127.0.0.1:8080/product/trans

观察控制台

trans test ...
2025-10-25T19:43:36.205+08:00 ERROR 55049 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.ArithmeticException: / by zero] with root cause
.....

不采用事务的方式,第一条消息成功,第二条消息失败。

  • 采用事务

配置事务管理器

		@Bean("transRabbitTemplate")public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); //开始事务return rabbitTemplate;}@Beanpublic RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}

⽣产者

import com.riggie.extension.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/product")
@RestController
public class ProducerController {@Resource(name = "transRabbitTemplate")private RabbitTemplate transRabbitTemplate;@Transactional@RequestMapping("/trans2")public String trans2(){System.out.println("trans test ...");transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 1 ...");int num  = 5 / 0;transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 2 ...");return "消息发送成功...";}}

访问 http://127.0.0.1:8080/product/trans

观察控制台

trans test ...
2025-10-25T23:25:13.925+08:00 ERROR 60842 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.ArithmeticException: / by zero] with root cause

不加 @Transactional , 会发现消息1发送成功

添加 @Transactional , 消息1和消息2全部发送失败

消息分发

概念

RabbitMQ队列拥有多个消费者时, 队列会把收到的消息分派给不同的消费者. 每条消息只会发送给订阅列表⾥的⼀个消费者. 这种⽅式⾮常适合扩展, 如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.

默认情况下, RabbitMQ是以轮询的⽅法进⾏分发的, ⽽不管消费者是否已经消费并已经确认了消息. 这种⽅式是不太合理的, 试想⼀下, 如果某些消费者消费速度慢, ⽽某些消费者消费速度快, 就可能会导致某些消费者消息积压, 某些消费者空闲, 进⽽应⽤整体的吞吐量下降.

如何处理呢? 我们可以使⽤前⾯章节讲到的channel.basicQos(int prefetchCount) ⽅法, 来限制当前信道上的消费者所能保持的最⼤未确认消息的数量

⽐如: 消费端调⽤了 channelbasicQos(5) , RabbitMQ会为该消费者计数, 发送⼀条消息计数+1, 消费⼀条消息计数-1, 当达到了设定的上限, RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息.类似TCP/IP中的"滑动窗⼝".

prefetchCount 设置为0时表⽰没有上限. basicQos 对拉模式的消费⽆效(后⾯再讲)

应⽤场景

消息分发的常⻅应⽤场景有如下:

  1. 限流
  2. ⾮公平分发

限流

如下使⽤场景:

订单系统每秒最多处理5000请求, 正常情况下, 订单系统可以正常满⾜需求

但是在秒杀时间点, 请求瞬间增多, 每秒1万个请求, 如果这些请求全部通过MQ发送到订单系统, ⽆疑会把订单系统压垮.

RabbitMQ提供了限流机制, 可以控制消费端⼀次只拉取N个请求

通过设置prefetchCount参数, 同时也必须要设置消息应答⽅式为⼿动应答

prefetchCount: 控制消费者从队列中预取(prefetch)消息的数量, 以此来实现流控制和负载均衡

  1. 配置prefetch参数, 设置应答⽅式为⼿动应答
spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: manualprefetch: 5
  1. 配置交换机, 队列
public class Constants {//限流public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";
}@Configuration
public class QOSConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qosExchange")public DirectExchange qosExchange(){return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("QOSBinding")public Binding qosBinding(@Qualifier("qosExchange")Exchange exchange, @Qualifier("qosQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();}
}
  1. 发送消息, ⼀次发送20条消息
    @RequestMapping("/qos")public String qos(){System.out.println("qos test ...");//发送普通消息for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos test ..." + i);}return "qos消息发送成功";}
  1. 消费者监听
@Component
public class QOSListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s,deliveryTag: %d \\n",new String(message.getBody()),deliveryTag);//手动签收//channel.basicAck(deliveryTag,true);}}
  1. 测试

调⽤接⼝, 发送消息

发送消息时, 需要先把⼿动确认注掉, 不然会直接消费掉

可以看到,控制台只打印了5条消息

接收到消息: qos test ...0,deliveryTag: 1 
接收到消息: qos test ...1,deliveryTag: 2 
接收到消息: qos test ...2,deliveryTag: 3 
接收到消息: qos test ...3,deliveryTag: 4 
接收到消息: qos test ...4,deliveryTag: 5 

可以看到, ready, 也就是待发送15条, 未确认的5条(因为代码未⼿动ack)

把 prefetch: 5 注掉, 再观察运⾏结果

从⽇志和控制台上可以看到: 消费者会⼀次性把20条消息全部收到

接收到消息: qos test ...0,deliveryTag: 1 
接收到消息: qos test ...1,deliveryTag: 2 
接收到消息: qos test ...2,deliveryTag: 3 
接收到消息: qos test ...3,deliveryTag: 4 
接收到消息: qos test ...4,deliveryTag: 5 
接收到消息: qos test ...5,deliveryTag: 6 
接收到消息: qos test ...6,deliveryTag: 7 
接收到消息: qos test ...7,deliveryTag: 8 
接收到消息: qos test ...8,deliveryTag: 9 
接收到消息: qos test ...9,deliveryTag: 10 
接收到消息: qos test ...10,deliveryTag: 11 
接收到消息: qos test ...11,deliveryTag: 12 
接收到消息: qos test ...12,deliveryTag: 13 
接收到消息: qos test ...13,deliveryTag: 14 
接收到消息: qos test ...14,deliveryTag: 15 
接收到消息: qos test ...15,deliveryTag: 16 
接收到消息: qos test ...16,deliveryTag: 17 
接收到消息: qos test ...17,deliveryTag: 18 
接收到消息: qos test ...18,deliveryTag: 19 
接收到消息: qos test ...19,deliveryTag: 20 

负载均衡

我们也可以⽤此配置,来实现"负载均衡"

比如, 在有两个消费者的情况下,⼀个消费者处理任务⾮常快, 另⼀个⾮常慢,就会造成⼀个消费者会⼀直很忙, ⽽另⼀个消费者很闲. 这是因为 RabbitMQ 只是在消息进⼊队列时分派消息. 它不考虑消费者未确认消息的数量.

我们可以使⽤设置prefetch=1 的⽅式, 告诉 RabbitMQ ⼀次只给⼀个消费者⼀条消息, 也就是说, 在处理并确认前⼀条消息之前, 不要向该消费者发送新消息. 相反, 它会将它分派给下⼀个不忙的消费者.

代码⽰例:

  1. 配置prefetch参数, 设置应答⽅式为⼿动应答
spring:rabbitmq:addresses: amqp://admin:admin@localhost:5672/studylistener:simple:acknowledge-mode: manualprefetch: 1
  1. 启动两个消费者
		@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage1(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("[消费者1]接收到消息: %s,deliveryTag: %d \\n",new String(message.getBody()),deliveryTag);Thread.sleep(1000);//肯定确认channel.basicAck(deliveryTag,true);} catch (InterruptedException e) {//否定确认channel.basicNack(deliveryTag,false,true);}}@RabbitListener(queues = Constants.QOS_QUEUE)public void handMessage2(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("[消费者2]接收到消息: %s,deliveryTag: %d \\n",new String(message.getBody()),deliveryTag);Thread.sleep(2000);//肯定确认channel.basicAck(deliveryTag,true);} catch (InterruptedException e) {//否定确认channel.basicNack(deliveryTag,false,true);}}
  1. 测试

调⽤接⼝, 发送消息,通过⽇志观察两个消费者消费的消息

[消费者2]接收到消息: qos test ...1,deliveryTag: 1 
[消费者1]接收到消息: qos test ...0,deliveryTag: 1 
[消费者1]接收到消息: qos test ...2,deliveryTag: 2 
[消费者2]接收到消息: qos test ...3,deliveryTag: 2 
[消费者1]接收到消息: qos test ...4,deliveryTag: 3 
[消费者1]接收到消息: qos test ...5,deliveryTag: 4 
[消费者2]接收到消息: qos test ...6,deliveryTag: 3 
[消费者1]接收到消息: qos test ...7,deliveryTag: 5 
[消费者1]接收到消息: qos test ...8,deliveryTag: 6 
[消费者2]接收到消息: qos test ...9,deliveryTag: 4 
[消费者1]接收到消息: qos test ...10,deliveryTag: 7 
[消费者1]接收到消息: qos test ...11,deliveryTag: 8 
[消费者2]接收到消息: qos test ...12,deliveryTag: 5 
[消费者1]接收到消息: qos test ...13,deliveryTag: 9 
[消费者1]接收到消息: qos test ...14,deliveryTag: 10 
[消费者2]接收到消息: qos test ...15,deliveryTag: 6 
[消费者1]接收到消息: qos test ...16,deliveryTag: 11 
[消费者1]接收到消息: qos test ...17,deliveryTag: 12 
[消费者2]接收到消息: qos test ...18,deliveryTag: 7 
[消费者1]接收到消息: qos test ...19,deliveryTag: 13 

deliveryTag 有重复是因为两个消费者使⽤的是不同的Channel, 每个 Channel 上的 deliveryTag 是独⽴计数的

http://www.dtcms.com/a/532505.html

相关文章:

  • 克隆网站后台asp.net 网站数据库
  • 零基础新手小白快速了解掌握服务集群与自动化运维(十S四)储存服务-Ceph储存
  • 土壤侵蚀相关
  • 花卉网站建设规划书平台推广计划书模板范文
  • 如何使用C#编写DbContext与数据库连接
  • 从一到无穷大 #52:Lakehouse 不适用时序?打破范式 —— Catalog 架构选型复盘
  • 机器学习 (1) 监督学习
  • 从哪里找网络推广公司网站优化 毕业设计
  • Java如何将数据写入到PDF文件
  • 开发板网络配置
  • 14天备考软考-day1: 计组、操作系统(仅自用)
  • 企业网站模板包含什么有什么软件可以做网站
  • .gitignore 不生效问题——删除错误追踪的文件
  • 深度学习优化器详解
  • 做企业公示的数字证书网站wordpress有识图接口吗
  • 中国商标注册申请官网百度蜘蛛池自动收录seo
  • GitHub 热榜项目 - 日榜(2025-10-26)
  • 数据分析:指标拆解、异动归因类题目
  • 做网站需要那些软件设计建网站
  • Gorm(十二)乐观锁和悲观锁
  • neo4j图数据库笔记
  • 网页网站设计公司有哪些网站排名有什么用
  • 泉州做网站优化哪家好微信推广平台哪里找
  • 如何制作收费网站百度收录个人网站是什么怎么做
  • VsCode + Wsl:终极开发环境搭建指南
  • 深度学习——Logistic回归中的梯度下降法
  • 中国住房和城乡建设网网站学习网站大全
  • 【Android】ViewPager2实现手/自动轮播图
  • 产品营销网站可以做英语翻译兼职的网站
  • jQuery Mobile 图标:全面解析与应用指南