【RabbitMQ】高级特性—发送方确认详解
文章目录
- 发送方确认
- confirm 确认模式
- 1. 配置 RabbitMQ
- 2. 设置确认回调逻辑并发送消息
- 3. 测试
- 完整代码
- return 退回模式
- 1. 配置 RabbitMQ
- 2. 设置返回回调逻辑并发送消息
- 3. 测试
- 常见面试题
- 如何保证 RabbitMQ 消息的可靠传输
发送方确认
在使用 RabbitMQ
的时候,可以通过消息持久化来解决因为服务器的一次崩溃而导致的消息丢失
但是还有一个问题,当消息的生产者将消息发送初期之后,消息到底有没有正确地到达服务器呢?
- 如果在消息到达服务器之前已经丢失(比如
RabbitMQ
重启,那么RabbitMQ
重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根被没有到达服务器,何谈持久化?
RabbitMQ
为我们提供了两种解决方案:
- 通过事务机制实现
- 通过发送方确认(
publisher confirm
)机制实现
事务机制比较消耗性能,在实际工作中使用也不多,我们主要极少 confirm
机制来实现发送方的确认。
RabbitMQ
为我们提供了两个方式来控制消息的可靠性投递
confirm
确认模式return
返回模式
confirm 确认模式
Producer
在发送消息的时候,对发送端设置一个 ConfirmCallback
的监听,无论消息是否到达 Exchange
,这个监听都会被执行
- 如果
Exchange
成功收到,ACK
(Acknowledge character
,确认字符)为true
- 如果没有收到消息,
ACK
就为false
步骤如下:
- 配置
RabbitMQ
- 设置确认回调逻辑并发送消息
- 测试
接下来看实现步骤:
1. 配置 RabbitMQ
配置确认机制
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认
2. 设置确认回调逻辑并发送消息
无论消息确认成功还是失败,都会调用 ConfirmCallback
的 confirm
方法。
- 如果消息成功发送到
Broker
,ack
为true
- 如果消息发送失败,
ack
为false
,并且cause
提供失败的原因
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(""); if (ack) { System.out.printf("消息接收成功,id: %s \n", correlationData.getId()); } else { System.out.printf("消息接收失败,id: %s \n", correlationData.getId()); } } }); return rabbitTemplate;
}
ConfirmCallback()
里面的confirm(@Nullable CorrelationData, boolean ack, @Nullables String cause)
- 确认回调
correlationDate
:发送消息时的附加信息,通常用于在确认回调中识别特定的消息ack
:交换机是否收到消息,收到为true
,未收到为false
cause
:当消息确认失败时,这个字符串参数将提供失败的原因,这个原因可以用于调试和错误处理。成功时,cause
为null
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")
public String confirm() { CorrelationData correlationData = new CorrelationData("1"); confirmRabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "confirm", "confirm test...", correlationData); return "确认成功";
}
RabbitTemplate.ConfirmCallback
和ConfirmListener
区别
在RabbitMQ
中,ConfirmListener
和ConfirmCallback
都是用来处理消息确认的机制,但他们属于不同的客户端库,并且使用的场景和方式有所不同
ConfirmListener
是RabbitMQ
Java Client
库中的接口。这个库是RabbitMQ
官方提供的一个直接于RabbitMQ
服务器交互的客户端库。ConfirmListener
接口提供了两个方法:handleAck
和handleNack
,用于处理消息确认好否定确认的事件ConfirmCallback
是Spring AMQP
框架中的一个接口。专门为Spring
环境设计,用于简化与RabbitMQ
交互的过程,它只包含一个confirm
方法,用于处理消息确认的回调
在SpringBoot
应用中,通常会使用ConfirmCallback
,因为它与Spring
框架的其他部分更加整合,可以利用Spring
的配置和依赖注入功能。而在RabbitMQ
Java Client
库时,则可能会直接实现ConfirmListener
接口,更直接的与RabbitMQ
的Channel
交互
3. 测试
运行程序,调用接口: http://127.0.0.1:8080/producer/confirm
观察控制台,消息确认成功:
接下来把交换机名称改一下,重新运行,会触发另一个结果
// 发送失败
confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData);
运行结果:
- 原因中,明确显示 “
no exchange 'confirm_exchange1' in vhost 'bite'
“,也就是说,bite
这个虚拟机,没有名字为confirm_exchange1
的交换机
完整代码
public class Constant { public static final String ACK_EXCHANGE_NAME = "ack_exchange"; public static final String ACK_QUEUE = "ack_queue";
}
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.printf("");if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());} else {System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {//publish-confirm模式//1. 交换机@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.topicExchange(Constant.CONFIRM_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
package com.bite.rabbitmq.controller;import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm", "confirm test...", correlationData1);//发送失败// confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData1);Thread.sleep(2000);return "确认成功";}
}
return 退回模式
消息到达 Exchange
之后,会根据路由规则匹配,把消息放入 Queue
中,Exchange
到 Queue
的过程:
- 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者
- 消息返回发送者时,我们可以设置一个返回回调方法,对消息进行处理
步骤如下:
- 配置
RabbitMQ
- 设置返回回调逻辑并发送信息
- 测试
接下来看实现步骤
1. 配置 RabbitMQ
# 配置确认机制
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认
2. 设置返回回调逻辑并发送消息
- 消息无法被路由到人恶化队列,它将返回给发送者,这时
setReturnCallback
设置的回调将被触发
/** * retrun 返回模式 */
@Primary
@Bean("confirmRabbitTemplate2")
public RabbitTemplate confirmRabbitTemplate2(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.printf("消息被退回:%s", returnedMessage); } }); return rabbitTemplate;
}
@Resource(name = "confirmRabbitTemplate2")
private RabbitTemplate confirmRabbitTemplate2;@RequestMapping("/msgReturn")
public String msgReturn() { CorrelationData correlationData = new CorrelationData("2"); confirmRabbitTemplate2.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm1", "message return test...", correlationData); return "消息发送成功";
}
- 使用
RabbitTemplate
的setMandatory
方法设置消息的mandatory
属性为true
(默认为false
) - 这个属性的作用是告诉
RabbitMQ
,如果一条消息无法被任何队列消费,RabbitMQ
应该将消息返回给发送者,此时ReturnCallback
就会被触发
回调函数中,有一个参数:ReturnedMessage
,包含以下属性
public class ReturnedMessage { // 返回的消息对象,包含了消息体和消息属性private final Message message; // 由 Broker 提供的回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义private final int replyCode;// 一个文本字符串,提供了无法路由消息的额外信息或错误描述 private final String replyText; // 消息被发送到的交换机名称private final String exchange;// 消息的路由键,即发送消息时指定的键 private final String routingKey;
}
3. 测试
运行程序,调用接口: http://127.0.0.1:8080/producer/msgReturn
观察控制台,消息被退回:
常见面试题
#高频面试
如何保证 RabbitMQ 消息的可靠传输
先放一张 RabbitMQ
消息传递图
从这个图中,可以看出,消息可能丢失的场景以及解决方案:
-
生产者将消息发送到
RabbitMQ
失败- 可能原因:网络问题等
- 解决办法:参考——发送方确认,
confirm
模式
-
消息在交换机中无法路由到指定队列:
- 可能原因:代码或配置层面错误,导致消息路由失败
- 解决办法:参考——发送方确认,
return
模式
-
消息队列自身数据丢失
- 可能原因:消息到达
RabbitMQ
之后,RabbitMQ Server
宕机导致消息丢失 - 解决办法:参考——持久性。开启
RabbitMQ
持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ
挂了,回复之后会自动读取之前存储的数据(极端情况下,RabbitMQ
还未持久化就挂了,可能导致少量的数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)
- 可能原因:消息到达
-
消息者异常,导致消息丢失
- 可能原因:消息到达消费者,还没来得及消费,消费者宕机、消费者逻辑有问题
- 解决办法:参考——消息确认。
RabbitMQ
提供了消费者应答机制来使RabbitMQ
能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动答应的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性