当前位置: 首页 > news >正文

21、MQ常见问题梳理

目录

⼀ 、MQ如何保证消息不丢失

1 、哪些环节可能会丢消息

2 、⽣产者发送消息如何保证不丢失

2.1、⽣产者发送消息确认机制

2.2、Rocket MQ的事务消息机制

2.3 、Broker写⼊数据如何保证不丢失 

2.3.1** ⾸先需要理解操作系统是如何把消息写⼊到磁盘的**。

2.3.2然后来看MQ是如何调⽤fsync的 

2.4 、Broker主从同步如何保证不丢失 

2.5 、消费者消费消息如何不丢失

2.6 、如果MQ服务全部挂了, 如何保证不丢失 

2.7 、MQ消息零丢失⽅案总结

面试题:说说你的项目RocketMQ如何保证消息不丢失? 

⼆ 、MQ如何保证消息的顺序性

三、MQ如何保证消息幂等性 

1 、⽣产者发送消息到服务端如何保持幂等 

2 、消费者消费消息如何保持幂等 

 四、MQ如何快速处理积压的消息

1 、消息积压会有哪些问题。

2 、怎么处理⼤量积压的消息

请问RocketMQ消息积压一般产生原因是什么?如何解决消息积压问题呢?

五、Rocket MQ课程总结


 MQ如何保证消息不丢失

1 、哪些环节可能会丢消息

⾸先分析下MQ的整个消息链路中,有哪些步骤是可能会丢消息的

其中, 1 2 4三个场景都是跨⽹络的,⽽跨⽹络就肯定会有丢消息的可能。

然后关于3这个环节,通常MQ存盘时都会先写⼊操作系统的缓存page cache中,然后再由操作系统异步的将  消息写⼊硬盘 。这个中间有个时间差,就可能会造成消息丢失 。如果服务挂了,缓存中还没有来得及写⼊硬盘 的消息就会丢失。

2 、⽣产者发送消息如何保证不丢失

⽣产者发送消息之所以可能会丢消息,都是因为⽹络 。因为⽹络的不稳定性,容易造成请求丢失 。怎么解决这 样的问题呢?其实—个统—的思路就是⽣产者确 。简单来说,就是⽣产者发出消息后,给⽣产者—个确定的 通知, 这个消息在Broker端是否写⼊完成了 。就好⽐打电话,不确定电话通没通,那就互相说个   具体确认—下 。只不过基于这个同样的思路,各个MQ品有不同的实现⽅式。 

2.1、⽣产者发送消息确认机制

Rocket MQ中,提供了三种不同的发送消息的⽅式:

 异步发送, 不需要Broker确认 。效率很⾼ ,但是会有丢消息的可能。 

// 异步发送, 不需要Broker确认 。效率很⾼ ,但是会有丢消息的可能。
producer.sendOneway(msg);
// 同步发送, ⽣产者等待Broker的确认 。消息最安全 ,但是效率很低。
SendResult sendResult = producer.send(msg, 20 * 1000);
// 异步发送, ⽣产者另起—个线程等待Broker确认, 收到Broker确认后直接触发回调⽅法 。消息安全和效率之间⽐较均 衡 ,但是会加⼤客户端的负担。
producer.send(msg, new SendCallback() { @Overridepublic void onSuccess(SendResult sendResult) {// do something}@Overridepublic void onException(Throwable e) {// do something}
});

与之类似的, Kafka也同样提供了这种同步和异步的发送消息机制。

//直接send发送消息, 返回的是—个Future。这就相当于是异步调⽤ 。
Future<RecordMetadata> future = producer.send(record)
//调⽤future的get⽅法才会实际获取到发送的结果 。⽣产者收到这个结果后, 就可以知道消息是否成功发到broker了。 这个过程就变成了—个同步的过程。
RecordMetadata recordMetadata = producer.send(record).get();

⽽在RabbitMQ中,则是提供了—个Publisher Confirms⽣产者确认机制 。其思路也是Publiser收到Broker的响 应后再出发对应的回调⽅法。

//获取channel
Channel ch = ...;
//添加两个回调, —个处理ack响应, —个处理nack响应
ch.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)

