当前位置: 首页 > news >正文

【MQ】kafka同步和异步的区别

Kafka的生产者(Producer)在发送消息时,可以选择同步(Sync)或异步(Async)的方式。这两种方式在发送消息的机制和性能表现上有显著区别。

同步发送(Sync)

  • 工作机制:生产者发送一条消息后,会等待Kafka服务器的响应(ACK),收到响应后才会发送下一条消息。这个过程是阻塞的。
  • 优点
  • 确保消息被成功写入Kafka,可靠性高。
  • 发送失败可以立即感知,便于重试或处理异常。
  • 缺点
  • 发送效率低,每次发送都需要等待响应,延迟高。
  • 吞吐量受网络延迟和服务器响应时间影响。
  • 适用场景:对消息可靠性要求高,允许一定延迟的场景,如金融交易。

异步发送(Async)

  • 工作机制:生产者发送消息后,不需要等待Kafka服务器的响应,而是继续发送下一条消息。消息会被缓存到内存的缓冲区(RecordAccumulator),然后由后台线程(Sender线程)批量发送到Kafka服务器。服务器响应由回调函数(Callback)处理。
  • 优点
  • 发送效率高,无需等待响应,吞吐量大。
  • 可批量发送,减少网络请求次数,提升性能。
  • 缺点
  • 存在消息丢失的风险:如果生产者宕机,内存中尚未发送的消息会丢失。
  • 错误处理复杂:需要通过回调函数处理发送失败的情况,不能立即感知错误。
  • 适用场景:高吞吐量、低延迟的场景,且允许少量消息丢失(如日志收集)。

对比总结

特性

同步发送

异步发送

发送方式

阻塞,等待响应

非阻塞,不等待响应

吞吐量

延迟

可靠性

高(立即感知失败)

较低(可能丢失未发送的消息)

错误处理

立即处理异常

通过回调函数异步处理

适用场景

要求高可靠性的场景

要求高吞吐量的场景

代码示例

同步发送
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
try {RecordMetadata metadata = producer.send(record).get(); // 阻塞等待响应System.out.println("Sent to partition " + metadata.partition());
} catch (Exception e) {e.printStackTrace();
}
异步发送(带回调)
producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Sent to partition " + metadata.partition());}
});

配置参数

  • ​acks​​:控制消息的可靠性(0:不等待ACK;1:Leader写入即响应;all/-1:所有ISR副本写入才响应)。
  • ​retries​​:发送失败后的重试次数(同步和异步均可配置)。
  • ​batch.size​​:异步发送时,批量发送的批次大小(字节数)。
  • ​linger.ms​​:异步发送时,批次等待时间(毫秒),超时后即使批次未满也发送。

如何选择?

  • 需要高可靠性(如订单交易)→ 同步发送(或异步发送+回调处理+高可靠性配置)。
  • 需要高吞吐量(如日志采集)→ 异步发送(配合批量发送参数优化)。

注意

即使使用异步发送,也可以通过设置合适的重试机制(​​retries​​)和ACK机制(​​acks=all​​)来提高可靠性,同时使用回调函数处理异常。但异步发送仍无法完全避免生产者进程崩溃导致的内存中未发送消息丢失。

# Kafka 同步与异步的区别详解

在 Kafka 中,同步(Sync)异步(Async) 主要指的是生产者发送消息的模式,它们在可靠性、性能和使用场景上有显著区别。

核心区别对比

特性

同步发送

异步发送

工作机制

阻塞发送,等待服务器确认

非阻塞发送,不等待确认

可靠性

⭐⭐⭐⭐⭐ (最高)

⭐⭐⭐ (可能丢失未确认消息)

吞吐量

⭐⭐ (较低)

⭐⭐⭐⭐⭐ (最高)

延迟

高 (每次发送都需要等待响应)

低 (无需等待响应)

错误处理

立即感知错误

通过回调函数异步处理错误

内存使用

较低

较高 (需要消息缓冲区)

适用场景

金融交易、关键操作日志

日志收集、指标监控、大数据管道

同步发送 (Sync)

工作原理

生产者发送消息后阻塞当前线程,直到收到 Kafka 服务器的确认响应(ACK),然后才发送下一条消息。

// Java 同步发送示例
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
try {// 阻塞等待服务器响应RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送到分区: " + metadata.partition());
} catch (Exception e) {// 立即处理异常e.printStackTrace();
}

特点

  • ✅ 强一致性保证:每条消息都确认成功写入
  • ✅ 即时错误处理:发送失败立即抛出异常
  • ⚠️ 性能瓶颈:吞吐量受网络延迟影响
  • ⚠️ 高延迟:需要等待服务器响应

异步发送 (Async)

工作原理

生产者发送消息后不等待响应,立即返回继续处理下一条消息。服务器响应通过回调函数处理。

// Java 异步发送示例
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {// 回调函数处理结果if (exception != null) {System.err.println("发送失败: " + exception.getMessage());} else {System.out.println("消息已确认: " + metadata.offset());}
});// 程序继续执行,不阻塞

