中间件面试题
引用,https://blog.csdn.net/ThinkWon/article/details/120928777 ,做了补充。
RabbitMQ
核心概念
Producer / Consumer:生产者与消费者模型
Queue:消息队列,存储消息
Exchange:交换机,决定消息如何路由到队列
Binding:队列与交换机的绑定关系
Routing Key / Binding Key:消息路由规则
消息确认机制:自动 / 手动 ACK
消息持久化:防止消息丢失
MQ 的常见问题有:
-
消息的顺序问题
-
消息的重复问题
-
消息的可靠传输
-
消息的顺序问题
-
消息的重复问题
-
消息的可靠传输
消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,生产者可以进行重试操作。
发送方确认机制
先说配置和使用:
配置文件
spring:rabbitmq:publisher-confirm-type: correlated # 开启发送方确认机制
配置属性有三种分别为:
none:表示禁用发送方确认机制correlated:表示开启发送方确认机制simple:表示开启发送方确认机制,并支持waitForConfirms()和waitForConfirmsOrDie()的调用。
这里一般使用 correlated 开启发送方确认机制即可,至于 simple 的 waitForConfirms() 方法调用是指串行确认方法,即生产者发送消息后,调用该方法等待 RabbitMQ Server 确认,如果返回 false 或超时未返回则进行消息重传。由于串行性能较差,这里一般都是用异步 confirm 模式。
通过调用 setConfirmCallback() 实现异步 confirm 模式感知消息发送结果
/*** 消息业务实现类** @author 单程车票*/
@Service
public class RabbitMQServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage() {// 发送消息rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);// 设置消息确认回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** MQ确认回调方法* @param correlationData 消息的唯一标识* @param ack 消息是否成功收到* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 记录日志log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");if (!ack) {// 出错处理...}}});}
}
3. 保证消息在 RabbitMQ Server 中的持久化
对于消息的持久化,只需要在发送消息时将消息持久化,并且在创建交换机和队列时也保证持久化即可。
配置如下:
/*** 消息队列*/
@Bean
public Queue queue() {// 四个参数:name(队列名)、durable(持久化)、 exclusive(独占)、autoDelete(自动删除)return new Queue(MESSAGE_QUEUE, true);
}/*** 直接交换机*/
@Bean
public DirectExchange exchange() {// 四个参数:name(交换机名)、durable(持久化)、autoDelete(自动删除)、arguments(额外参数)return new DirectExchange(Direct_Exchange, true, false);
}
在创建交换机和队列时通过构造方法将持久化的参数都设置为 true 即可实现交换机和队列的持久化。
@Override
public void sendMessage() {// 构造消息(将消息持久化)Message message = MessageBuilder.withBody("单程车票".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 向MQ发送消息(消息内容都为消息表记录的id)rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}
在发送消息前通过调用 MessageBuilder 的 setDeliveryMode(MessageDeliveryMode.PERSISTENT) 在构造消息时设置消息持久化(MessageDeliveryMode.PERSISTENT)即可实现对消息的持久化。
生产者发送消息后通过调用 setConfirmCallback() 可以将信道设置为 confirm 模式,所有消息会被指派一个消息唯一标识,当消息被发送到 RabbitMQ Server 后,Server 确认消息后生产者会回调设置的方法,从而实现生产者可以感知到消息是否正确无误的投递,从而实现发送方确认机制。并且该模式是异步的,发送消息的吞吐量会得到很大提升。
上面就是发送放确认机制的配置和使用,使用这种机制可以保证生产者的消息可靠性投递,并且性能较好。
消息队列丢数据:消息持久化。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。
这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?
这里顺便说一下吧,其实也很容易,就下面两步
-
将queue的持久化标识durable设置为true,则代表是一个持久的队列
-
发送消息的时候将deliveryMode=2
这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
4. 保证消费者消费的消息不丢失
在保证发送方和 RabbitMQ Server 的消息可靠性的前提下,只需要保证消费者在消费消息时异常消息不丢失即可保证消息的可靠性。
RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息,默认情况下,消费者应答机制是自动应答的,也就是RabbitMQ 将消息推送给消费者,便会从队列删除该消息,如果消费者在消费过程失败时,消息就存在丢失的情况。所以需要将消费者应答机制设置为手动应答,只有消费者确认消费成功后才会删除消息,从而避免消息的丢失。
下面来看看如何配置消费者手动应答:
spring:rabbitmq:publisher-confirm-type: correlated # 开启发送方确认机制publisher-returns: true # 开启消息返回template:mandatory: true # 消息投递失败返回客户端listener:simple:acknowledge-mode: manual # 开启手动确认消费机制
通过 listener.simple.acknowledge-mode = manual 即可将消费者应答机制设置为手动应答。
之后只需要在消费消息时,通过调用 channel.basicAck() 与 channel.basicNack() 来根据业务的执行成功选择是手动确认消费还是手动丢弃消息。
/*** 监听消费队列的消息*/
@RabbitListener(queues = RabbitMQConfig.MESSAGE_QUEUE)
public void onMessage(Message message, Channel channel) {// 获取消息索引long index = message.getMessageProperties().getDeliveryTag();// 解析消息byte[] body = message.getBody();...try {// 业务处理...// 业务执行成功则手动确认channel.basicAck(index, false);}catch (Exception e) {// 记录日志log.info("出现异常:{}", e.getMessage());try {// 手动丢弃信息channel.basicNack(index, false, false);} catch (IOException ex) {log.info("丢弃消息异常");}}
}
这里说明一下 basicAck() 与 basicNack() 的参数说明:
void basicAck(long deliveryTag, boolean multiple) 方法(会抛异常):
- deliveryTag:该消息的index
- multiple:是否批量处理(true 表示将一次性ack所有小于deliveryTag的消息)
void basicNack(long deliveryTag, boolean multiple, boolean requeue) 方法(会抛异常):
- deliveryTag:该消息的index
- multiple:是否批量处理(true 表示将一次性ack所有小于deliveryTag的消息)
- requeue:被拒绝的是否重新入队列(true 表示添加在队列的末端;false 表示丢弃)
通过设置手动确认消费者应答机制即可保证消费者在消费信息时的消息可靠性。