这些各种各样不同API的背后,都是—个统—的思路,就是给⽣产者响应,让⽣产者知道消息有没有发送成 。如果没有发送成功,也由⽣产者⾃⾏进⾏补救 。可以重发,也可以向业务抛异常 。都由⽣产者⾃⾏处理。

2.2Rocket MQ的事务消息机制

Rocket MQ提出了事务消息机制,其实也是保证⽣产者安全发送消息的利器 。事务消息机制的基本流程如下:

其实整体上来看, Rocket MQ的事务消息机制, 还是基于⽣产者确认构建的—种实现机制 。其核⼼思想, 还是  通过Broker主动触发⽣产者的回调⽅法,从⽽确认消息是否成功发送到了Broker 。只不过, 这⾥将—次确认变 成了多次确认 。在多次确认的过程中, 除了确认消息的安全性, 还给了⽣产者反悔的机会 。另外,事务消息 机制进—步将⽣产者确认与⽣产者的本地事务结合到了—起,从⽽让⽣产者确认这个机制有了更多的业务属性。

例如, 以最常⻅的电商订单场景为例,就可以在下完订单后,等到⽤户⽀付的过程中使⽤事务消息机制 。这样 可以保证本地下单和第三⽅⽀付平台⽀付这两个业务是事务性的,要么同时成功,就往下游发订单消息 。要么 就同时失败,不往下游发订单消息。

2.3 Broker写⼊数据如何保证不丢失 

接下来, Producer把消息发送到Broker上了之后, Broker是不是能够保证消息不丢失呢?这⾥也有—个核⼼的问题,那就是PageCache缓存。

数据会优先写⼊到缓存,然后过—段时间再写⼊到磁盘 。但是缓存中的数据有个特点,就是断电即丢失,所 以,如果服务器发⽣⾮正常断电, 内存中的数据还没有写⼊磁盘, 这时就会造成消息丢失。

怎么解决这个问题呢?

2.3.1** ⾸先需要理解操作系统是如何把消息写⼊到磁盘的**

 

Linux为例, ⽤户态的应⽤程序,不管是什么应⽤程序, 想要写⼊磁盘⽂件时,都只能调⽤操作系统提供的 write系统调⽤, 申请写磁盘 。⾄于消息如何经过PageCache再写⼊到磁盘中, 这个过程, 这个过程是在内核  态执⾏的,也就是操作系统⾃⼰执⾏的,应⽤程序⽆法⼲预 。这个过程中,应⽤系统唯—能够⼲预的,就是调 ⽤操作系统提供的sync系统调⽤, 申请—次刷盘操作, 主动将PageCache中的数据写⼊到磁盘。

>> man 2 write
WRITE(2)                                                         Linux Programmer 's
Manual
NAME
write - write to a file descriptor>> man 2 fsync
FSYNC(2)                                                         Linux Programmer 's
Manual
NAME
fsync, fdatasync - synchronize a file 's in-core state with storage device
2.3.2然后来看MQ是如何调⽤fsync 

先来看Rocket MQ

Rocket MQBroker提供了—个很明确的配置项flush DiskType ,可以选择刷盘模式 。有两个可选项, SYNC_FLUSH 同步刷盘和ASYNC_FLUSH 异步刷盘。

所谓同步刷盘, 是指broker每往⽇志⽂件中写⼊—条消息,就调⽤—次刷盘操 。⽽异步刷盘,则是指broker 每隔—个固定的时间,才去调⽤—次刷盘操作 。异步刷盘性能更稳定,但是会有丢消息的可能 。⽽同步刷盘的 消息安全性就更⾼ ,但是操作系统的IO⼒就会⾮常⼤ 

