当前位置: 首页 > news >正文

【Note】《Kafka: The Definitive Guide》第11章:Stream Processing

《Kafka: The Definitive Guide》第11章:Stream Processing

一、引言:为什么要做流式处理?

在当今大数据时代,静态批处理往往无法满足实时分析、监控预警和用户实时个性化需求。流式处理(Stream Processing)技术应运而生,其核心理念是:对不断到达的事件数据,进行低延迟、连续的处理与分析,从而实现近实时的业务响应。

Kafka 作为分布式、高吞吐、可持久化的消息系统,自身天然就适合做流式数据的“传输与存储”层。第十一章正是围绕如何利用 Kafka 构建完备的流式处理应用,重点介绍了 Kafka Streams 与 ksqlDB 两大主流方案,并深入讨论了流–表模型、状态管理、窗口操作、连接与聚合等关键机制。


二、Kafka Streams:库级别的流处理框架

2.1 核心设计

  • 嵌入式库
    Kafka Streams 并非独立的集群,而是以 Java 库形式嵌入到应用程序中。开发者只需依赖 org.apache.kafka:kafka-streams,即可在 JVM 进程内启动流处理任务,无需额外部署集群。

  • “Streams” 与 “Tables”

    • KStream:“无界”的记录流,对应 Kafka Topic 中的原始事件序列。
    • KTable:“有界”的、可更新的键值表,对应 compacted Topic,用于存储状态。

    流–表(stream–table)模型是 Kafka Streams 的核心抽象,允许将无界事件流映射为可更新的表格视图,支持双向转换(stream-to-table、table-to-stream),以便灵活处理各种场景。

2.2 编程 API 示例

下面演示一个简单的 WordCount 示例——统计输入 Topic 中单词出现次数,并将结果输出到另一个 Topic:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();
// 1. 构建 KStream
KStream<String, String> textLines = builder.stream("input-topic");// 2. 拆分单词并计数
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count(Materialized.as("counts-store"));// 3. 将结果写入输出 Topic
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
  • flatMapValues:对每条记录的 value 进行拆分,生成多条:
  • groupBy + count:按单词分组并计数,结果映射为 KTable;
  • Materialized State Store:内置 RocksDB 存储状态,并可通过 Interactive Queries 对外查询。

2.3 状态管理与容错

  • State Store
    Kafka Streams 将聚合、连接等状态操作结果存储在本地 RocksDB,并定期将状态更改写入到一个内部 changelog Topic。重启时可从该 Topic 恢复状态。

  • Exactly-Once 语义
    通过 Kafka 事务(transactional producer),可实现端到端的 Exactly-Once 处理语义,既保证输入消费不丢失也保证输出生产只执行一次,适用于金融、计费等对准确性要求极高的场景。


三、ksqlDB:SQL 友好的流处理

对于不熟悉 Java 编程、但对 SQL 非常熟悉的场景,ksqlDB 提供了以 SQL 语句进行流式处理的能力。

-- 注册输入流
CREATE STREAM pageviews (userid VARCHAR,pageid VARCHAR,viewtime BIGINT
) WITH (KAFKA_TOPIC='pageviews',VALUE_FORMAT='JSON'
);-- 统计每分钟页面浏览量
CREATE TABLE pv_counts ASSELECTuserid,TUMBLINGWINDOW(SECONDS 60) AS window,COUNT(*) AS viewsFROM pageviewsGROUP BY userid;
  • 流表一体:在 SQL 语法中,STREAM 表示无界流,TABLE 表示表格,两者之间可通过 SELECT 语句互转。
  • 可视化与交互:ksqlDB Server 提供 REST 接口,可与 Confluent Control Center 等图形化平台集成,实时监控查询执行状况。

四、关键概念与实践要点

主题要点
窗口(Window)支持 Tumbling、Hopping、Session 三种窗口类型,用于将无界流分割为有界时间段,以便执行聚合操作。
流–表转换stream.toTable()table.toStream();在流处理管道中灵活切换视图。
连接(Join)KStream–KStream、KStream–KTable 与 KTable–KTable 三种类型,支持内连接与外连接。
分区与重新分区groupByKey()selectKey() 会触发分区重分区(repartition),需要关注磨合吞吐与网络开销。
容错与 SLA设置 processing.guaranteeat_least_onceexactly_once_v2,并合理调优 commit.interval.ms
性能调优调整状态存储配置(RocksDB)、批量消费配置 max.poll.records、缓存大小 cache.max.bytes.buffering

五、应用场景

  1. 实时监控与告警
    利用窗口聚合统计指标(如每分钟失败率、延迟分布),结合 Kafka Streams 的 Exactly-Once 语义,可构建可靠的实时监控系统。

  2. 在线特征计算
    针对实时推荐或风控场景,在用户行为流上做 stateful 做特征提取,输出到 Kafka 供下游服务实时消费。

  3. 跨系统数据同步
    使用 ksqlDB 简单配置,将一个系统的事件流实时转化并写入目标系统(如 Elasticsearch、数据库),无需写大量样板代码。

  4. 微服务事件驱动
    将微服务间通信解耦为 Kafka Topic,通过 Kafka Streams 实现事件过滤、路由与聚合,提升系统松耦合性与可扩展性。


六、小结

Kafka 生态下两条主流流式处理路径:

  • Kafka Streams:面向 JVM 开发者,提供灵活的 Java API、可嵌入的运行时和强大的状态管理;
  • ksqlDB:面向 SQL 用户,降低学习门槛,快速构建流处理管道。
http://www.dtcms.com/a/269684.html

相关文章:

  • BERT代码简单笔记
  • C#中封装halcon函数的报错
  • 代码详细注释:C语言实现控制台用户注册登录系统
  • Google AI 刚刚开源 MCP 数据库工具箱,让 AI 代理安全高效地查询数据库
  • 前后端分离(java) 和 Nginx在服务器上的完整部署方案(redis、minio)
  • JxBrowser 7.43.4 版本发布啦!
  • 人工智能驱动下的可再生能源气象预测:构建绿色能源时代的新大脑
  • 微服务化采集平台:可扩展性与容错机制
  • 相机Camera日志实例分析之五:相机Camx【萌拍闪光灯后置拍照】单帧流程日志详解
  • AiPy实战:问界汽车交付速度破纪录的背后是什么?
  • Vue的初步学习
  • 146.在 Vue3 中使用 OpenLayers 地图上 ECharts 模拟飞机循环飞行
  • OS学习笔记
  • B站视频下载器 Bili23-Downloader v1.63.1 绿色版
  • LLMs之DeepSeek:AI模型市场深度分析:DeepSeek的挑战与机遇,模型市场份额、Token经济学与未来发展
  • 力扣 239 题:滑动窗口最大值的两种高效解法
  • 【python】 time_str = time_str.strip() 与 time_str = str(time_str).strip() 的区别
  • 基于物联网的智能交通灯控制系统设计
  • 使用 Docker 搭建 Java(SpringBoot)开发环境——AI教你学Docker
  • 零基础|宝塔面板|frp内网穿透|esp32cam远程访问|微信小程序
  • 电商业务是如何防护DDoS攻击的?
  • 2563、统计公平数对的数目
  • ElasticSearch集群状态查询及_cat 命令详解
  • JDBC 获取新增行主键值详解
  • 向量与向量组的线性相关性 线性代数
  • 【Android】搭配安卓环境及设备连接
  • 17-C#的socket通信TCP-1
  • 静态路由实验以及核心原理
  • 计算机网络第九章——数据链路层《局域网》
  • 裂变时刻:全球关税重构下的券商交易系统跃迁路线图(2025-2027)