当前位置: 首页 > news >正文

RocketMQ DefaultMQPushConsumer vs DefaultLitePullConsumer

1. 核心架构差异

DefaultMQPushConsumer (推送模式)

// 服务端主动推送消息到消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 服务端推送消息到这里return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});

DefaultLitePullConsumer (拉取模式)

// 客户端主动从服务端拉取消息
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group");
consumer.subscribe("TopicTest", "*");
consumer.start();// 手动控制拉取
List<MessageExt> messages = consumer.poll();
// 业务处理
consumer.commitSync(); // 手动提交

2. 消费重试机制对比

PushConsumer 重试机制

特点:支持重试,且是自动重试,支持到消息级别

// 自动重试 - 消息级别
public class DefaultMQPushConsumer {// 最大重试次数,默认16次private int maxReconsumeTimes = 16;// 重试消息进入 %RETRY% 队列// 重试间隔:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
}// 消费失败处理
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {// 业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 返回失败,自动进入重试队列return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
});

2.2 LitePullConsumer 重试机制

特点:没有内置的重试队列机制,不支持自动重试;要重试需要业务自己实现

// 手动控制重试 - 需要业务代码实现
public class DefaultLitePullConsumer {// 没有内置的重试队列机制// 需要业务自己处理失败消息
}// 手动重试示例
List<MessageExt> messages = consumer.poll(Duration.ofSeconds(10));
for (MessageExt message : messages) {try {processMessage(message);// 处理成功,记录偏移量} catch (Exception e) {// 处理失败,可以选择:// 1. 跳过此消息(可能丢失)// 2. 记录到死信队列手动处理// 3. 业务级重试逻辑log.error("Process message failed, msgId: {}", message.getMsgId(), e);}
}
// 批量提交,无法精确控制单条消息
consumer.commitSync();

3. 消费超时检查对比

PushConsumer 超时控制

特点:后台有消费超时检查,默认15min(从开始消费算起)。消费超时会将消息重发回broker(类似失败重试)。consumeTimeout可自行设置

public class DefaultMQPushConsumer {// 消费超时时间,默认15分钟private long consumeTimeout = 15 * 60 * 1000;// 并发消费线程数控制private int consumeThreadMin = 20;private int consumeThreadMax = 64;
}// 服务端会监控消费进度
// 如果消费线程卡住,Broker会重新投递消息

LitePullConsumer 超时控制

只有拉取超时,没有消费超时的检查

public class DefaultLitePullConsumer {// 拉取超时控制private long pollTimeoutMillis = 10 * 1000;// 业务自己控制处理超时// 没有服务端超时检查
}// 业务自己控制处理时间
List<MessageExt> messages = consumer.poll(Duration.ofSeconds(5));
long startTime = System.currentTimeMillis();
for (MessageExt message : messages) {// 业务需要自己控制单条消息处理超时if (System.currentTimeMillis() - startTime > 30000) {// 超时处理break;}processMessage(message);
}

4. 提交(Commit)机制对比

PushConsumer 提交机制

特点;仅可自动提交

// 自动提交偏移量
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 返回 CONSUME_SUCCESS 后自动提交偏移量// 提交的是这批消息的最大偏移量return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});// 异步提交,默认5秒提交一次
// 提交到 Broker 的消费进度管理

4.2 LitePullConsumer 提交机制

特点:默认自动提交,可手动提交

// 手动提交,更灵活的控制
public class DefaultLitePullConsumer {// 提交方式public void commitSync();                    // 同步提交public void commitSync(Map<MessageQueue, Long> offsetTable); // 指定偏移量提交public void commitAsync();                   // 异步提交}// 使用示例
List<MessageExt> messages = consumer.poll();
if (!messages.isEmpty()) {// 处理消息boolean allSuccess = true;for (MessageExt message : messages) {if (!processMessage(message)) {allSuccess = false;break;}}if (allSuccess) {// 全部成功才提交consumer.commitSync();}
}

5. 使用场景推荐

DefaultMQPushConsumer 使用场景

适合场景
  1. 常规业务消息处理
  2. 需要严格不丢消息的场景
  3. 希望服务端管理消费状态,减少客户端复杂度
  4. 消费逻辑相对固定,不需要精细控制
优势
  • 自动重试机制完善
  • 服务端管理消费进度
  • 开发简单,专注业务逻辑
  • 消息可靠性高

DefaultLitePullConsumer 使用场景

适合场景
  1. 批处理任务
  2. 需要精细控制消费逻辑
  3. 消息处理时间不确定,需要自定义超时
  4. 需要实现复杂的分支逻辑
具体用例
// 用例1:批处理控制
List<MessageExt> batch = consumer.poll();
if (batch.size() >= BATCH_SIZE) {processBatch(batch);consumer.commitSync();
}// 用例2:条件消费
List<MessageExt> messages = consumer.poll();
for (MessageExt msg : messages) {if (meetsCondition(msg)) {processMessage(msg);}// 不满足条件的消息跳过,但偏移量会提交// 需要更精细的偏移量管理
}// 用例3:自定义重试
Map<String, Integer> retryCountMap = new HashMap<>();
List<MessageExt> messages = consumer.poll();
for (MessageExt msg : messages) {String msgId = msg.getMsgId();int retryCount = retryCountMap.getOrDefault(msgId, 0);if (retryCount < MAX_RETRY) {if (processWithRetry(msg)) {retryCountMap.remove(msgId);} else {retryCountMap.put(msgId, retryCount + 1);// 不提交此消息偏移量}} else {// 超过重试次数,进入死信队列sendToDLQ(msg);}
}

6. 总结对比

在这里插入图片描述

选择建议

  • 大部分业务场景推荐使用 DefaultMQPushConsumer
  • 只有在需要精细控制消费逻辑时才使用 DefaultLitePullConsumer
  • 从 Push 切换到 Pull 需要慎重,因为失去了很多内置的可靠性保障
http://www.dtcms.com/a/611173.html

相关文章:

  • php和mysql网站毕业设计成都餐饮设计公司有哪些
  • 甘肃统计投资审核系统完成国产数据库替换:从MySQL到金仓的平稳跨越
  • 征求网站建设意见的通知seo优化网站排名
  • 电商网站流程优秀网络广告文案案例
  • 怎么做个人网站建设wordpress 迁移 工具
  • 两台arm服务器之间实现实时同步
  • 国外设计参考网站公司如何做网站宣传
  • 多用户自助建站系统wordpress iis 500.50
  • 福州网站建设需要多少钱ui设计的优势与不足
  • 网站建设方案书的内容网上学编程
  • 经典算法题之子集(四)
  • 自己动手写深度学习框架(反向传播)
  • 网站多大需要服务器活动手机网站开发
  • 网站推广原则做个网站大约多少钱
  • 政府机关选用GS 90盘位存储,保存Veeam备份数据
  • MySQL: 服务器性能优化全面指南:参数配置与数据库设计的最佳实践
  • 垫江集团网站建设商城外贸网站设计
  • 网站建设与维护方式电商设计课程
  • C语言进阶:文件管理(一)
  • 操作教程 | OpenHIS医院版:设置处方模板
  • 使用List集合专项实验
  • 网站开发程序用什么好wordpress 新建页面 超链接
  • 嘉兴网站开发学校2008建立的php网站慢
  • 训练100B 以上参数需要多少硬件?
  • 找深圳做网站的公司网页设计新手制作的网站代码
  • 怎么通过域名做网站dw做网页的步骤和代码
  • Linux学习日记12:无名通道与有名通道
  • 征程 6X 常见 kernel panic 问题
  • 复盘与导出工具最新版V35.0版本更新----修复东财智能选股,预测量能,开盘啦涨停闪退,炸板数量不匹配问题
  • 招聘网站咋做珠海溢动网络科技有限公司