当前位置: 首页 > news >正文

怎么理解使用MQ解决分布式事务 -- 以kafka为例

利用 Apache Kafka 实现分布式事务的完整指南

本文聚焦 Kafka 原生能力,从「事务语义 → 代码 → 运维 → 故障场景」逐层展开,给出可在生产环境直接落地的全套方案。


一、Kafka 分布式事务的 3 个核心语义

语义实现机制配置/代码标志
幂等性Broker 端去重 + Sequence Numberenable.idempotence=true
事务两阶段提交 + Transaction Coordinatortransactional.id
读已提交消费者过滤未提交事务消息isolation.level=read_committed

二、架构全景图

┌─────────────────────────────────────────────────────────────┐
│  Producer (订单服务)                                         │
│  1. beginTransaction()                                       │
│  2. insert into order_tbl …                                  │
│  3. send("stock-deduct", orderId)                            │
│  4. commitTransaction()   ─┐                                 │
└────────────────────────────┼─────────────────────────────┐   ││ 两阶段提交                   │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Broker                                                    │   │
│  • Transaction Coordinator (TC)                            │   │
│  • __transaction_state 日志 (3 副本)                       │   │
│  • 写入分区队列                                           │   │
└────────────────────────────┼─────────────────────────────┐   ││ 仅投递 committed 消息        │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Consumer (库存服务)                                       │
│  5. poll() → read_committed                               │
│  6. update stock_tbl set qty = qty - ? where id = ?        │
│  7. ack()                                                  │
└─────────────────────────────────────────────────────────────┘

三、Producer 端完整配置与代码

1. 通用 Producer 参数

bootstrap.servers=kafka:9092
enable.idempotence=true               # 幂等发送
transactional.id=order-service-tx-1   # 全局唯一
acks=all
max.in.flight.requests.per.connection=5
transaction.timeout.ms=30000          # 小于 broker 的 max.transaction.timeout.ms

2. Spring Boot 双事务(Kafka + JDBC)

@Configuration
public class KafkaChainedTxConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx");DefaultKafkaProducerFactory<String, String> pf =new DefaultKafkaProducerFactory<>(props);pf.setTransactionIdPrefix("order-tx-");          // 支持并发事务return pf;}@Beanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> pf) {return new KafkaTransactionManager<>(pf);}@Bean("chainedTxManager")public ChainedTransactionManager chainedTxManager(KafkaTransactionManager<?, ?> ktm,DataSourceTransactionManager dstm) {return new ChainedTransactionManager(ktm, dstm);}
}

3. Service 层

@Service
public class OrderService {private final OrderRepository repo;private final KafkaTemplate<String, OrderEvent> kafka;@Transactional("chainedTxManager")public void createOrder(CreateOrderCommand cmd) {// 1. 本地事务Order order = repo.save(new Order(cmd));// 2. 发送事务消息OrderEvent event = new OrderEvent(order.getId(), cmd.getSkuId(), cmd.getQty());kafka.send("stock-deduct", order.getId().toString(), event);// 3. 若 DB 回滚,Kafka 事务也回滚;反之亦然}
}

四、Consumer 端:幂等 + 重试 + 死信队列

1. 消费者配置

bootstrap.servers=kafka:9092
group.id=stock-service
isolation.level=read_committed
enable.auto.commit=false
max.poll.records=100

2. 监听器(批量 + 幂等)

@Component
public class StockConsumer {private final StockRepository stockRepo;@KafkaListener(topics = "stock-deduct",containerFactory = "batchFactory")public void listen(List<ConsumerRecord<String, OrderEvent>> records,Acknowledgment ack) {for (var r : records) {try {consumeOne(r.value());} catch (DuplicateKeyException ex) {// 幂等冲突,跳过} catch (DataIntegrityViolationException ex) {// 库存不足,记录告警并手动 ack,不再重试} catch (Exception ex) {// 其他异常:抛出让 SeekToCurrentErrorHandler 重试throw ex;}}ack.acknowledge();}@Transactionalpublic void consumeOne(OrderEvent e) {int affected = stockRepo.deductQty(e.getSkuId(), e.getQty(), e.getOrderId());if (affected == 0) {throw new IllegalStateException("库存扣减失败");}}
}

3. 重试与死信队列(Spring Kafka)

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(ConsumerFactory<String, OrderEvent> cf) {ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf);factory.setBatchListener(true);// 最多重试 3 次后发送到 DLQDefaultErrorHandler handler =new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate(), (r, e) -> new TopicPartition("stock-deduct.DLT", r.partition())),new FixedBackOff(1000L, 2));factory.setCommonErrorHandler(handler);return factory;
}

五、事务超时 & 死锁排查

指标触发场景解决
transaction.timeout.ms 超期Broker 未收到 commit/abort调大或优化业务耗时
producer.send 阻塞网络抖动、ISR < min.insync.replicas监控 kafka.server:RequestQueueTimeMs
消费者 lag 持续增大下游消费慢 / 重试风暴扩容消费者、减少 batch size

六、完整监控体系