Rocket MQ中,就算是同步刷盘,其实也并不是真的写—次消息就刷盘—次, 这在海量消息的场景下,操作  系统是撑不住的 。所以,我们在之前梳理Rocket MQ核⼼源码的过程中看到, Rocket MQ同步刷盘的实现⽅   式其实也是以10毫秒的间隔去调⽤刷盘操作 。从理论上来说,也还是会有⾮正常断电造成消息丢失的可能,  ⾄严格意义上来说,任何应⽤程序都不可能完全保证断电消息不丢失 。但是, Rocket MQ的这—套同步刷盘机  制,却可以通过绝⼤部分业务场景的验证 。这其实就是—种平衡。

然后来看Kafka:

Kafka中并没有明显的同步刷盘和异步刷盘的区别,不过他暴露了—系列的参数,可以管理刷盘的频率。

flush.ms : 多⻓时间进⾏—次强制刷盘。

log.flush.interval.messages:表示当同—个Partiton的消息条数积累到这个数量时, 就会申请—次刷盘操作 。默 认是Long.MAX

log.flush.interval.ms 当—个消息在内存中保留的时间, 达到这个数量时, 就会申请—次刷盘操作 。他的默认值是  。如果这个参数配置为空 ,则⽣效的是下—个参数。

log.flush.scheduler.interval.ms:检查是否有⽇志⽂件需要进⾏刷盘的频率 。默认也是Long.MAX

其实在这⾥⼤家可以思考下,对kafka来说,把log.flush.interval.messages参数设置成1 ,就是每写⼊—条消 息就调⽤—次刷盘操作, 这不就是所谓的同步刷盘了吗?

最后来看RabbitMQ

关于消息刷盘问题, RabbitMQ官⽹给了更明确的说法 。那就是对于Classic经典对列, 即便声明成了持久化对  列, RabbitMQ的服务端也不会实时调⽤fsync 因此⽆法保证服务端消息断电不丢失 。对于Stream流式对列, 则更加直接, RabbitMQ明确不会主动调⽤fsync进⾏刷盘,⽽是交由操作系统⾃⾏刷盘。

 ⾄于怎么办呢?他明确就建议了,如果对消息安全性有更⾼的要求,可以使⽤Publisher Confirms机制来进— 步保证消息安全 。这其实也是对KafkaRocket MQ同样适⽤的建议。

2.4 Broker主从同步如何保证不丢失 

对于Broker来说,通常Slave的作⽤就是做—个数据备份 。当Broker服务宕机了, 甚⾄是磁盘都坏了时,可以 Slave上获取数据记录 。但是,如果主从同步失败了,那Broker的这—层保证就会失效 。因此, 主从同步 也有可能造成消息的丢失。

我们这⾥重点来讨论—下, Rocket MQ的普通集群以及Dledger⾼可⽤集群。

先来看Rocket MQ的普通集群⽅案,在这种⽅案下,可以指定集群中每个节点的角⾊, 固定的作为Master或者 Slave

在这种集群机制下,消息的安全性还是⽐较⾼的 。但是有—种极端的情况需要考虑 。因为消息需要从Master Slave同步, 这个过程是跨⽹络的, 因此也是有时间延迟的 。所以,如果Master出现⾮正常崩溃,那么就有可   能有—部分数据是已经写⼊到了Master但是还来得及同步到Slave 。这—部分未来得及同步的数据,在Rocket MQ的这种集群机制下,就会—直记录在Master节点上 。等到Master重启后,就可以继续同步了 。另外 由于Slave并不会主动切换成Master ,所以Master服务崩溃后,也不会有新的消息写进来, 此也不会有消息  冲突的问题 。所以, 只要Mater的磁盘没有坏,那么在这种普通集群下, 主从同步通常不会造成消息丢失。

与之形成对⽐的是Kafka的集群机制 。在Kafka集群中,如Leader Partition的服务崩溃了,那么,那些Follower Partition就会选举产⽣—个新的Leadr Partition 。⽽集群中所有的消息,都以Leader Partition的为 。即便旧的Leader Partition重启了,也是作为Follower Partition启动, 主动删除掉⾃⼰的HighWater之后的 数据,然后从新的Leader Partition上重新同步消息 。这样,就会造成那些已经写⼊旧的Leader Partition但是   还没来得及同步的消息,就彻底丢失了。

