RocketMQ代码分析——DefaultLitePullConsumer
DefaultLitePullConsumer是业务消费时主动拉取的入口
分析
消费

每个队列一个PullTaskImpl,不停拉取消息放进阻塞队列consumeRequestCache。
拉取之前会有一些内存中堆积消息数量、消息大小等的检查,超过阈值会延迟拉取消息


PullTask是通过rebalance获取到要消费的queue,然后创建对应的PullTask,remove不再消费的queue的PullTask

位点
提交给业务消费之前,会先根据ProcessQueue中的位点更新assignedMessageQueueState(MessageQueueState)中的消费位点

业务手动commit时,更新offsetStore中的位点


如果是自动commit(默认), poll时会检查是否到自动commit的间隔,然后进行commit

commit是更新位点到offsetStore,位点还是存在本地。client会定时同步offsetStore中的位点到broker


注意

poll拉取消息时,直接从内存中移除消息,并且更新offset了,所以如果业务代码拉取消息之后消费失败,消息也不会重试消费了。
所以使用poll的方式需要在业务代码中保证对拉取到的一批消息一定消费成功
示例
初始化
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setNamesrvAddr(namesrv地址);consumer.setAutoCommit(false); //关闭自动commitconsumer.setPullBatchSize(128);consumer.setPullThresholdForAll(20 * 1000); // consumeRequest size * pullBatchSize < pullThresholdForAll 156consumer.setPullThresholdForQueue(10 * 1000); // queue level cachedMessageCount limitconsumer.setConsumeMaxSpan(100 * 1000); // queue level message span limitconsumer.setPullThresholdSizeForQueue(100); // queue level message size limitconsumer.subscribe("test_topic", "*");consumer.start();this.litePullConsumer = consumer;
消费消息
List<MessageExt> batch = this.litePullConsumer.poll(5);if (batch.size() == 0) {return;}// may block for a whilethis.processBatch(batch); //要保证一定消费成功if (this.running) {this.litePullConsumer.commitSync(); //手动commit}
