rocketmq 拉取消息
理解netty的回调
对于都在内存里的回调, 是所有的调用栈持有回调对象,所以在当前执行节点执行完成时,可以直接从内存中拿到回调对象,直接回调即可
针对于网络编程,netty, 需要跨机器经历网络传输,所以不可能调用栈里持有回调对象,所以会额外使用map记录回调对象future, 方便在回调时, 通过requestid找到对应的回调对象,进行回调方法的调用;
所以发起调用的是 channalWrite(), 而netty发起回调的又是另外的方法,channelRecieve()方法。
要把来个方法联系起来看;
如果遇到线程start()方法, 不要看start, 而要直接取看run()方法
获取消息的方式
消费者并不是每次要消费一条数据就向Broker获取一条数据的,这样RPC的开销太大了,因此先从Broker获取一批数据到内存中,再进行消费
消费端获取消息通常有三种方式:推送消息、拉取消息、长轮询(推拉结合)
推送消息:消息持久化到Broker后,Broker监听到有新消息,主动将消息推送到对应的消费者
Broker主动推送消息具有很好的实时性,但如果消费端没有流控,推送大量消息时会增加消费端压力,导致消息堆积、吞吐量、性能下降
拉取消息:消费端可以根据自身的能力主动向Broker拉取适量的消息,但不好预估拉取消息的频率,拉取太慢会导致实时性差,拉取太快可能导致压力大、消息堆积
长轮询:在拉取消息的基础上进行改进,如果在broker没拉取到消息,则会等待一段时间,直到消息到达或超时再触发拉取消息
长轮询相当于在拉取消息的同时,通过监听消息到达,增加推送的优点,将拉取、推送的优点结合,但长连接会更占资源,大量长连接会导致开销大
RocketMQ中常用的消费者DefaultMQPushConsumer,虽然从名字看是“推送”的方式,但获取消息用的是长轮询的方式
这种特殊的拉取消息方式能到达实时推送的效果,并在消费者端做好流控(拉取消息达到阈值就延时拉取)以防压力过大
拉取消息原理
点进去start方法看,我们自定义的listener被注册入 private MessageListener messageListener;,等待拉到消息后被回调(在大的交互流程中,我们的消费是被回调的一部分)
主要涉及两个类 DefaultMQPushConsumer的内部实现DefaultMQPushConsumerImpl有一个MQ客户端实例MQClientInstance 它内部包含的PullMessageService组件,就是用于长轮询拉取消息的
拉取简化的流程为:
从队列取出PullRequest,然后封装请求向Broker异步发送
响应后通过回调将查到的消息放入其内存队列中,方便后续消费
在此期间最终都会将PullRequest放回队列(失败可能延时放回),便于下次拉取该队列的消息
如下图
进入PullMessageService的线程类run方法
-
pullMessage最终会调用DefaultMQPushConsumerImpl.pullMessage,代码虽然很多,但主要流程为校验、获取参数、调用核心方法
-
进行参数、状态、流控的校验,如果失败会调用executePullRequestLater后续延时50ms将拉取请求重新放回队列中,也就是后续再进行该队列的消息拉取
-
如果是第一次执行,要获取消费进度的偏移量computePullFromWhereWithException,后续使用PullRequest上的nextOffset(集群模式向Broker获取)
-
获取消费端相关信息(后续会封装成请求),创建回调,回调在RPC后调用
-
执行拉取消息的核心方法 pullKernelImpl
public void pullMessage(final PullRequest pullRequest) {//获取内存队列final ProcessQueue processQueue <