当前位置: 首页 > news >正文

Kafka:专注高吞吐与实时流处理的分布式消息队列

目录

1、介绍

2、核心概念

3、核心功能及高级特性

3.1 高吞吐与低延迟

3.2 持久化存储与可靠性

3.3 水平扩展与高可用性

3.4 消息顺序性及防重复消费

3.5 连接生态(Kafka Connect)

3.6 流处理(Kafka Streams)

4、限制与挑战

5、优缺点

6、典型应用场景

7、与其他主流消息中间件对比

8、使用代码示例(Java)


1、介绍

Apache Kafka 本质上是一个分布式的、高吞吐量、高可用的发布-订阅消息系统。它可以处理实时数据流,并允许这些数据流被持久化到磁盘,同时提供横向扩展能力。

作用:

  • 消息系统/消息队列(Message Queue):解耦生产者和消费者,缓冲数据流,避免系统被冲垮。与 RabbitMQ 等传统消息队列相比,Kafka 提供了更高的吞吐量和数据持久化能力。

  • 流式处理平台(Streaming Platform):Kafka 不仅用于传输数据,其内置的 Kafka Streams 库和 ksqlDB 允许开发者直接以应用程序的方式对数据流进行实时转换、聚合和处理。

  • 存储系统(Storage System):Kafka 将数据持久化到磁盘,并提供了可配置的保留策略(基于时间或大小)。因此,它可以被视为一个分布式的、高可靠的提交日志(Commit Log)存储系统。

2、核心概念

  • 消息与流(Stream):Kafka 处理的基本单位是消息(或记录)。一个消息由键、值和时间戳组成。一个特定主题的持续不断流入的消息序列,就构成了一个

  • 主题(Topic):消息流的逻辑分类或名称。生产者将消息发布到特定的主题,消费者从特定的主题订阅并拉取消息。例如,可以有 user-clickorder-payment 等主题。

  • 分区(Partition):为了实现横向扩展和高吞吐,每个 Topic 可以被分成一个或多个 Partition。Partition 是 Kafka 并行处理的基本单位。

    • 每个 Partition 是一个有序的、不可变的消息序列。

    • 消息在追加到 Partition 时会被分配一个唯一的、递增的偏移量(Offset)。

    • Partition 允许 Topic 的数据分布在不同的服务器(Broker)上。

    • 每个Partition在物理上对应一个文件夹。

  • Offset: 消息在Partition中的唯一标识,是一个单调递增的long型数字。消费者通过管理Offset来追踪自己的消费进度

  • 生产者(Producer):向 Kafka Topic 发布消息的客户端应用程序。

  • 消费者(Consumer):从 Kafka Topic 订阅并拉取消息进行处理的客户端应用程序。

  • 消费者组(Consumer Group):由多个 Consumer 实例组成的逻辑组,共同消费一个或多个 Topic。

    • 核心机制:一个 Partition 在同一时间只能被同一个 Consumer Group 内的一个 Consumer 消费。

    • 这种设计实现了两种模式:

      • 组内竞争: 同一个Consumer Group内的消费者,共同消费一个Topic的消息,每条消息只会被组内的一个消费者消费。这实现了队列模式(负载均衡)。

      • 组间广播: 不同Consumer Group之间是独立的,它们都能收到Topic的全量消息。这实现了发布-订阅模式

  • Broker:一个独立的 Kafka 服务器节点。多个 Broker 组成一个 Kafka 集群

  • 副本(Replication):每个 Partition 有多个副本,分布在不同的 Broker 上,用于提供数据冗余和高可用性。

    • 其中一个副本被指定为 Leader,负责处理所有读写请求。

    • 其他副本称为 Follower,它们从 Leader 同步数据。如果 Leader 失效,系统会从 Follower 中自动选举出新的 Leader。

  • ZooKeeper

    • 在 Kafka 早期版本中,ZooKeeper 是 Kafka 的“大脑”,负责管理集群元数据、Broker 注册、Leader 选举、消费者组偏移量等。

    • 在最新版本(自 2.8.0 起,并在 3.0+ 中稳定)中,Kafka 正在移除对 ZooKeeper 的依赖,引入了 KRaft 协议,使用内置的共识机制来管理元数据,简化了部署和运维。

3、核心功能及高级特性

