当前位置: 首页 > news >正文

K8S重启引发的RocketMQ消息丢失问题记录

K8S重启引发的RocketMQ消息丢失问题归档

一句话总结:生产环境出现诡异的消息"丢失",最终发现是K8S强制Kill进程 + Redisson看门狗续期 + 防并发设计在异常场景下的失效,三者共同作用下的完美风暴。

写作背景

最近在整理去年的工作笔记时,翻到了一个当时困扰了我们团队近4个月的线上问题。回过头看这个问题的排查过程,发现其中涉及到容器化环境下进程管理、消息队列消费机制、分布式锁等多个技术点的交互,具有一定的典型性和参考价值。因此整理成文,希望能给遇到类似问题的同学一些启发。

问题的最终解决方案非常简单——在启动脚本前加一个exec命令。但在找到根因之前,我们走了不少弯路。这个案例也再次印证了:在分布式系统中,看似简单的问题背后往往隐藏着多个组件交互的复杂时序。

问题现象

现象描述

去年3月开始,生产环境出现了一个诡异的问题:3月、5月和6月各出现了1笔订单的返利没有到账。运营团队多次反馈,但排查起来非常困难:

  • 数据库事务执行正常
  • 网络连接稳定
  • 业务逻辑代码经过多轮Review未发现问题
  • 因为返利业务的周期性缘故,用户报障时为订单产生的下个月,生产日志已无法追溯

这个问题持续了近4个月,每次发生后都找不到明确的原因。直到我们开始关注问题发生的时间规律,才找到了突破口。

  • 影响范围:共三笔订单返利未到账
  • 发生频率:偶发性问题,发生频率较低
  • 业务影响:用户投诉,需要人工补录数据
  • 技术表现:RocketMQ Console显示消息已被确认消费,但订单未返利

已排除的可能性

在排查初期,我们按照常规思路检查了以下几个方面:

  1. 数据库层面:事务执行正常,没有死锁或超时
  2. 网络层面:应用与消息队列、Redis的网络连接稳定
  3. 代码层面:业务逻辑经过多轮Code Review,未发现明显问题
  4. 中间件层面:RocketMQ和Redis运行正常,无异常告警

排查思路与过程

1. 时间规律分析

在反复出现几次问题后,我们开始记录问题发生的时间点:

# 消息丢失的时间点记录
2024-03-15 02:30:xx  # 凌晨发版时间窗口
2024-05-24 03:15:xx  # 凌晨发版时间窗口
2024-06-14 02:45:xx  # 凌晨发版时间窗口

关键发现:所有问题都发生在K8S滚动更新期间。这个规律让我们将排查重点转向了容器重启场景

想象中的公司容器的停机流程

正常情况下,K8S的停机流程应该是这样的:

K8S发送SIGTERM信号
Spring Boot接收信号
开始优雅停机
停止接收新请求
等待现有请求处理完成
关闭MQ消费者
释放资源
进程退出

标准流程看起来很完美:

  1. 发送 kill -15 (SIGTERM) 信号,通知应用优雅停机
  2. 等待优雅停机时间窗口
  3. 执行 kill -9 (SIGKILL) 强制终止进程
现实的残酷真相

然而,通过与容器云同事的深入交流,发现了一个致命问题
kubectl delete pod 时是向容器内 PID 1 发送 SIGTERM 也就是 kill -15 PID 1号进程

# 容器启动脚本
#!/bin/bash
java ${APM_SET} ${APOLLO_SET} -jar app.jar

问题分析

  1. K8S发送kill -15信号给1号进程
  2. 但是容器中的1号进程是shell脚本,不是java进程
  3. Java进程成为了子进程,无法接收到SIGTERM信号
  4. 等待优雅停机超时后,K8S直接执行kill -9强制杀死进程
  5. 正在处理的MQ消息直接丢失!
PID 1号进程的身份危机

让我们深入容器内部,看看进程的真实面貌:

