RocketMQ DefaultMQPushConsumer vs DefaultLitePullConsumer
1. 核心架构差异
DefaultMQPushConsumer (推送模式)
// 服务端主动推送消息到消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 服务端推送消息到这里return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
DefaultLitePullConsumer (拉取模式)
// 客户端主动从服务端拉取消息
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group");
consumer.subscribe("TopicTest", "*");
consumer.start();// 手动控制拉取
List<MessageExt> messages = consumer.poll();
// 业务处理
consumer.commitSync(); // 手动提交
2. 消费重试机制对比
PushConsumer 重试机制
特点:支持重试,且是自动重试,支持到消息级别
// 自动重试 - 消息级别
public class DefaultMQPushConsumer {// 最大重试次数,默认16次private int maxReconsumeTimes = 16;// 重试消息进入 %RETRY% 队列// 重试间隔:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
}// 消费失败处理
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {// 业务处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 返回失败,自动进入重试队列return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
});
2.2 LitePullConsumer 重试机制
特点:没有内置的重试队列机制,不支持自动重试;要重试需要业务自己实现
// 手动控制重试 - 需要业务代码实现
public class DefaultLitePullConsumer {// 没有内置的重试队列机制// 需要业务自己处理失败消息
}// 手动重试示例
List<MessageExt> messages = consumer.poll(Duration.ofSeconds(10));
for (MessageExt message : messages) {try {processMessage(message);// 处理成功,记录偏移量} catch (Exception e) {// 处理失败,可以选择:// 1. 跳过此消息(可能丢失)// 2. 记录到死信队列手动处理// 3. 业务级重试逻辑log.error("Process message failed, msgId: {}", message.getMsgId(), e);}
}
// 批量提交,无法精确控制单条消息
consumer.commitSync();
3. 消费超时检查对比
PushConsumer 超时控制
特点:后台有消费超时检查,默认15min(从开始消费算起)。消费超时会将消息重发回broker(类似失败重试)。consumeTimeout可自行设置
public class DefaultMQPushConsumer {// 消费超时时间,默认15分钟private long consumeTimeout = 15 * 60 * 1000;// 并发消费线程数控制private int consumeThreadMin = 20;private int consumeThreadMax = 64;
}// 服务端会监控消费进度
// 如果消费线程卡住,Broker会重新投递消息
LitePullConsumer 超时控制
只有拉取超时,没有消费超时的检查
public class DefaultLitePullConsumer {// 拉取超时控制private long pollTimeoutMillis = 10 * 1000;// 业务自己控制处理超时// 没有服务端超时检查
}// 业务自己控制处理时间
List<MessageExt> messages = consumer.poll(Duration.ofSeconds(5));
long startTime = System.currentTimeMillis();
for (MessageExt message : messages) {// 业务需要自己控制单条消息处理超时if (System.currentTimeMillis() - startTime > 30000) {// 超时处理break;}processMessage(message);
}
4. 提交(Commit)机制对比
PushConsumer 提交机制
特点;仅可自动提交
// 自动提交偏移量
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 返回 CONSUME_SUCCESS 后自动提交偏移量// 提交的是这批消息的最大偏移量return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});// 异步提交,默认5秒提交一次
// 提交到 Broker 的消费进度管理
4.2 LitePullConsumer 提交机制
特点:默认自动提交,可手动提交
// 手动提交,更灵活的控制
public class DefaultLitePullConsumer {// 提交方式public void commitSync(); // 同步提交public void commitSync(Map<MessageQueue, Long> offsetTable); // 指定偏移量提交public void commitAsync(); // 异步提交}// 使用示例
List<MessageExt> messages = consumer.poll();
if (!messages.isEmpty()) {// 处理消息boolean allSuccess = true;for (MessageExt message : messages) {if (!processMessage(message)) {allSuccess = false;break;}}if (allSuccess) {// 全部成功才提交consumer.commitSync();}
}
5. 使用场景推荐
DefaultMQPushConsumer 使用场景
适合场景
- 常规业务消息处理
- 需要严格不丢消息的场景
- 希望服务端管理消费状态,减少客户端复杂度
- 消费逻辑相对固定,不需要精细控制
优势
- 自动重试机制完善
- 服务端管理消费进度
- 开发简单,专注业务逻辑
- 消息可靠性高
DefaultLitePullConsumer 使用场景
适合场景
- 批处理任务
- 需要精细控制消费逻辑
- 消息处理时间不确定,需要自定义超时
- 需要实现复杂的分支逻辑
具体用例
// 用例1:批处理控制
List<MessageExt> batch = consumer.poll();
if (batch.size() >= BATCH_SIZE) {processBatch(batch);consumer.commitSync();
}// 用例2:条件消费
List<MessageExt> messages = consumer.poll();
for (MessageExt msg : messages) {if (meetsCondition(msg)) {processMessage(msg);}// 不满足条件的消息跳过,但偏移量会提交// 需要更精细的偏移量管理
}// 用例3:自定义重试
Map<String, Integer> retryCountMap = new HashMap<>();
List<MessageExt> messages = consumer.poll();
for (MessageExt msg : messages) {String msgId = msg.getMsgId();int retryCount = retryCountMap.getOrDefault(msgId, 0);if (retryCount < MAX_RETRY) {if (processWithRetry(msg)) {retryCountMap.remove(msgId);} else {retryCountMap.put(msgId, retryCount + 1);// 不提交此消息偏移量}} else {// 超过重试次数,进入死信队列sendToDLQ(msg);}
}
6. 总结对比

选择建议
- 大部分业务场景推荐使用 DefaultMQPushConsumer
- 只有在需要精细控制消费逻辑时才使用 DefaultLitePullConsumer
- 从 Push 切换到 Pull 需要慎重,因为失去了很多内置的可靠性保障