3.1 高吞吐与低延迟

  • 顺序磁盘 I/O:Kafka 采用追加写入的方式将消息顺序写入磁盘,这种操作速度极快,甚至优于内存的随机读写。

  • 零拷贝技术:通过 sendfile 系统调用,数据直接从磁盘缓冲区发送到网络通道,减少了内核态与用户态之间的数据拷贝,显著降低了 CPU 消耗和上下文切换。

  • 页缓存:充分利用操作系统的页缓存,将磁盘文件缓存在内存中,极大加速了读写访问,减少实际磁盘I/O。

  • 批处理和压缩: 生产者和Broker都支持批处理和消息压缩(Snappy, GZIP, LZ4, Zstandard)。减少了网络请求次数,提高了效率。

即使处理海量数据,也能保持毫秒级的延迟。这是 Kafka 最引以为傲的特性。

3.2 持久化存储与可靠性

  • 所有消息都会持久化到磁盘,并可以通过配置策略(如保留7天或1TB数据)进行管理,允许消费者重放历史数据。

  • 通过 ACK 应答机制保证消息的可靠传递。生产者可以配置不同的 acks 级别:

    • acks=0:无需 Broker 确认,速度最快,但可能丢失消息。

    • acks=1:仅需主副本确认,是吞吐和安全性的平衡点。

    • acks=all:需要所有同步副本(ISR)确认,提供最强的持久性保证。

  • 保证消息不丢失:

    • 生产者端: 设置acks=all,并正确处理发送失败(重试+回调检查)。

    • Broker端: 设置replication.factor >= 3min.insync.replicas >= 2,确保消息被足够多的副本确认。

    • 消费者端: 关闭自动提交,在业务处理成功后手动提交Offset。

3.3 水平扩展与高可用性

  • 通过增加 Broker,可以轻松扩展集群能力。

  • 主题的多个分区可以分布在不同的 Broker 上,实现负载分散。

  • 分区的多副本机制确保了即使部分 Broker 宕机,服务依然可用,数据不会丢失。

3.4 消息顺序性及防重复消费

保证消息顺序:

  • 全局顺序: 设置Topic只有1个Partition(不推荐,牺牲扩展性)。

  • 分区顺序: 通过指定消息Key,将需要保证顺序的消息发送到同一个Partition。注意:如果发生重试,由于Producer的retries机制,可能导致前一条消息发送失败后一条消息先成功,从而乱序。需要开启max.in.flight.requests.per.connection=1来保证。

解决重复消费:

  • 根本原因: 网络问题等导致消费者处理了消息但没成功提交Offset,下次拉取会再次拿到同一条消息。

  • 解决方案: 幂等性设计。让消息处理逻辑可以安全地重复执行。例如:

    • 在数据库插入前先做select查询。

    • 利用数据库的唯一键约束。

    • 使用Redis等中间件记录已处理的消息ID。

3.5 连接生态(Kafka Connect)

提供了丰富的连接器(Connector)生态系统,可以轻松地与各种外部系统(如数据库、Hadoop、ES等)进行数据集成,实现数据的流入和流出。无需编写代码。

3.6 流处理(Kafka Streams)

一个轻量级的客户端库,允许开发者构建实时的流处理应用程序,直接在 Kafka 上进行复杂的计算。可以直接在Java应用中编写流处理逻辑(如聚合、连接、窗口操作)。

4、限制与挑战

  • 运维复杂性:Kafka 集群的部署、监控、调优和扩容需要专业的知识和经验,比单机消息队列复杂。

  • “智慧”Broker与“傻瓜”Consumer:Kafka 的设计理念是 Broker 只做简单的事(存储和推送),将复杂的处理逻辑(如消息确认、消费状态跟踪)交给 Consumer。这增加了客户端的复杂性。

  • 消息延迟并非绝对最低:虽然延迟很低,但对于某些要求微秒级延迟的场景,Kafka 可能不如某些专门的内存消息队列(如 Aeron)。

  • 功能相对单一:相比于 RabbitMQ,Kafka 在消息路由(如复杂路由键)、优先级队列、死信队列等方面的功能较弱。它更专注于高吞吐的流数据管道。

  • 资源消耗:为了达到高性能,Kafka 会充分利用磁盘和内存,对硬件资源有一定要求。

5、优缺点

方面优点缺点
性能极高吞吐量,低延迟,能处理百万级消息/秒。在极端低延迟(微秒级)场景下非最佳选择。
可靠性数据持久化,多副本,高可用,数据不易丢失。-
扩展性水平扩展能力极强,通过增加节点轻松扩容。分区数过多可能影响性能和管理。
生态生态系统极其丰富(Kafka Connect, Streams, ksqlDB)。学习曲线相对陡峭。
功能提供精确一次处理,支持重放历史数据缺乏高级消息路由功能(如复杂路由规则)。
运维社区活跃,成熟稳定。运维复杂,需要专业团队管理集群。
成本开源,软件免费。硬件和人力运维成本较高。