Rocket MQKafka之间的这种差异,其实还是来⾃于他们处MQ问题的初衷不同 Rocket MQ诞⽣于阿⾥的 ⾦融体系,天⽣对消息的安全性⽐较敏感 。⽽Kafka诞⽣于LinkedIn的⽇志收集体系,天⽣对服务的可⽤性要  求更⾼ 。这也体现了不同产品对业务的取舍。

然后来看下Rocket MQDledger⾼可⽤集群 。在Rocket MQ中, 直接使⽤基于Raft协议的Dledger来保存 CommitLog消息⽇志 。也就是说他的消息会通过DledgerRaft协议,在主从节点之间同步。

⽽关于Raft协议, 之前章节做给分析,他是—种基于两阶段的多数派同意机制 。每个节点会将客户端的治指令 Entry的形式保存到⾃⼰的Log⽇志当中 。此时Entryuncommited状态 。当有多数节点统统保存了Entry后,就可以执⾏Entry中的客户端指令,提交到StateMachine状态机中 。此时Entry更新为commited状态。

他优先保证的是集群内的数据—致性,⽽并不是保证不丢失 。在某些极端场景下, ⽐如出现⽹络分区情况时, 也会丢失—些未经过集群内确认的消息 。不过,基于Rocket MQ的使⽤场景, 这种丢失消息的可能性⾮常⼩  另外, 之前也提到过, 这种服务端⽆法保证消息安全的问题,其实结合客户端的⽣产者确认机制, 是可以得到 ⽐较好的处理的 。因此,在Rocket MQ中使⽤Dledger集群的话,数据主从同步这个过程,数据安全性还是⽐   较⾼的 。基本可以认为不会造成消息丢失。

2.5 、消费者消费消息如何不丢失

最后,消费者消费消息的过程中, 需要从Broker上拉取消息, 这些消息也是跨⽹络的,所以拉取消息的请求也 可能丢失 。这时,会不会有丢消息的可能呢?

⼏乎所有的MQ产品都设置了消费状态确认机制 。也就是消费者处理完消息后, 需要给Broker—个响应,表示 消息被正常处理了 。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息, 还是Consumer  处理完消息后没有给出相应, Broker都会认为消息没有处理成功 。之后, Broker就会向Consumer重复投递这 些没有处理成功的消息 Rocket MQKafka是根据Offset机制重新投递,⽽RabbitMQClassic Queue经典  对列,则是把消息重新⼊队 。因此,正常情况下, Consumer消费消息这个过程, 是不会造成消息丢失的,相 反,可能需要考虑下消息幂等的问题。

但是, 这也并不是说消费者消费消息不可能丢失 。例如像下⾯这种情况, Consumer异步处理消息,就有可能 造成消息丢失。

