Rabbit 实战指南-学习笔记
第 4 章 RabbitMQ 进阶
mandatory 参数
当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者,通过调用channel.addReturnListener 来添加 ReturnListener 监听器实现。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText,String exchange, String routingKey,AMQP.BasicProperties basicProperties,byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.Return返回的结果是:" + message);}});
备份交换器
// 创建备份交换机、队列、声明绑定关系String myAeExchange = "myAe";channel.exchangeDeclare(myAeExchange, "fanout", true, false, null);channel.queueDeclare("unroutedQueue", true, false, false, null);channel.queueBind("unroutedQueue", myAeExchange, "");// 创建普通交换机, 并设置 alternate-exchange,队列、声明绑定关系Map<String, Object> args = new HashMap<String, Object>();args.put("alternate-exchange", myAeExchange);channel.exchangeDeclare("normalExchange", "direct", true, false, args);channel.queueDeclare("normalQueue", true, false, false, null);channel.queueBind("normalQueue", "normalExchange", "normalKey");
如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。
4.2过期时间 TTL
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。channel.queueDeclare 方法中加入x-message-ttl 参数实现的,这个参数的单位是毫秒。
第二种方法是对消息本身进行单独设置,
每条消息的TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(这点不是绝对的,可以参考 4.3 节)。
// 声明带TTL的队列Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-message-ttl", 30000);channel.queueDeclare("ttl.queue", true, false, false, queueArgs);// 发送带TTL属性的消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("5000").build();channel.basicPublish("", "ttl.queue", props, "Test TTL Message".getBytes())
4.3 死信&延时队列
DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX 的队列就称之为死信队列。
出现的场景
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;
- 消息过期;
- 队列达到最大长度。
// 创建一个持久化、非排他的、非自动删除的队列channel.exchangeDeclare("exchange.dlx", "direct", true);channel.queueDeclare("queue.dlx", true, false, false, null);channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");// 创建一个不用队列 并设置 TTL 以及 DLX、DLXroutingkeychannel.exchangeDeclare("exchange.normal", "fanout", true);Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000);args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare("queue.normal", true, false, false, args);channel.queueBind("queue.normal", "exchange.normal", "");channel.basicPublish("exchange.normal", "rk",MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
由Web 管理页面(图4-3)可以看出,两个队列都被标记了“D”,这个是durable 的缩写,即设置了队列持久化。queue.normal 这个队列还配置了 TTL、DLX 和 DLK,其中 DLX 指的是x-dead-letter-routing-key 这个属性。
在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。
4.7 持久化
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
deliveryMode(2) // 设置为持久化
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。
首先从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为 false,并进行手动确认,详细可以参考3.5 节。
其次,在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘(调用内核的fsync1方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内
RabbitMQ 服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将丢失。RabbitMQ 在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha 状态的消息数量大于此值时,就会引起消息的状态转换会丢失。 这里可以引入 RabbitMQ 的镜像队列机制(详细参考 9.4 节)
4.8 生产者确认
生产者如何知道消息有没有正确地到达服务器
4.8.1 事务机制
- 通过事务机制实现;
- 通过发送方确认(publisher confirm)机制实现。
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit();
} catch (Exception e) { e.printStackTrace(); channel.txRollback();
}
4.8.2 发送方确认机制
采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,这里就引入了一种轻量级的方式——发送方确认(publisher confirm)机制。
生产者通过调用 channel.confirmSelect 方法(即 Confirm.Select 命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消
息既被 ack 又被 nack 的情况,并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证
try {channel.confirmSelect();//将信道置为publisher confirm模式//之后正常发送消息channel.basicPublish("exchange", "routingKey", null,"publisher confirm test".getBytes());if (!channel.waitForConfirms()) {// 当消息发送不成功时候进入 if 代码块 do something else....System.out.println("send message failed");}} catch (InterruptedException e) {e.printStackTrace();}
注意要点:
(1)事务机制和 publisher confirm 机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设置为 publisher confirm 模式,RabbitMQ 会报错:{amqp_error, precondition_ failed, "cannot switch from tx to confirm mode", 'confirm.select'};或者如果企图将已开启 publisher confirm 模式的信道再设置为事务模式,RabbitMQ 也会报错:{amqp_error, precondition_failed, "cannot switch from confirm to tx
mode", 'tx.select' }。
(2)事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。更进一步地讲,发送方要配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
publisher confirm 的优势在于并不一定需要同步确认。这里我们改进了一下使用方式,总结有如下两种:
- 批量 confirm 方法:每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回。
相比于前面示例中的普通 confirm 方法,批量极大地提升了 confirm 的效率,但是问题在于出现返回 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能应该是不升反降的。
channel.confirmSelect(); // 开启确认模式
for(int i=0; i<100; i++){channel.basicPublish("", "queue", null, message.getBytes());
}
// 批量确认所有未确认消息
channel.waitForConfirmsOrDie(5000); // 超时5秒// 缺点:简单但会阻塞生产者线程,批量失败需重发全部消息
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
channel.addConfirmListener((sequenceNumber, multiple) -> {if(multiple) {outstandingConfirms.headMap(sequenceNumber, true).clear();} else {outstandingConfirms.remove(sequenceNumber);}
}, (sequenceNumber, multiple) -> {// NACK处理逻辑
});// 批量发送100条消息for (int i = 0; i < 100; i++) {String message = "Msg-" + i;long seqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(seqNo, message);channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}
4.9 消费端要点介绍
消费者客户端可以通过推模式或者拉模式(推荐方式)的方式来获取并消费消息,当消费者处理完业务逻辑需要手动确认消息已被接收,这样 RabbitMQ才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。
- 消息分发;
- 消息顺序性;
- 弃用 QueueingConsumer。
消息顺序
在RabbitMQ中保证消息顺序性需结合队列特性和业务设计,以下是核心方案:
一、基础保障机制
单队列单消费者模式
- 利用队列FIFO特性,仅允许一个消费者处理队列,避免并发消费导致乱序
- 缺点:吞吐量受限,需配合消息持久化和手动ACK确保可靠性
分区消费策略
- 通过路由键将关联消息(如相同订单ID)固定路由到同一队列,每个队列对应独立消费者
- 示例:使用Direct交换机按业务ID路由,实现"局部顺序性"
二、增强控制手
消息序列化标记
- 在消息体中嵌入序列号,消费者端通过缓存排序实现逻辑顺序控制
- 需配合幂等处理避免重复消息干扰
单活消费者模式
- 通过
x-single-active-consumer
参数确保队列同一时间仅有一个活跃消费者,故障时自动切换
// Spring AMQP配置示例 Map<String, Object> args = new HashMap<>(); args.put("x-single-active-consumer", true); new Queue("seq_queue", true, false, false, args);
- 通过
三、高级方案
事务与发布确认
- 生产者启用事务或发布确认机制,确保消息按发送顺序持久化到队列
- 事务适用于批量消息,确认机制适合单条消息
死信队列重试
- 对处理失败的消息进入死信队列延时重试,避免立即重入破坏顺序
4.10 消息传输保障
第九章 RabbitMQ 高阶
9.1 存储机制
不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。这两种类型的消息的落盘处理都在RabbitMQ 的“持久层”中完成。
- 持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。
- 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。
持久层
- 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者ack 等。每个队列都有与之对应的一个rabbit_queue_index
- 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。从技术层面上来说,rabbit_msg_store 具体还可以分为msg_store_persistent 和 msg_store_transient
- msg_store_persistent 负责持久化消息的持久化,重启后消息不会丢失;
- msg_store_transient 负责非持久化消息的持久化,重启后消息会丢失。
结构查看 /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1
队列的结构
- rabbit_amqqueue_process 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack)等。
- backing_queue 是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
队列消息状态
- alpha:消息内容(包括消息体、属性和 headers)和消息索引都存储在内存中。 (alpha状态最耗内存,但很少消耗CPU)
- beta:消息内容保存在磁盘中,消息索引保存在内存中(只需要一次I/O 操作就可以读取到消息(从 rabbit_msg_store 中))。
- gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有(只需要一次I/O 操作就可以读取到消息(从 rabbit_msg_store 中))。
- delta:消息内容和索引都在磁盘中。(状态基本不消耗内存,但是需要消耗更多的 CPU 和磁盘 I/O 操作,delta 状态需要执行两次I/O 操作才能读取到消息,一次是读消息索引(从 rabbit_queue_index 中),一次是读消息
内容(从 rabbit_msg_store 中))
- channel.txSelect 用于将当前的信道设置成事务模式,
- 发布消息给 RabbitMQ ,
- channel.txCommit 用于提交事务,
- channel.txRollback 用于事务回滚。
- 异步 confirm 方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理。