6、典型应用场景

  • 活动流数据处理(Activity Tracking)

    • 描述:收集网站或App的用户行为数据(如点击、浏览、搜索、分享),并将其实时发布到 Kafka Topic 中,供下游系统(如实时分析、推荐系统、欺诈检测)消费。

    • 例子:Netflix 用 Kafka 实时处理用户的观看事件,以提供个性化推荐。

  • 日志聚合(Log Aggregation)

    • 描述:从各个服务器上收集应用日志,统一写入 Kafka,然后再导入到中央存储系统(如 Elasticsearch、HDFS、S3)中,用于搜索、分析和长期存储。

    • 例子:LinkedIn 使用 Kafka 作为其所有系统日志的统一入口。

  • 消息系统/系统解耦(Messaging/Decoupling)

    • 描述:作为微服务架构中的通信总线,解耦服务之间的直接依赖。服务 A 将事件发布到 Kafka,服务 B 订阅该事件,两者无需知道彼此的存在。

    • 例子:电商系统中,订单服务创建订单后,只需向 order-created Topic 发送一个消息,库存服务、积分服务、通知服务等会各自消费这个消息并执行相应逻辑。

  • 流式处理(Stream Processing)

    • 描述:使用 Kafka Streams 或 Flink/Spark Streaming 对 Kafka 中的实时数据流进行连续处理。

    • 例子:监控告警(实时计算系统指标,超过阈值则告警)、实时反欺诈(分析交易流,识别可疑模式)、实时数据 enrich(将点击流数据与用户画像数据实时关联)。

  • 事件溯源(Event Sourcing)

    • 描述:将应用的状态变化记录为一系列不可变的事件(Event),并存储在 Kafka 中。应用的状态可以通过重放这些事件来重建。Kafka 的持久化和保留策略非常适合此场景。

  • 变更数据捕获(CDC, Change Data Capture)

    • 描述:通过 Debezium 等工具,捕获数据库的 binlog 变更,并将其作为事件流发布到 Kafka。这使得数据库的任何变更都能实时同步到搜索索引、数据仓库或其他微服务中。

7、与其他主流消息中间件对比

维度KafkaRabbitMQRocketMQActiveMQ
核心定位分布式事件流平台传统消息代理 (Message Broker)金融级分布式消息中间件经典JMS实现,标准消息服务
吞吐量极高 (百万级 QPS)一般 (万级至十万级 QPS) (十万级 QPS)较低 (万级 QPS)
典型功能持久化、消息回溯、事务消息优先级队列延迟队列死信队列、事务消息延迟消息、死信队列、消息回溯、事务消息持久化、事务、死信队列
消息顺序单分区内有序队列内有序(条件严苛)严格全局有序(通过单一队列)队列内有序
消费模式客户端主动拉取 (Pull)支持推送 (Push)和拉取 (Pull)支持推送和拉取支持推送和拉取
优势高吞吐、高可用、生态丰富、支持流处理路由灵活、可靠性高、协议支持多、延迟极低金融级高可靠、低延迟、事务消息符合JMS标准、部署简单
劣势功能相对单一、运维复杂吞吐量较低、扩展性一般、Erlang语言二次开发难社区活跃度一般性能较低、社区维护减弱

8、使用代码示例(Java)

添加依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version> <!-- 建议使用较新版本 -->
</dependency>

生产者 (Producer) 代码示例:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class SimpleKafkaProducer {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 键序列化方式props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 值序列化方式props.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-client"); // 客户端ID,便于追踪// 2. 创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 3. 构建消息。指定Topic和消息内容ProducerRecord<String, String> record = new ProducerRecord<>("your-topic-name", "Hello, Kafka!");try {// 4. 发送消息(异步)producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息发送成功! Topic: " + metadata.topic() + ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset());} else {System.err.println("消息发送失败:");exception.printStackTrace();}}});} catch (Exception e) {e.printStackTrace();} finally {// 5. 关闭生产者,释放资源producer.close();}}
}

生产者关键配置项说明:

配置项说明示例值
bootstrap.serversKafka集群地址,建议提供多个"host1:9092,host2:9092"
key.serializer消息键的序列化器StringSerializer.class
value.serializer消息值的序列化器StringSerializer.class
acks消息确认机制,控制可靠性all (最强)
retries发送失败后的重试次数3
batch.size批量发送的数据大小16384

发送模式选择:

  • 异步发送 (默认):使用 producer.send(record, callback),通过 Callback 处理结果,不阻塞主线程。

  • 同步发送:通过 producer.send(record).get() 实现,会阻塞直到收到Broker确认。

消费者 (Consumer) 代码示例:

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleKafkaConsumer {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka集群地址props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 键反序列化方式props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 值反序列化方式props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组ID,必填props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 无偏移量时从何处开始props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交偏移量// 2. 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题。可以订阅单个或多个主题consumer.subscribe(Collections.singletonList("your-topic-name"));// 如需订阅多个主题,可以使用 Arrays.asList("topic1", "topic2")try {while (true) {// 4. 轮询获取消息 (长轮询),设置超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) {// 5. 处理消息System.out.printf("收到消息: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());// 模拟业务处理processMessage(record.value());}// 6. 成功处理批消息后,手动提交偏移量 (至少一次语义)consumer.commitAsync(); }} catch (Exception e) {e.printStackTrace();} finally {try {consumer.commitSync(); // 最后尝试同步提交} finally {consumer.close(); // 关闭消费者}}}private static void processMessage(String message) {// 模拟消息处理耗时try {Thread.sleep(100); // PROCESSING_DELAY_MS} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

消费者关键配置说明:

配置项说明示例值
bootstrap.serversKafka集群地址"localhost:9092"
group.id消费者组ID,必填"my-group"
key.deserializer消息键的反序列化器StringDeserializer.class
value.deserializer消息值的反序列化器StringDeserializer.class
auto.offset.reset无偏移量时从何处读earliest
enable.auto.commit是否自动提交偏移量false (建议手动)

偏移量 (Offset) 提交:

  • 自动提交:设置 enable.auto.commit=true,简单但可能导致重复或丢失消息。

  • 手动提交 (推荐):设置 enable.auto.commit=false,在处理后手动调用 consumer.commitSync() 或 commitAsync(),能更好控制语义。

消费策略:

  • earliest: 从最早的消息开始消费。

  • latest: 从最新的消息开始消费(默认)。

长轮询: poll方法采用长轮询,如果没有消息,会等待指定的超时时间。

http://www.dtcms.com/a/564596.html

相关文章:

  • 【基于one-loop-per-thread的高并发服务器】--- 项目介绍模块划分
  • 玩转Rust高级应用 如何于 `match` 分支模式之后的额外 `if` 条件,指定匹配守卫提供的额外条件
  • 太原理工大学头歌作业--2025数据结构实验一:顺序表
  • GNSS 高精度定位一体机的测试
  • Rust编程学习 - 如何学习有关函数和闭包的高级特性,这包括函数指针以及返回闭包
  • 学校建设网站前的市场分析网站可以不进行icp备案吗
  • MATLAB电力系统等值电路建模工具
  • C语言内功强化之函数
  • GAOXian_CAD_KURUICHENG
  • 【MRTK3踩坑记录】Unity 2022 中 MRTK3 Input Simulator 无法使用 WASD 控制相机的完整排查记录
  • 高校网站建设的意义流量网站建设教程
  • 布局具身智能赛道,深圳作为科技完成近亿元融资
  • 无zookeeper Kafka 4.1.0 Raft 集群搭建
  • 十五五规划产业布局正式落地,美尔斯通加速深耕量子科技
  • 解决glibc版本低VSCode无法远程问题
  • 线上编程哪家比较好阳西网站seo
  • 数据分析-数据沙箱
  • 【JUnit实战3_26】第十五章:表现层测试(下)—— Selenium 在网页测试中的用法
  • 浏览器——CSDN网站的页面就是打不开,显示无法访问的解决办法
  • 110、23种设计模式之状态模式(19/23)
  • 做一手楼盘的网站嵌入式工程师能干多久
  • Spring Boot 应用 Docker 监控:Prometheus + Grafana 全方位监控
  • git clone失败
  • Linux 命令与运维终极手册(2025 完整版)
  • 05-异常处理-导读
  • Pandas-之 数据聚合与分组
  • Rust之基础入门项目实战:构建一个简单的猜谜游戏
  • 数据结构之二叉树-初见介绍
  • 【Java 开发日记】finally 释放的是什么资源?
  • VsCode中终端无法运行前端命令