特点

  • 🚀 高吞吐量:可批量发送消息(利用​​batch.size​​和​​linger.ms​​配置)
  • ⚡ 低延迟:发送操作不阻塞主线程
  • ⚠️ 潜在消息丢失:生产者崩溃时缓冲区消息会丢失
  • 🔄 回调处理:通过Callback接口处理成功/失败

关键配置参数

影响可靠性的通用参数

参数

说明

​acks​

0: 不等待确认

1: 等待Leader确认

all: 等待所有副本确认(最安全)

​retries​

失败后重试次数(建议配合​​max.in.flight.requests.per.connection=1​​使用)

​enable.idempotence​

启用精确一次语义(exactly-once)防止重复消息

异步发送优化参数

参数

说明

​batch.size​

批量发送的大小(字节),默认16KB

​linger.ms​

消息在缓冲区等待时间(毫秒),默认0

​buffer.memory​

生产者缓冲区总大小,默认32MB

使用场景建议

适合同步发送的场景

  1. 金融交易:每笔交易必须保证送达
  2. 关键操作日志:如用户注册、支付等关键事件
  3. 低吞吐场景:每秒消息量<1000

适合异步发送的场景

  1. 日志收集:如应用日志、访问日志
  2. 指标监控:服务器指标、性能数据
  3. 大数据管道:ETL处理、流式计算
  4. 高吞吐场景:每秒消息量>10,000

混合模式(推荐实践)

实际生产中常采用带回调的异步发送+适当重试的混合模式,平衡性能和可靠性:

// 推荐:带回调的异步发送 + 重试配置
Properties props = new Properties();
props.put("acks", "all"); // 最高可靠性
props.put("retries", 5); // 失败重试
props.put("max.in.flight.requests.per.connection", 1); // 防止乱序KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.send(record, (metadata, ex) -> {if (ex != null) {// 记录失败消息,可加入重试队列log.error("消息发送失败", ex);} else {// 成功处理log.debug("消息确认: {}", metadata.offset());}
});

性能对比数据

在典型场景下(Kafka集群3节点,消息大小1KB):

发送模式

吞吐量(msg/s)

P99延迟(ms)

CPU使用率

同步(ack=all)

8,000

25-50

35%

异步(无回调)

85,000

2-5

70%

异步(带回调)

65,000

5-10

60%

注:实际性能受网络、硬件和配置影响

总结建议

  1. 优先选择异步发送:大多数场景下异步模式更优
  2. 关键数据加同步:对可靠性要求极高的消息使用同步
  3. 合理配置参数
  • 设置​​acks=all​​确保持久性
  • 启用​​idempotence​​防止重复
  • 调整​​batch.size​​和​​linger.ms​​优化吞吐
  1. 完善错误处理
  • 异步发送必须实现回调函数
  • 记录失败消息并实现重试机制
  1. 监控与告警:监控生产者指标(如buffer-full、retries)

通过合理选择发送模式和配置参数,可以在可靠性和性能之间取得最佳平衡,满足不同业务场景的需求。

http://www.dtcms.com/a/311815.html

相关文章:

  • Windows中使用Qwen模型:VSCode+Cline
  • 64GB U盘实际显示容量为57.2GB的原因解析
  • innoDB的buffer pool
  • Wasatch SoftRIP数码打印 印花软件
  • 谷歌开源Agent框架ADK快速入门
  • 深入理解 Go 语言中 Map 的底层原理
  • Python爬虫实战:研究SimpleCV技术,构建图像获取及处理系统
  • Apache Doris数据库——大数据技术
  • 【LeetCode刷题指南】--二叉树的前序遍历,二叉树的中序遍历
  • MCP Agent 工程框架Dify初探
  • pytorch简单理解
  • 我的世界之战争星球 暮色苍茫篇 第二十六章、身世
  • 分布在内侧内嗅皮层的层Ⅱ或层Ⅲ的头部方向细胞(head direction cells)对NLP中的深层语义分析的积极影响和启示
  • JVM中年轻代、老年代、永久代(或元空间)、Eden区和Survivor区概念介绍
  • Mysql insert 语句
  • 入门MicroPython+ESP32:开启科技新旅程
  • 机试备考笔记 2/31
  • FastAPI--一个快速的 Python Web
  • C++ 自定义简单的异步日志类
  • oect刷入arm系统安装docker
  • Python深度学习:从入门到精通
  • retro-go 1.45 编译及显示中文
  • 联合索引全解析:一棵树,撑起查询的半边天
  • 【01】OpenCV C#——C#开发环境OpenCvSharp 环境配置 工程搭建 及代码测试
  • 【QT】Qt信号与槽机制详解信号和槽的本质自定义信号和槽带参数的信号和槽
  • 计算机网络:为什么IPv6没有选择使用点分十进制
  • 数据结构初学习、单向链表
  • Python 字典为什么查询高效
  • 数据结构---概念、数据与数据之间的关系(逻辑结构、物理结构)、基本功能、数据结构内容、单向链表(该奶奶、对象、应用)
  • SpringBoot3.x入门到精通系列:2.2 依赖注入与IoC容器