  1. JMX 指标

    • Producer:record-send-rate, transaction-duration-avg
    • Broker:transaction-coordinator-metricstransactional-id-count
    • Consumer:records-lag-max, commit-latency-avg
  2. Prometheus + Grafana

    - pattern: kafka.producer<type=producer-metrics, client-id=(.+)><>(transaction-duration-avg)name: kafka_producer_transaction_duration_avglabels:client_id: "$1"
    
  3. 告警规则示例

    - alert: KafkaTransactionStuckexpr: kafka_producer_transaction_duration_avg > 20for: 1mannotations:summary: "事务长时间未完成"
    

七、故障演练清单

场景操作预期行为
Broker 重启docker kill kafka-1事务协调器 failover,事务仍可完成
Producer 进程崩溃kill -9事务超时后 Broker 自动 abort
消费者消费异常业务抛异常重试 3 次 → DLQ → 人工处理

八、小结

维度结论
一致性本地事务 + Kafka 事务 API → 原子提交
可用性异步投递,高吞吐,支持水平扩容
复杂度仅需幂等消费与重试策略,2PC 网络阻塞消失
性能实测 TPS 下降 < 10%,远低于数据库 2PC

至此,从配置、代码到监控、故障演练 的 Kafka 分布式事务闭环已完整落地。

http://www.dtcms.com/a/304009.html

相关文章:

  • ABP VNext + GraphQL Federation:跨微服务联合 Schema 分层
  • Java 课程,每天解读一个简单Java之判断101-200之间有多少个素数,并输出所有素数。
  • 如何制定项目计划?核心要点
  • 枚举中间位置高级篇
  • Apache Ignite 的对等类加载(Peer Class Loading, P2P Class Loading)机制
  • Qt windows 全屏弹幕工具
  • 【Golang】Go语言指针
  • 鱼皮项目简易版 RPC 框架开发(六)----最后的绝唱
  • Qt|槽函数耗时操作阻塞主界面问题
  • go标准库log模块学习笔记
  • spring cloud sentinel 动态规则配置
  • css3之三维变换详说
  • Windows系统ffmpeg.dll丢失怎么办?从错误分析到永久修复的完整流程
  • FPGA实现SRIO高速接口与DSP交互,FPGA+DSP异构方案,提供3套工程源码和技术支持
  • 处理订单过期但支付成功的系统设计:平衡用户体验与业务规则
  • 设计模式:中介者模式 Mediator
  • Oracle发布MCP Server,自然语言交互说“人话”
  • Kubernetes高级调度01
  • 设计模式十三:代理模式(Proxy Pattern)
  • pygame 模拟放飞气球
  • hive专题面试总结
  • Python 日期时间格式化与解析的瑞士军刀:`strftime()` 与 `strptime()`
  • 三、Linux用户与权限管理详解
  • Baumer工业相机堡盟工业相机如何通过YoloV8深度学习模型实现各种食物的类型检测识别(C#代码UI界面版)
  • 学习嵌入式的第三十四天-数据结构-(2025.7.29)数据库
  • 小杰数据结构(one day)——心若安,便是晴天;心若乱,便是阴天。
  • 【数据可视化-75】北京密云区2025年7月暴雨深度分析:Python + Pyecharts 炫酷大屏可视化(含完整数据、代码)
  • Prometheus + Grafana + Micrometer 监控方案详解
  • Java:为什么需要通配符捕获(wildcard capture)
  • HbuilderX开发小程序