【Note】《Kafka: The Definitive Guide》第11章:Stream Processing
《Kafka: The Definitive Guide》第11章:Stream Processing
一、引言:为什么要做流式处理?
在当今大数据时代,静态批处理往往无法满足实时分析、监控预警和用户实时个性化需求。流式处理(Stream Processing)技术应运而生,其核心理念是:对不断到达的事件数据,进行低延迟、连续的处理与分析,从而实现近实时的业务响应。
Kafka 作为分布式、高吞吐、可持久化的消息系统,自身天然就适合做流式数据的“传输与存储”层。第十一章正是围绕如何利用 Kafka 构建完备的流式处理应用,重点介绍了 Kafka Streams 与 ksqlDB 两大主流方案,并深入讨论了流–表模型、状态管理、窗口操作、连接与聚合等关键机制。
二、Kafka Streams:库级别的流处理框架
2.1 核心设计
-
嵌入式库
Kafka Streams 并非独立的集群,而是以 Java 库形式嵌入到应用程序中。开发者只需依赖org.apache.kafka:kafka-streams
,即可在 JVM 进程内启动流处理任务,无需额外部署集群。 -
“Streams” 与 “Tables”
- KStream:“无界”的记录流,对应 Kafka Topic 中的原始事件序列。
- KTable:“有界”的、可更新的键值表,对应 compacted Topic,用于存储状态。
流–表(stream–table)模型是 Kafka Streams 的核心抽象,允许将无界事件流映射为可更新的表格视图,支持双向转换(stream-to-table、table-to-stream),以便灵活处理各种场景。
2.2 编程 API 示例
下面演示一个简单的 WordCount 示例——统计输入 Topic 中单词出现次数,并将结果输出到另一个 Topic:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();
// 1. 构建 KStream
KStream<String, String> textLines = builder.stream("input-topic");// 2. 拆分单词并计数
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count(Materialized.as("counts-store"));// 3. 将结果写入输出 Topic
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
- flatMapValues:对每条记录的 value 进行拆分,生成多条:
- groupBy + count:按单词分组并计数,结果映射为 KTable;
- Materialized State Store:内置 RocksDB 存储状态,并可通过 Interactive Queries 对外查询。
2.3 状态管理与容错
-
State Store
Kafka Streams 将聚合、连接等状态操作结果存储在本地 RocksDB,并定期将状态更改写入到一个内部 changelog Topic。重启时可从该 Topic 恢复状态。 -
Exactly-Once 语义
通过 Kafka 事务(transactional producer),可实现端到端的 Exactly-Once 处理语义,既保证输入消费不丢失也保证输出生产只执行一次,适用于金融、计费等对准确性要求极高的场景。
三、ksqlDB:SQL 友好的流处理
对于不熟悉 Java 编程、但对 SQL 非常熟悉的场景,ksqlDB 提供了以 SQL 语句进行流式处理的能力。
-- 注册输入流
CREATE STREAM pageviews (userid VARCHAR,pageid VARCHAR,viewtime BIGINT
) WITH (KAFKA_TOPIC='pageviews',VALUE_FORMAT='JSON'
);-- 统计每分钟页面浏览量
CREATE TABLE pv_counts ASSELECTuserid,TUMBLINGWINDOW(SECONDS 60) AS window,COUNT(*) AS viewsFROM pageviewsGROUP BY userid;
- 流表一体:在 SQL 语法中,STREAM 表示无界流,TABLE 表示表格,两者之间可通过 SELECT 语句互转。
- 可视化与交互:ksqlDB Server 提供 REST 接口,可与 Confluent Control Center 等图形化平台集成,实时监控查询执行状况。
四、关键概念与实践要点
主题 | 要点 |
---|---|
窗口(Window) | 支持 Tumbling、Hopping、Session 三种窗口类型,用于将无界流分割为有界时间段,以便执行聚合操作。 |
流–表转换 | stream.toTable() 、table.toStream() ;在流处理管道中灵活切换视图。 |
连接(Join) | KStream–KStream、KStream–KTable 与 KTable–KTable 三种类型,支持内连接与外连接。 |
分区与重新分区 | groupByKey() 与 selectKey() 会触发分区重分区(repartition),需要关注磨合吞吐与网络开销。 |
容错与 SLA | 设置 processing.guarantee 为 at_least_once 或 exactly_once_v2 ,并合理调优 commit.interval.ms 。 |
性能调优 | 调整状态存储配置(RocksDB)、批量消费配置 max.poll.records 、缓存大小 cache.max.bytes.buffering 。 |
五、应用场景
-
实时监控与告警
利用窗口聚合统计指标(如每分钟失败率、延迟分布),结合 Kafka Streams 的 Exactly-Once 语义,可构建可靠的实时监控系统。 -
在线特征计算
针对实时推荐或风控场景,在用户行为流上做 stateful 做特征提取,输出到 Kafka 供下游服务实时消费。 -
跨系统数据同步
使用 ksqlDB 简单配置,将一个系统的事件流实时转化并写入目标系统(如 Elasticsearch、数据库),无需写大量样板代码。 -
微服务事件驱动
将微服务间通信解耦为 Kafka Topic,通过 Kafka Streams 实现事件过滤、路由与聚合,提升系统松耦合性与可扩展性。
六、小结
Kafka 生态下两条主流流式处理路径:
- Kafka Streams:面向 JVM 开发者,提供灵活的 Java API、可嵌入的运行时和强大的状态管理;
- ksqlDB:面向 SQL 用户,降低学习门槛,快速构建流处理管道。