consumer.registerMessageListener(new MessageListenerConcurrently{ @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {new Thread(){public void run(){//处理业务逻辑System.out.printf("%s Receive New Messages: &s %n", Thread.currentThread()     .getN }};return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });

这⾥需要注意的是,通常在开发过程中,不太会这么直⽩的使⽤多线程异步机制去处理问题 。但是,很有可能 在处理业务时,使⽤—些第三⽅框架来处理消息 。他们是不是使⽤的多线程异步机制,那就不能确定了 。所以,线程并发,在任何业务场景下,都是必不可少的基本功。

2.6 、如果MQ服务全部挂了, 如何保证不丢失 

最后有—种⼩概率的极端情况,就是MQ的服务全部挂掉了, 这时,要如何保证业务能够继续稳定进⾏, 同时 业务数据不会丢失呢?

通常的做法是设计—个降级缓存 ProducerMQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。

此时,再启动—个线程,不断尝试将降级缓存中的数据往MQ中发送 。这样, ⾄少当MQ服务恢复过来后, 这些 消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。

2.7 MQ消息零丢失⽅案总结

最后要注意到, 这⾥讨论到的各种MQ消息防⽌丢失的⽅案,其实都是以增加集群负载, 降低吞吐为代价的。

这必然会造成集群效率下降 。因此, 这些保证消息安全的⽅案通常都需要根据业务场景进⾏灵活取舍,⽽不是 —股脑的直接⽤上。

这⾥希望你能够理解到, 这些消息零丢失⽅案,其实是没有最优解的 。因为如果有最优解,那么这些MQ品,就不需要保留各种各样的设计了 。这和很多⾯试⼋股⽂是有冲突的 。⾯试⼋股⽂强调标准答案,⽽实际业 务中, 这个问题是没有标准答案的,—切,都需要根据业务场景去调整

面试题:说说你的项目RocketMQ如何保证消息不丢失? 


RocketMQ通过多层面的机制来确保消息的可靠性,包括生产者端、broker端和消费者端。
1. 生产者端保证
        a. 同步发送
                同步发送是最可靠的发送方式,它会等待broker的确认响应。
        b. 异步发送 + 重试机制
                异步发送通过回调来处理发送结果,并可以设置重试次数。
2.Broker端保证
        a. 同步刷盘,通过配置broker.conf文件,可以启用同步刷盘:
                brokerRole = SYNC_MASTER 
3. 消费者端保证
        a. 手动提交消费位移,使用手动提交可以确保消息被正确处理后再提交位移。
        b. 幂等性消费,在消费端实现幂等性处理,确保重复消费不会导致业务问题。
通过这些机制的组合,RocketMQ能够在各个环节保证消息的可靠性,极大地降低了消息丢失的风险。在实际应用中,可以根据业务需求选择合适的配置和实现方式,以在可靠性和性能之间取得平衡。

 MQ如何保证消息的顺序性

这⾥⾸先需要明确的是,通常讨论MQ的消息顺序性,其实是在强调局部有序,⽽不是全局有序 。就好⽐QQ 微信的聊天消息,通常只要保证同—个聊天窗⼝内的消息是严格有序的 。⾄于不同窗口之间的消息,顺序出了点偏差,其实是⽆所谓的 。所谓全局有序,通常在业务上没有太多的使⽤场景 。在Rocket MQKafka中把Topic的分区数设置成1 这类强⾏保证消息全局有序的⽅案,纯属思维体操。

那么怎么保证消息局部有序呢?最典型的还是Rocket MQ的顺序消费机制。

这个机制需要两个⽅⾯的保障。

  1. 1 Producer将—组有序的消息写⼊到同—个MessageQueue中。
  2. 2 Consumer每次集中从—个MessageQueue中拿取消息。

Producer端, Rocket MQKafka都提供了分区计算机制,可以让应⽤程序⾃⼰决定消息写⼊到哪—个分 。所以这—块, 是由业务⾃⼰决定的 。只要通过定制数据分⽚算法,把—组局部有序的消息发到同—个对列 当中,就可以通过对列的FI FO特性,保证消息的处理顺序 。对于RabbitMQ ,则可以通过维护ExchangeQueue之间的绑定关系,将这—组局部有序的消息转发到同—个对列中,从⽽保证这—组有序的消息,在 RabbitMQ内部保存时, 是有序的。

Conusmer端, Rocket MQ是通过让Consumer注⼊不同的消息监听器来进⾏区分的 。⽽具体的实现机制,在 之前章节分析过,核⼼是通过对Consumer的消费线程进⾏并发控制,来保证消息的消费顺序的 。类⽐到Kafka  Kafka中并没有这样的并发控制 。⽽实际上, KafkaConsumer对某—个Partition拉取消息时,天⽣就是   单线程的,所以,参照Rocket MQ的顺序消费模型, KafkaConsumer天⽣就是能保证局部顺序消费的。

⾄于RabbitMQ 以他的Classic Queue经典对列为例,他的消息被—个消费者从队列中拉取后,就直接从队列 中把消息删除了 。所以,基本不存在资源竞争的问题 。那就简单的是—个队列只对应—个Consumer ,那就是  能保证顺序消费的 。如果—个队列对应了多个Consumer 同—批消息,可能会进⼊不同的Consumer处理,所以也就没法保证消息的消费顺序

三、MQ如何保证消息幂等性 

1 、⽣产者发送消息到服务端如何保持幂等 

Producer发送消息时,如果采⽤发送者确认的机制,那么Producer发送消息会等待Broker的响应 。如果没有 收到Broker的响应, Producer就会发起重试 。但是, Producer没有收到Broker的响应,也有可能是Broker 经正常处理完了消息, 只不过发给Producer的响应请求丢失了 。这时候Producer再次发起消息重试,就有可能造成消息重复。

Rocket MQ的处理⽅式, 是会在发送消息时,给每条消息分配一个唯一的ID

//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
//for MessageBatch,ID has been set in the generating processif ( !(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}public static void setUniqID(final Message msg) {if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null){msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,createUniqID());}
}

通过这个ID ,就可以判断消息是否重复投递。

⽽对于Kafka ,则会通过他的幂等性配置, 防⽌⽣产者重复投递消息造成的幂等性问题。

Kafka中, 需要打开idempotence幂等性控制后(默认是打开的,但是如果其他配置有冲突,会影响幂等性配 ) Kafka为了保证消息发送的Exactly-once语义,增加了⼏个概念:

  • .  PID:每个新的Producer在初始化的过程中就会被分配—个唯—的PID 。这个PID对⽤户是不可⻅的。
  • .  Sequence Numer: 对于每个PID 这个Producer针对Partition会维护—个sequenceNumber 。这是—个从 0开始单调递增的数字 。当Producer要往同—个Partition发送消息时, 这个Sequence Number就会加1 然后会随着消息—起发往Broker
  • .  Broker端则会针对每个<PID,Partition>维护—个序列号( SN), 只有当对应的SequenceNumber = SN+1 时, Broker才会接收消息, 同时将SN更新为SN+1 否则,SequenceNumber过⼩就认为消息已经写⼊了,不需要再重复写⼊ 。⽽如果SequenceNumber过⼤ ,就会认为中间可能有数据丢失了 对⽣产者就会 抛出—个OutOfOrderSequenceException

 

2 、消费者消费消息如何保持幂等 

这⾥以Rocket MQ来讨论如何防⽌消费者多次重复消费同—条消息。

⾸先,关于消息会如何往消费者投递 Rocket MQ官⽹明确做了回答: 

也就是说,在⼤多数情况下,不需要单独考虑消息重复消费的问题 。但是, 同样, 这个回答⾥也说明了,存在 —些⼩概率情况, 需要单独考虑消费者的消息幂等问题。

⾄于有哪些⼩概率情况呢?最典型的情况就是络出现波动的时候 Rocket MQ是通过消费者的响应机制来推offset的,如果consumerbroker上获取了消息,正常处理之后,他要往broker返回—个响应,但是如果⽹ 络出现波动,consumerbroker上拿取到了消息,但是等到他向broker发响应时,发⽣⽹络波动, 这个响应   丢失了,那么就会造成消息的重复消费 。因为broker没有收到响应,就会向这个Consumer所在的Group重复投递消息。

然后,消费者如何防⽌重复消费呢?

防⽌重复消费, 最主要是要找到—个唯—性的指标 。在Rocket MQ中, Producer发出—条消息后, Rocket MQ 内部会给每—条消息分配—个唯—的messageId 。⽽这个messageIdConsumer中是可以获取到的 。所以⼤ 多数情况下, 这个messageId就是—个很好的唯—性指标 Consumer只要将处理过的messageId记录下来, 就可以判断这条消息之前有没有处理过。

但是同样也有—些特殊情况 。如果Producer是采⽤批量发送,或者是事务消息机制发送,那么这个messageId 就没有那么好控制了 。所以,如果在真实业务中,更建议根据业务场景来确定唯—指标 。例如,在电商下单的  场景,订单ID就是—个很好的带有业务属性的唯—指标 。在使⽤Rocket MQ时,可以使⽤messagekey属性   来传递订单ID 。这样Consumer就能够⽐较好的防⽌重复消费。

最后,对于幂等性问题, 除了要防⽌重复消费外, 还需要防⽌消费丢失 。也就Consumer—直没有正常消费 消息的情况。

Rocket MQ中, 重复投递的消息,会单独放到以消费者组为维度构建的重试对列中 。如果经过多次重试后还 是⽆法被正常消费,那么最终消息会进⼊到消费者组对应的死信对列中 。也就是说,如果Rocket MQ中出现了 死信对列,那么就意味着有—批消费者的逻辑是—直有问题的, 这些消息始终⽆法正常消费 。这时就需要针对 死信对列, 单独维护—个消费者,对这些错误的业务消息进⾏补充处理 。这⾥需要注意—下的是, Rocket MQ  中的死信对列,默认权限是⽆法消费的, 需要⼿动调整权限才能正常消费。

 四、MQ如何快速处理积压的消息

1 、消息积压会有哪些问题。

Rocket MQKafka来说,他们的消息积压能⼒本来就是很强的, 因此,短时间的消息积压, 是没有太多问题  。但是需要注意,如果消息积压问题—直得不到解决, Rocket MQKafka在⽇志⽂件过期后,就会直接删除 过期的⽇志⽂件 。⽽这些⽇志⽂件上未消费的消息,就会直接丢失。

⽽对RabbitMQ来说, Classic Queue经典对列和Quorum Queue仲裁对列,如果有⼤量消息积压,未被消费,就会严重影响服务端的性能, 因此需要重点关注 。⽽⾄于Stream Queue流式对列,整体的处理机制已经 Rocket MQKafka ⽐较相似了,对消息积压的承受能⼒就会⽐较强 。但是还是需要注意和Rocket MQKafka相同的问题。

2 、怎么处理⼤量积压的消息

产⽣消息积压的根本原因还是Consumer处理消息的效率太低,所以最核⼼的⽬标还是要提升Consumer消费消息的效率 。如果不能从业务上提升Consumer消费消息的性能,那么最直接的办法就是针对处理消息⽐较慢 的消费者组,增加更多的Consumer实例 。但是这⾥需要注意—下,增加Consumer实例是不是会有上限。

对于RabbitMQ ,如果是Classic Queue经典对列,那么针对同—个Queue的多个消费者, 是按照Work Queue 的模式,在多个Consuemr之间依次分配消息的 。所以这时,如Consumer消费能⼒不够,那么直接加更多  Consumer实例就可以了 。这⾥需要注意下的是如果各个Consumer实例他们的运⾏环境,或者是处理消息  的速度有差别 。那么可以优化—下每个Consumer的⽐重(Qos属性) ,从⽽尽量⼤的发挥Consumer实例的性能。

⽽对于Rocket MQ 因为同—个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建⽴对应关   系,⽽—个MessageQueue最多只能被—个Consumer消费, 因此,增加的Consumer实例最多也只能和Topic 下的MessageQueue个数相同 。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有  MessageQueue去消费的, 因此也就没有⽤了。

这时,如果Topic下的MessageQueue配置本来就不够多的话,那就⽆法—直增加Consumer节点个数了 。这时 怎么处理呢?如果要快速处理积压的消息,可以创建—个新的Topic ,配置⾜够多的MessageQueue 。然后把   Consumer实例的Topic转向新的Topic ,并紧急上线—组新的消费者, 只负责消费旧Topic的消息,并转存到 新的Topic 。这个速度明显会⽐普通Consumer处理业务逻辑要快很多 。然后在新的Topic上,就可以通过添   加消费者个数来提⾼消费速度了 。之后再根据情况考虑是否要恢复成正常情况。

其实这种思路和Rocket MQ内部很多特殊机制的处理⽅式是—样的 。例如固定级别的延迟消息机制,也是 把消息临时转到—个系统内部的Topic下,处理过后,再转回来。 

⾄于Kafka ,也可以采⽤和Rocket MQ相似的处理⽅式。

请问RocketMQ消息积压一般产生原因是什么?如何解决消息积压问题呢?


一般消息出现堆积原因有:


● 消费者消息处理逻辑异常,导致消息无法正常消费。
● 消息生产应用出现突发流量,消息生产速度远大于消费速度,消息来不及消费出现堆积。
● 消费者依赖的下游服务耗时变长,消费线程阻塞等。
● 消费线程不够,消费并发度较小,消费速度跟不上生产速度。
 
解决方案有:
(1)确认消息的消费耗时是否合理,通过打印堆栈日志信息分析如果是消费耗时较长,可以参考出来解决方案:
        1. 分析和优化业务逻辑
                ● 简化逻辑:仔细分析业务逻辑,去除不必要的步骤和复杂计算。
                ● 分解任务:将复杂的任务分解为多个简单的子任务,逐步处理。
                ● 异步处理:对于不需要立即完成的任务,考虑使用异步处理,将其放到后台执行。
        2. 使用并行和并发技术
                ● 多线程处理:在消费者内部使用多线程来并行处理消息。
                ● 批量处理:如果业务允许,合并多条消息进行批量处理,减少处理次数。
        3. 优化I/O操作
                ● 数据库优化:优化数据库查询,使用索引、减少查询次数或使用批量操作。
                ● 缓存使用:对于频繁访问的数据,使用缓存来减少数据库或外部服务的访问次数。
                ● 网络优化:减少网络请求的次数和延迟,使用更高效的协议或配置
(2)如果消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。
(3)设置消息过期时间
在消息发送时设置TTL,消息在队列中超过一定时间后自动过期并被丢弃。这样可以确保系统不会处理过期的消息。这个要看具体的业务场景。

五、Rocket MQ课程总结

所有的MQ ,其实处理的都是—类相似的问题 。但是, 互联⽹却诞⽣了不下⼏⼗种MQ的产品 。为什么都做着差 不多的功能,但是却有这么多不同的产品呢?这就需要对MQ的业务场景进⾏逐步的深度挖掘 。把业务问题理   解得越深刻,那么对这些不同产品的理解才会更深刻, ⽇后处理各种各样的业务问题,也才会有更多的可选⽅  案,或者,换种说法,就是经验 。这才是程序员最⼤的价值。

http://www.dtcms.com/a/265824.html

相关文章:

  • 映射阿里云OSS(对象存储服务)
  • [创业之路-467]:企业经营层 - 《营销管理》的主要内容、核心思想以及对创业者的启示
  • 【Spring boot】tomcat Jetty Undertow对比,以及应用场景
  • Qt 事件
  • 医科+AI!和鲸支持南京医科大学医学数据挖掘课程实践教学落地
  • CCLinkIE转EtherCAT:食品产线测厚仪的“精准心跳”如何跳动?
  • 重学React(二):添加交互
  • 运维服务部中级服务工程师面试试题
  • 【Spring篇09】:制作自己的spring-boot-starter依赖1
  • 服务器如何配置防火墙规则开放/关闭端口?
  • ROS2---话题重映射
  • 能生成二维码的浏览器插件来了
  • 模型训练复习
  • RabbitMQ 高级特性之发送方确认
  • 12、jvm运行期优化
  • .Net Core 中RabbitMQ基本使用
  • [自然语言处理]计算语言的熵
  • 【Python办公】Excel转CSV文件(可指定拆分行数\可批量或单个)
  • 用C#编写一个读取磁盘第一扇区的程序
  • 架空线路云台监控系统应对线路故障的智能化解决方案
  • 深度学习中的逻辑回归:从原理到Python实现
  • leetcode:1049. 最后一块石头的重量 II[01背包][动态规划]
  • 实际开发如何快速定位和解决死锁?
  • PM2.5和PM10分别是什么
  • 基于MATLAB的风力发电机无人机巡检路径优化研究
  • 最新PDF转markdown软件MonkeyOCR整合包,文档图片解析工具
  • 深度解析:Java内部类与外部类的交互机制
  • odoo-057 pgadmin 登录忘记密码
  • 【实时Linux实战系列】实时以太网与 TSN 基础
  • ARM单片机启动流程(二)(详细解析)