# 在容器内执行 ps aux
# 预期的进程树(理想状态)
USER   PID  %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     1   0.1 15.2 2847316 621312 ?      Sl   09:30   0:05 java -jar app.jar# 实际的进程树(问题根源)
USER   PID  %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     1   0.0  0.1  11284  2816 ?        Ss   09:30   0:00 /bin/sh -c startup.sh
root    42   0.1 15.2 2847316 621312 ?       Sl   09:30   0:05 java -jar app.jar

问题一目了然

  • 1号进程:shell脚本 (/bin/sh)
  • 42号进程:Java应用 (真正干活的)

当K8S发送SIGTERM信号时:

# K8S的操作
kill -15 1  # 只通知了shell进程
# shell进程收到信号后直接退出,没有转发给Java进程
# 然后30秒后...
kill -9 42  # 直接强杀Java进程!

这就像是层层代理的沟通问题:

  • K8S (老板): “通知Java应用优雅下班”
  • Shell (中间管理): 收到消息但没有传达
  • Java应用 (员工): 正在专心工作,突然被保安拖走

到了这里,心中又浮现一个疑问,服务被强制kill掉,为什么会丢消息呢,带着这个关键线索,我又对消费者代码进行分析

2. 消费者代码分析

我们仔细审查了消息消费者的代码实现:

@Component
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "rebate-consumer-group"
)
public class OrderRebateListener extends BaseRocketmqConsumer<OrderMessage> {@Resourceprivate RedisLockHelper redisLockHelper;@Overridepublic void processMsg(OrderMessage msgDTO, MessageExt msg) {RLock lock = null;try {// 看门狗模式:leaseTime = -1,自动续期lock = redisLockHelper.tryLock("order:" + msgDTO.getOrderId(),0, -1, TimeUnit.MINUTES);if (lock == null) {// 获取锁失败,抛出异常throw new MrpBusinessException(ResultCode.NOT_GET_REDIS_LOCK);}// 处理业务逻辑processRebate(msgDTO);} finally {RedisLockHelper.unLock(lock);}}
}

代码看起来很标准,使用了分布式锁来防止相同订单的消息被并发处理。但问题可能就隐藏在这些看似正常的代码中。

3. 防并发逻辑分析

继续审查基类的实现,我们发现了一个特殊的异常处理逻辑:

public abstract class BaseRocketmqConsumer<T> implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt msg) {try {this.doProcess(msg);} catch (Throwable e) {// 关键点:NOT_GET_REDIS_LOCK异常被特殊处理if (e instanceof MrpBusinessException&& ResultCode.NOT_GET_REDIS_LOCK.getCode().equals(((MrpBusinessException) e).getCode())) {log.warn("未获取到锁,认为消息正在被其他线程处理");// 设计初衷:防止相同key的消息并发处理// 获取不到锁 = 另一个线程正在处理 = 不需要重复处理return; // 直接返回成功,避免重复消费} else {throw e; // 其他异常才会重新抛出}}}
}

这段代码的设计初衷

这是一个防止消息重复消费的设计。正常情况下的逻辑是:

  • 线程A获取锁并处理消息
  • 线程B尝试获取同一个锁失败,认为线程A正在处理,直接返回成功
  • 这样可以避免同一个订单的消息被并发消费

潜在的问题

这个设计基于一个隐含假设——持有锁的线程一定会完成业务处理。但如果在容器重启场景下,持有锁的进程被强制Kill,这个假设就不成立了:

