当前位置: 首页 > news >正文

rocketmq 拉取消息

理解netty的回调

对于都在内存里的回调, 是所有的调用栈持有回调对象,所以在当前执行节点执行完成时,可以直接从内存中拿到回调对象,直接回调即可

针对于网络编程,netty, 需要跨机器经历网络传输,所以不可能调用栈里持有回调对象,所以会额外使用map记录回调对象future, 方便在回调时, 通过requestid找到对应的回调对象,进行回调方法的调用;
所以发起调用的是 channalWrite(), 而netty发起回调的又是另外的方法,channelRecieve()方法。
要把来个方法联系起来看;

如果遇到线程start()方法, 不要看start, 而要直接取看run()方法

获取消息的方式

消费者并不是每次要消费一条数据就向Broker获取一条数据的,这样RPC的开销太大了,因此先从Broker获取一批数据到内存中,再进行消费

消费端获取消息通常有三种方式:推送消息、拉取消息、长轮询(推拉结合)

推送消息:消息持久化到Broker后,Broker监听到有新消息,主动将消息推送到对应的消费者

Broker主动推送消息具有很好的实时性,但如果消费端没有流控,推送大量消息时会增加消费端压力,导致消息堆积、吞吐量、性能下降

拉取消息:消费端可以根据自身的能力主动向Broker拉取适量的消息,但不好预估拉取消息的频率,拉取太慢会导致实时性差,拉取太快可能导致压力大、消息堆积

长轮询:在拉取消息的基础上进行改进,如果在broker没拉取到消息,则会等待一段时间,直到消息到达或超时再触发拉取消息

长轮询相当于在拉取消息的同时,通过监听消息到达,增加推送的优点,将拉取、推送的优点结合,但长连接会更占资源,大量长连接会导致开销大

RocketMQ中常用的消费者DefaultMQPushConsumer,虽然从名字看是“推送”的方式,但获取消息用的是长轮询的方式

这种特殊的拉取消息方式能到达实时推送的效果,并在消费者端做好流控(拉取消息达到阈值就延时拉取)以防压力过大

拉取消息原理

在这里插入图片描述
点进去start方法看,我们自定义的listener被注册入 private MessageListener messageListener;,等待拉到消息后被回调(在大的交互流程中,我们的消费是被回调的一部分)

主要涉及两个类 DefaultMQPushConsumer的内部实现DefaultMQPushConsumerImpl有一个MQ客户端实例MQClientInstance 它内部包含的PullMessageService组件,就是用于长轮询拉取消息的

拉取简化的流程为:
从队列取出PullRequest,然后封装请求向Broker异步发送
响应后通过回调将查到的消息放入其内存队列中,方便后续消费
在此期间最终都会将PullRequest放回队列(失败可能延时放回),便于下次拉取该队列的消息
如下图
在这里插入图片描述
在这里插入图片描述
进入PullMessageService的线程类run方法
在这里插入图片描述

  • pullMessage最终会调用DefaultMQPushConsumerImpl.pullMessage,代码虽然很多,但主要流程为校验、获取参数、调用核心方法

  • 进行参数、状态、流控的校验,如果失败会调用executePullRequestLater后续延时50ms将拉取请求重新放回队列中,也就是后续再进行该队列的消息拉取

  • 如果是第一次执行,要获取消费进度的偏移量computePullFromWhereWithException,后续使用PullRequest上的nextOffset(集群模式向Broker获取)

  • 获取消费端相关信息(后续会封装成请求),创建回调,回调在RPC后调用

  • 执行拉取消息的核心方法 pullKernelImpl

public void pullMessage(final PullRequest pullRequest) {//获取内存队列final ProcessQueue processQueue <

相关文章:

  • AI智能体 | 使用Coze一键制作“假如书籍会说话”视频,18个作品狂吸17.6万粉,读书博主新标杆!(附保姆级教程)
  • 输入一个正整数,将其各位数字倒序输出(如输入123,输出321)
  • 【行为型之模板方法模式】游戏开发实战——Unity标准化流程与可扩展架构的核心实现
  • Prometheus 的介绍与部署(入门)
  • 第二章 变量和运算符
  • git push 报错:send-pack: unexpected disconnect while reading sideband packet
  • c#队列及其操作
  • vscode调试c/c++
  • 在linux中,如何使用malloc()函数向操作系统申请堆内存,使用free()函数释放内存。
  • python打包exe报错:处理文件时错误:Excel xlsx file; not supported
  • Python常见问题
  • 深入理解 Dijkstra 算法:原理、实现与优化
  • openfeign与dubbo调用下载excel实践
  • 如何获得sqoop-1.4.6.2.3.99.0-195.jar
  • 保持视频二维码不变,更新视频的内容
  • GMT之Bash语言使用
  • 濒危仙草的重生叙事:九仙尊米斛花节如何以雅集重构中医药文化IP
  • Qt原型模式实现与应用
  • (4)python开发经验
  • BRPickerView
  • 明查| 新一代AI诊疗系统可3秒筛查13种癌症?没有证据
  • 外交部:反对美方人士发表不负责任谬论
  • 上海145家博物馆、73家美术馆将减免费开放
  • 重庆市委原常委、政法委原书记陆克华被决定逮捕
  • 人民日报评外卖平台被约谈:摒弃恶性竞争,实现行业健康发展
  • 在笔墨金石间,看胡问遂与梅舒适的艺术对话