当前位置: 首页 > news >正文

RocketMQ代码分析——DefaultLitePullConsumer

DefaultLitePullConsumer是业务消费时主动拉取的入口

分析

消费

在这里插入图片描述

每个队列一个PullTaskImpl,不停拉取消息放进阻塞队列consumeRequestCache。

拉取之前会有一些内存中堆积消息数量、消息大小等的检查,超过阈值会延迟拉取消息
在这里插入图片描述
在这里插入图片描述

PullTask是通过rebalance获取到要消费的queue,然后创建对应的PullTask,remove不再消费的queue的PullTask
在这里插入图片描述

位点

提交给业务消费之前,会先根据ProcessQueue中的位点更新assignedMessageQueueState(MessageQueueState)中的消费位点
在这里插入图片描述
业务手动commit时,更新offsetStore中的位点
在这里插入图片描述
在这里插入图片描述

如果是自动commit(默认), poll时会检查是否到自动commit的间隔,然后进行commit
在这里插入图片描述

commit是更新位点到offsetStore,位点还是存在本地。client会定时同步offsetStore中的位点到broker
在这里插入图片描述
在这里插入图片描述

注意

在这里插入图片描述
poll拉取消息时,直接从内存中移除消息,并且更新offset了,所以如果业务代码拉取消息之后消费失败,消息也不会重试消费了。

所以使用poll的方式需要在业务代码中保证对拉取到的一批消息一定消费成功

示例

初始化

 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setNamesrvAddr(namesrv地址);consumer.setAutoCommit(false); //关闭自动commitconsumer.setPullBatchSize(128);consumer.setPullThresholdForAll(20 * 1000); // consumeRequest size * pullBatchSize < pullThresholdForAll 156consumer.setPullThresholdForQueue(10 * 1000); // queue level cachedMessageCount limitconsumer.setConsumeMaxSpan(100 * 1000); // queue level message span limitconsumer.setPullThresholdSizeForQueue(100); // queue level message size limitconsumer.subscribe("test_topic", "*");consumer.start();this.litePullConsumer = consumer;

消费消息

		List<MessageExt> batch = this.litePullConsumer.poll(5);if (batch.size() == 0) {return;}// may block for a whilethis.processBatch(batch); //要保证一定消费成功if (this.running) {this.litePullConsumer.commitSync(); //手动commit}
http://www.dtcms.com/a/611124.html

相关文章:

  • 六安网站建设招聘企业电子商务网站建设规划
  • Qt开发——常见控件(1)
  • 【WSL】C盘迁移
  • 上海小企业网站建设平台天眼查企业查询
  • 建设团购网站电子商务网站开发公司
  • 1.1.1 将TIA Opennes中添加本电脑用户
  • 代码随想录 763.划分字母区间
  • 网站导航包括only网站建设分析
  • 网站建站要多少钱智慧团建网站登录平台官网
  • 基于PVLIB的光伏发电量计算模型:SAPM-Sandia模型的原理与全流程解析
  • redis 在网站开发中怎么用安阳信息港网站
  • 30、【Ubuntu】【远程开发】内网穿透:反向隧道建立(二)
  • 文化厅网站建设审核报告单无锡正规网站seo公司
  • Swift中View和ViewController的生命周期
  • 网站建设是前端么网站开发的岗位及职责
  • 视频网站建设的意义论文网络营销导向网站建设的基础
  • iTwin开源包系列(二)grid组件
  • wordpress编辑器插件ueditorseo搜索引擎优化原理
  • 青岛哪家公司做网站好网站建设需求调研
  • Java线程池原理深度解析
  • AI入门知识之RAFT方法:基于微调的RAG优化技术详解
  • 怎么用word做一个网站网络企业做网站
  • 百度做网站教程房地产集团网站建设方案
  • 文心 5.0:原生全模态时代的技术分水岭
  • 多模式融合(GFS/GRAPES/ICON/GEM)在新能源预测中的对比与加权(工程版)
  • 25级第一次测试题解
  • 常用网站域名学做窗帘的网站
  • 网站制作基础教程网站建设的软件平台
  • MySQL数据库操作完全指南:从创建到管理的完整教程
  • C语言编译器在线编译 | 提供快速高效的C语言编译环境,适用于学习与开发