  • 旧Pod的进程被Kill,锁没有释放
  • 新Pod获取锁失败,误认为"正在处理"
  • 实际上没有任何线程在处理这条消息

到这里,我们初步锁定了问题的方向:

可能的原因1:容器被强制kill + RocketMQ消费机制(如果异步+自动确认),就可能导致了消息丢失

可能的原因2:容器被强制kill + 分布式锁 + 防并发逻辑,三者在特定时序下的交互可能导致了消息丢失。

RocketMQ消费机制源码分析

为了理解问题的本质,我们需要深入RocketMQ的源码,了解消息确认的机制。

RocketMQ消费完整流程图(基于4.9.1)

并发消费
顺序消费
正常返回
抛异常
SUCCESS
RECONSUME_LATER
Broker推送消息
Netty接收
DefaultMQPushConsumerImpl
消息拉取入口
PullCallback.onSuccess
拉取成功回调
ProcessQueue
本地消息队列
消费模式?
ConsumeMessageConcurrentlyService
ConsumeMessageOrderlyService
提交到线程池
ThreadPoolExecutor
ConsumeRequest.run
消费任务执行
调用MessageListener.consumeMessage
Spring RocketMQ?
DefaultRocketMQListenerContainer
Spring适配层
直接调用用户Listener
RocketMQListener.onMessage
用户业务代码
执行结果
返回CONSUME_SUCCESS
返回RECONSUME_LATER
processConsumeResult
处理消费结果
消费状态?
更新消费进度offset
发送消息回Broker重试
提交offset到Broker
消息进入重试队列
消费完成

关键时序:消息确认机制

BrokerConsumerDefaultMQPushConsumerImpl消费线程池用户ListenerProcessQueue推送消息批次存入本地队列提交消费任务调用consumeMessage()处理业务逻辑返回(无异常)status = CONSUME_SUCCESS业务异常throw Exceptionstatus = RECONSUME_LATERalt[正常执行][抛出异常]processConsumeResult(status)移除已消费消息更新offset确认sendMessageBack(重试)消息进入重试队列alt[status == CONSUME_SUCCESS][status == RECONSUME_LATER]关键:只有正常返回才确认如果代码内部catch异常并returnRocketMQ认为消费成功!BrokerConsumerDefaultMQPushConsumerImpl消费线程池用户ListenerProcessQueue

关键源码1:消息拉取与分发

RocketMQ Consumer端的核心流程:

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
class ConsumeRequest implements Runnable {@Overridepublic void run() {MessageListenerConcurrently listener =ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;try {// 执行业务消费逻辑status = listener.consumeMessage(msgs, context);} catch (Throwable e) {log.warn("consumeMessage exception", e);// 异常情况下默认重试status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 根据返回状态处理消息确认if (status == null) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 处理消费结果ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);}
}

关键源码2:消费结果处理

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public void processConsumeResult(ConsumeConcurrentlyStatus status,ConsumeConcurrentlyContext context,ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();switch (status) {case CONSUME_SUCCESS:// 消费成功:计算成功消费的消息数if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}// 统计消费成功数int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;// 更新统计数据this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup,consumeRequest.getMessageQueue().getTopic(), ok);break;case RECONSUME_LATER:// 消费失败:标记为需要重试ackIndex = -1;break;}// 处理失败的消息for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 发送回Broker进行重试boolean result = this.sendMessageBack(msg, context);if (!result) {// 发送失败,设置重新消费msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}// 移除已消费的消息,更新offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 提交消费进度到Brokerthis.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

核心机制总结

  1. 只有正常返回CONSUME_SUCCESS,消息offset才会被提交
  2. 如果抛出异常、返回RECONSUME_LATER或消费者被kill,消息会重新投递
  3. 如果业务代码正常返回(如防并发设计中catch后return),RocketMQ会认为消息消费成功

Spring-RocketMQ适配层

// org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer
class DefaultMessageListenerConcurrentlyimplements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {try {// 调用用户定义的RocketMQListener.onMessage()rocketMQListener.onMessage(messageExt);// 如果没有抛异常,返回成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {log.warn("consume message failed", e);// 只有抛异常才返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

结论:RocketMQ是基于业务返回值确认消息,不是自动确认!排查到这里,我们可以 排除容器被强制kill + RocketMQ消费机制,导致了消息丢失

Redisson看门狗机制源码分析

在理解了RocketMQ的消费机制后,我们需要继续深入Redisson的看门狗机制,这是问题的另一个关键环节。

Redisson分布式锁完整流程图(基于3.12.0)

flowchart TDA[调用tryLock方法] --> B{leaseTime参数?}B -->|leaseTime != -1| C[指定过期时间模式]B -->|leaseTime == -1| D[看门狗模式]C --> E[执行Lua脚本获取锁<br/>设置指定过期时间]D --> F[执行Lua脚本获取锁<br/>设置默认30秒过期]E --> G{获取锁结果?}F --> GG -->|成功 ttl=null| H{是否看门狗模式?}G -->|失败 ttl>0| I{waitTime > 0?}H -->|是| J[启动看门狗<br/>scheduleExpirationRenewal]H -->|否| K[返回成功,不启动看门狗]J --> L[创建定时任务<br/>internalLockLeaseTime/3<br/>默认10秒后执行]L --> M[TimerTask.run<br/>续期任务执行]M --> N[调用renewExpirationAsync<br/>Lua脚本续期]N --> O{续期成功?}O -->|成功| P[重新调度下次续期<br/>10秒后再次执行]O -->|失败/锁不存在| Q[停止看门狗]P --> MI -->|是| R[订阅锁释放事件<br/>等待重试]I -->|否| S[返回获取锁失败]R --> T[收到锁释放通知或超时]T --> Estyle J fill:#ffeb3bstyle M fill:#ff9800style P fill:#4caf50style Q fill:#f44336

看门狗在K8S Kill场景下的时序

应用进程RedissonLock看门狗TimerRedisK8StryLock(-1)SET锁,30秒过期启动看门狗,10秒续期T=0秒开始处理业务T=10秒续期至30秒成功T=20秒续期至30秒成功T=25秒K8S发送SIGKILL强制Kill进程进程终止finally未执行unlock()未调用看门狗Timer被Kill无法继续续期T=25-55秒锁仍然存在剩余5-35秒过期时间(取决于Kill时机)T=55秒锁自动过期释放危险窗口:25-55秒锁已失效但仍占用新Pod无法获取锁应用进程RedissonLock看门狗TimerRedisK8S

看门狗续期实现源码

// org.redisson.RedissonLock
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);// 启动续期任务renewExpiration();}
}private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 创建定时任务,每10秒执行一次(默认30秒锁过期时间的1/3)Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 执行续期:重置过期时间为30秒RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock expiration", e);return;}if (res) {// 续期成功,继续调度下一次续期renewExpiration();}});}},internalLockLeaseTime / 3,  // 默认30秒/3 = 10秒TimeUnit.MILLISECONDS);ee.setTimeout(task);
}protected RFuture<Boolean> renewExpirationAsync(long threadId) {// Lua脚本:重置锁的过期时间return commandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}

看门狗机制关键点

  • 默认锁过期时间:30秒(lockWatchdogTimeout
  • 续期间隔:10秒(过期时间的1/3)
  • 续期策略:每次重置为30秒
  • 停止条件:锁被释放或进程终止

问题根因分析

通过对RocketMQ和Redisson源码的深入分析,我们已经掌握了所有的技术细节。现在可以完整地还原问题发生的完整时序。

完整的问题发生时序图

K8SPod-A(旧版本)Pod-B(新版本)RocketMQRedis业务系统正常业务运行中拉取消息(订单A)返回消息tryLock(订单A, -1分钟)获取锁成功启动看门狗每10秒续期至30秒开始处理业务逻辑...触发滚动更新启动新Pod新Pod就绪发送SIGTERM(优雅停机)但Shell脚本没有转发信号Java进程继续运行30秒后...发送SIGKILL(强制杀进程)进程被强制Kill业务逻辑中断finally块未执行锁未释放看门狗续期Timer也被Kill但锁还剩余10-30秒过期时间未收到消费确认消息重新投递投递消息(订单A)tryLock(订单A, 0秒等待)锁已被占用,获取失败throw MrpBusinessException(NOT_GET_REDIS_LOCK)BaseRocketmqConsumer捕获异常catch异常,log.warn()return (不抛异常)返回CONSUME_SUCCESS提交offset,消息确认成功10-30秒后锁过期释放订单A的业务逻辑永远不会执行消息已被确认"消费成功"K8SPod-A(旧版本)Pod-B(新版本)RocketMQRedis业务系统

三个关键因素

1. K8S强制Kill:锁未释放

# 容器启动脚本
#!/bin/bash
java -jar app.jar  # Java进程不是PID 1# K8S的操作
kill -15 1   # SIGTERM发给Shell进程
# 30秒后
kill -9 42   # SIGKILL强杀Java进程,finally未执行

结果:分布式锁残留在Redis中,剩余10-30秒过期时间

2. Redisson看门狗:锁续期延长影响时间

// 看门狗配置
leaseTime = -1              // 启用看门狗
lockWatchdogTimeout = 30s   // 锁过期时间30秒
renewInterval = 10s         // 每10秒续期一次

最坏情况

  • Kill发生在续期后1秒:锁还有29秒才过期
  • 消息重新投递间隔:通常3-5秒
  • 消息多次重试都无法获取锁

3. 防并发设计的副作用:消息被误确认

// 业务代码
if (lock == null) {// 设计初衷:获取不到锁,说明其他线程正在处理,无需重复处理throw new MrpBusinessException(NOT_GET_REDIS_LOCK);
}// BaseRocketmqConsumer的防并发逻辑
catch (MrpBusinessException e) {if (e.getCode().equals(NOT_GET_REDIS_LOCK)) {log.warn("未获取到锁,认为消息正在被处理");return; // 防止重复消费,直接返回成功}
}

正常场景

  • 线程A:持有锁,正在处理业务
  • 线程B:获取锁失败,return成功(避免重复处理)

异常场景(K8S Kill)

  • 旧Pod:持有锁,被Kill中断
  • 新Pod:获取锁失败,return成功(误以为旧Pod在处理)
  • 结果:锁释放后也没有线程处理了,业务逻辑永远不会执行

解决方案

在理解了问题的根本原因后,我们制定了完整的解决方案。

修复K8S优雅停机(最终采用方案)

1.1 使用exec启动Java进程(最终解决方案)

经过与K8S容器负责同事的协调,我们采用了最简单有效的方案:修改启动脚本,使用exec命令。

#!/bin/bash
# 启动脚本修改前
java ${APM_SET} ${APOLLO_SET} -jar app.jar# 启动脚本修改后(最终方案)
exec java ${APM_SET} ${APOLLO_SET} -jar app.jar

exec的作用

  • exec命令会用新的程序替换当前进程
  • Java进程直接成为1号进程
  • 可以直接接收K8S的SIGTERM信号
# 不使用exec的进程关系
PID 1:  /bin/sh docker-entrypoint.sh└─ PID 42: /bin/sh startup.sh└─ PID 108: java -jar app.jar# 使用exec后的进程关系
PID 1: java -jar app.jar  # Java直接替换了所有shell进程

如果使用容器云控制台,还需要:

# 容器云控制台启动命令
exec /path/to/start.sh# start.sh内容
exec java ${APM_SET} ${APOLLO_SET} -jar app.jar

效果验证

# 在容器内执行
ps aux# 修改前的输出
USER   PID  %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     1   0.0  0.1  11284  2816 ?        Ss   09:30   0:00 /bin/sh -c startup.sh
root    42   0.1 15.2 2847316 621312 ?       Sl   09:30   0:05 java -jar app.jar# 修改后的输出(Java成为PID 1)
USER   PID  %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     1   0.1 15.2 2847316 621312 ?       Sl   09:30   0:05 java -jar app.jar

修复后的效果

  • Java进程直接接收SIGTERM信号
  • Spring Boot优雅停机机制生效
  • finally块正常执行,锁正常释放
  • 消息消费完成后才关闭Consumer
  • 问题彻底解决,后续发版未再出现消息丢失

1.2 配合Spring Boot优雅停机(可选增强)

虽然修改脚本已经解决了问题,但配置Spring Boot优雅停机可以让应用更健壮:

# application.yml
server:shutdown: gracefulspring:lifecycle:# 等待处理中的请求完成timeout-per-shutdown-phase: 30s

1.3 添加PreStop Hook(可选增强)

如果需要更保守的优雅停机策略,可以添加PreStop Hook:

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
spec:template:spec:containers:- name: applifecycle:preStop:exec:# 给应用15秒时间完成当前请求command: ["/bin/sh", "-c", "sleep 15"]# 优雅停机总时长:60秒terminationGracePeriodSeconds: 60

验证与测试

解决方案实施后,我们进行了充分的测试验证。

测试场景1:模拟K8S强制Kill

# 1. 启动应用并发送消息
curl -X POST <http://localhost:8080/send-message># 2. 查看进程ID
ps aux | grep java
# root  1234  ... java -jar app.jar# 3. 模拟K8S强制Kill
kill -9 1234# 4. 检查Redis锁状态
redis-cli
> KEYS order:*
> TTL order:123456  # 查看剩余过期时间# 5. 观察消息重新投递后的处理情况

测试结果

  • 修改前:消息被误认为消费成功,业务逻辑未执行
  • 修改后:消息能够正常重试并最终被正确消费

最佳实践总结

基于这次问题的排查和解决经验,总结以下最佳实践。

应该遵循的实践

  1. 容器化应用(最关键)
    • 强制要求:使用exec启动Java进程,让其成为PID 1
    • 上线检查:容器启动后执行ps aux验证Java是否为PID 1
    • 标准模板:团队统一启动脚本模板,避免重复踩坑
    • 配置建议:合理的terminationGracePeriodSeconds(建议60秒)
  2. 分布式锁策略
    • 本案例经验:看门狗模式在异常场景下有风险,建议指定过期时间
    • 性能权衡:评估业务真实耗时,设置合理的锁过期时间
  3. 消息消费设计
    • 防并发设计:需要考虑进程被Kill等异常中断场景
    • 幂等保护:使用消息ID作为幂等键
  4. 问题排查方法论
    • 关注时间规律:100%在特定时间发生说明什么?
    • 深入源码分析:理解底层机制才能找到根因
    • 绘制时序图:梳理各组件交互,找出时序窗口
    • 跨团队协作:容器、中间件、业务多方协同

快速自查清单

容器启动脚本检查

# 错误示例
java -jar app.jar# 正确示例
exec java -jar app.jar

验证方法

# 在容器内执行
ps aux | head -2# 期望看到
USER   PID  COMMAND
root     1  java -jar app.jar  # PID 1 是 Java# 而不是
USER   PID  COMMAND
root     1  /bin/sh startup.sh  # PID 1 是 Shell

深度思考

为什么这个Bug如此隐蔽?

  1. 触发条件苛刻:需要K8S容器重启 + 正在消费消息 + 消息重试时锁未释放
  2. 影响范围小:只影响重启瞬间正在处理的消息
  3. 无明显报错:进程被Kill,日志来不及记录
  4. 时间窗口短:30秒内锁释放后问题消失
  5. 表象迷惑性:看起来像消息丢失,实际是消费逻辑未执行
  6. 设计初衷良好:防并发逻辑在正常场景下工作完美,只在异常场景下失效

防并发设计的隐含假设

原有的防并发设计基于一个隐含假设:

“获取不到锁 = 其他线程正在处理 = 该线程一定会完成处理”

这个假设在正常情况下是成立的:

  • 线程A获取锁,处理业务,释放锁
  • 线程B获取锁失败,认为A在处理,直接返回成功(避免重复消费)

但在K8S强制Kill的场景下:

  • 线程A获取锁,被Kill中断,锁未释放
  • 线程B获取锁失败,认为A在处理,返回成功
  • 实际上A已经被Kill,业务逻辑永远不会完成

这就是典型的"假设在99.9%的场景下成立,但在0.1%的极端场景下失效"的案例。

系统设计的启示

  1. 防御式编程:假设任何外部依赖都可能失败
  2. 优雅降级:分布式锁获取失败应该重试而非放弃
  3. 可观测性:完善的日志、监控、告警体系
  4. 端到端测试:包含基础设施层面的混沌测试

分布式系统的复杂性

这个案例完美诠释了分布式系统的复杂性:

  • 容器编排层(K8S)
  • 进程管理层(Shell/Java)
  • 消息中间件层(RocketMQ)
  • 分布式协调层(Redis/Redisson)
  • 业务逻辑层(应用代码)

任何一层的细微问题,都可能在特定时序下被放大成系统性故障。

总结

问题解决成果

经过与K8S容器团队的协调,我们采用了最简单有效的方案——在启动脚本中添加exec命令。

修复效果

  • 一行代码解决问题:只需在启动脚本前加exec
  • 验证通过:后续发版,未再出现消息丢失
  • 零业务代码改动:无需修改业务逻辑和分布式锁代码
  • 治本之策:从根本上解决了优雅停机问题

深度思考与收获

这次深度剖析让我们认识到:

  1. 源码是最好的文档:理解RocketMQ和Redisson的实现原理,才能正确使用
  2. 细节决定成败:一个exec命令,看似简单,却是问题的关键
  3. 系统性思维:容器化环境下,问题往往跨越多个技术栈(K8S + Shell + JVM + MQ + Redis)
  4. 防御式设计的边界:防并发设计在99.9%的场景下工作完美,但需要考虑极端场景
  5. 团队协作的重要性:与容器云团队的深入沟通是解决问题的关键

记住:在分布式系统中,任何看似"不可能"的问题,在特定的时序条件下,都可能成为现实。而解决方案,往往比想象的更简单——一个exec命令足矣。


参考资料

  • RocketMQ 4.9.1 官方文档
  • Redisson 3.12.0 官方文档
  • Kubernetes优雅停机
  • Spring Boot优雅停机

如果这篇文章对你有帮助,欢迎点赞、收藏和分享!

作者:Mario

创作日期:2025-10-13

http://www.dtcms.com/a/477791.html

相关文章:

  • K8S(七)—— Kubernetes Pod 进阶配置与生命周期管理全解析
  • 主题库 1.15 | 提供风景、动漫、明星、动物等多种类型的高清壁纸,轻松更换手机壁纸
  • 百度云自助建站用微信做网站
  • 免费自助小型网站怎么制作链接视频教程
  • 国外那些网站做展厅比较好做本地房产网站
  • 本地部署开源持续集成和持续部署系统 Woodpecker CI 并实现外部访问
  • 从 FinalShell 迁移到 WindTerm:一次安全、高效、开源的终端升级之旅
  • 从 0 到 1 构建一个完整的 AGUI 前端项目的流程在 ESP32 上运行
  • 【具身智能】RoboTwin 2.0:一个可扩展的、强领域随机化的数据生成器,用于双臂机器人操作
  • 【STM32项目开源】基于STM32的智能家庭安防系统
  • Avalonia+ReactiveUI+Sourcegenerators实现异步命令
  • 个人网站建设心得网站开发的数据库设计实体是什么
  • Java的动态绑定机制(重要)
  • 2Docker自定义网络,compose多容器部署
  • Linux内存管理-malloc虚拟内存到物理映射详细分析
  • 桂林网站建设内容大专自考报名入口官网
  • AMS支持的融资业务如何优化风控流程?
  • 小杰深度学习(thirteen)——视觉-经典神经网络——GoogLeNet
  • jtag转swd
  • 多语言支持应用场景实战解析
  • 手机微网站怎么设计方案陕西建省级执法人才库
  • c# 中文数字转阿拉伯数字
  • 如何自定义 Qt 日志处理并记录日志到文件
  • Spring Boot 3零基础教程,类属性绑定配置文件中的值,笔记10
  • TypeScript 基础类型
  • 鸿蒙NEXT Function Flow Runtime Kit:解锁高效并发编程的利器
  • 一个小项目的记录:PHP 分账组件
  • excel-mcp-server rocky linux简单部署
  • 网站前台模块包括什么软件wordpress js放到oss
  • ENET_INIT卡死在DMA_MODE判断