当前位置: 首页 > news >正文

Apache Flink 从流处理基础到恰好一次语义

1. 为什么是流:有界 vs 无界

  • 批处理(有界):数据“先到齐,再处理”。你能做全量排序、全局统计和“最终报告”。
  • 流处理(无界):数据“永不停止”,必须边到边算
  • 现实世界的数据(点击、交易、传感器)天生是。能统一跑实时与回放(重处理历史)且产出一致结果,是现代数据系统的关键诉求。

在 Flink 中,一切皆是流式数据流(streaming dataflow):从一个或多个 Source 流入,经一系列 Operator 转换,流向一个或多个 Sink

在这里插入图片描述

2. Flink 程序模型与数据流图

一个 Flink 作业可视为有向图:节点是算子(map/filter/keyBy/window/process),边是数据流。常见来源/去向:

  • Source:Kafka、Kinesis、文件、数据库 CDC、对象存储等
  • Sink:Kafka、Elastic、Iceberg/Hudi、JDBC、Data Warehouse、OLAP 引擎等

示意(Mermaid):

keyBy
Kafka / Kinesis / Files
Source
Map/Filter
Window/Aggregation
Process/Enrich
Sink: DW/ES/Kafka/DB

实践中,程序中的“一个转换”可能会在运行时被优化为“多个算子”。

3. 并行与数据重分发:one-to-one vs redistributing

  • 并行度(parallelism):同一算子可有 N 个子任务(subtask),各自独立、不同线程/机器运行。

  • one-to-one:保序、分区不变(如 Source -> map())。

  • redistributing:改变分区/路由(如 keyBy() 按 Key 哈希分发,rebalance() 随机打散,broadcast() 广播)。

    • 注意:重分发后,仅发送子任务与接收子任务这一对之间的相对顺序被保留;跨 Key 的全局顺序不可保证。

4. 事件时间与水位线:让“时间”说了算

  • 处理时间(Processing Time):算子机器的时钟。简单但不稳定。

  • 事件时间(Event Time):事件本身携带的时间戳。可按真实发生顺序计算,适合乱序、重放场景。

  • 水位线(Watermark):系统对“截至某时刻,应该已经看到所有早于它的事件”的一种进度宣告

    • 常见策略:有界乱序(允许最大延迟,如 5s),或基于观测的 自适应 策略。

要点:用事件时间 + 合理水位线,才能在“实时 + 历史重放”两种模式下得到一致可复算的结果。

5. 有状态流处理:本地状态、KeyBy 与算子并行实例

  • Flink 的很多算子是有状态的:当前事件的处理依赖于之前所有事件的累积影响
  • keyBy() 将同一 Key 的事件路由到同一并行子任务,从而让每个子任务维护“自己那份 Key 的本地状态”。
  • 本地状态(JVM 堆或托管到 RocksDB 等持久化结构)带来高吞吐、低延迟
  • 从系统视角看,有状态算子的并行实例集合,像一个分片的 KV 存储

6. 容错与恰好一次:检查点与回放

  • 检查点(Checkpoint):周期性、异步地捕获作业全局状态(包含 Source 的偏移和各算子本地状态)。

  • 失败恢复:从最近检查点恢复状态,同时让 Source 回退到检查点时的偏移,继续处理。

  • Exactly-Once 语义:对状态更新Source 偏移的一致快照 + 对 Sink 的幂等/两阶段提交支持,可实现“计算端恰好一次”。

    • 端到端 Exactly-Once 还取决于外部系统的事务/幂等等支持(如 Kafka 事务写、支持两阶段提交的 JDBC/文件写入等)。

示意(Mermaid):

Source (Kafka Offset)Stateful Operator (Keyed State)Checkpoint StorageSinkAsync snapshot(Keyed State)Save offsetsloop[every N seconds]Failure happensFailRestore last snapshotRestore offsetsReplay from restored offsetResume processing (exactly-once)Source (Kafka Offset)Stateful Operator (Keyed State)Checkpoint StorageSink

7. 实战一:实时订单统计(Java DataStream)

场景:从 Kafka 消费订单事件,按用户 userId 的事件时间做滚动窗口汇总(1 分钟订单数与金额),同时打开检查点实现容错与“恰好一次”。

依赖(Maven 核心)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

示例代码(精简版)

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Duration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;public class OrdersJob {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1) 开启检查点(每 10s,一次性语义)env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ckpt"); // 示例:本地存储// 2) Kafka SourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("orders").setGroupId("orders-consumer").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> raw = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-orders");// 3) 解析 JSON 并抽取事件时间(假设字段 ts 为毫秒)DataStream<Order> orders = raw.map(Order::fromJson) // 自行实现:String -> Order(userId, amount, ts).assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((e, ts) -> e.ts));// 4) 按 userId 做 1 分钟滚动窗口统计DataStream<UserAgg> result = orders.keyBy(o -> o.userId).window(TumblingEventTimeWindows.of(Time.minutes(1))).reduce((a, b) -> new Order(a.userId, a.amount + b.amount, Math.max(a.ts, b.ts)),(key, window, it, out) -> {Order agg = it.iterator().next();out.collect(new UserAgg(key.getKey(), window.getStart(), window.getEnd(), 1, agg.amount));});// 5) Sink(示例打印;生产环境请用支持事务/幂等的 Sink)result.print();env.execute("orders-1min-aggregation");}static class Order {public String userId;public double amount;public long ts;static Order fromJson(String s) { /* TODO: 解析实现 */ return null; }public Order() {}public Order(String userId, double amount, long ts) {this.userId = userId; this.amount = amount; this.ts = ts;}}static class UserAgg {public String userId;public long windowStart;public long windowEnd;public long cnt;public double amount;public UserAgg(String userId, long ws, long we, long cnt, double amount) {this.userId = userId; this.windowStart = ws; this.windowEnd = we; this.cnt = cnt; this.amount = amount;}@Override public String toString() {return String.format("user=%s, win=[%d,%d), cnt=%d, amount=%.2f",userId, windowStart, windowEnd, cnt, amount);}}
}

要点回顾

  • 事件时间 + 有界乱序 5s 水位线。
  • keyBy(userId) 将同一用户的事件路由到同一子任务,窗口聚合依赖本地状态。
  • 打开 EXACTLY_ONCE 检查点;生产环境搭配事务性 Sink获得端到端“恰好一次”。

8. 实战二:Flink SQL 统计 10 分钟 UV

场景:统计近 10 分钟的去重访客数(UV),按事件时间滑动。

-- 1) 定义 Kafka 源(简化示意)
CREATE TABLE page_view (user_id STRING,url STRING,ts BIGINT,WATERMARK FOR ts_rowtime AS ts_rowtime - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'pv','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','scan.startup.mode' = 'latest-offset'
);-- 2) 计算 10 分钟滑动 UV,每 1 分钟出一次结果
SELECTwindow_start,window_end,COUNT(DISTINCT user_id) AS uv
FROM TABLE(TUMBLE(TABLE page_view, DESCRIPTOR(ts_rowtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;

说明

  • 通过 WATERMARK 声明事件时间列,设定乱序容忍度。
  • 使用窗口 TVF 进行聚合,语义清晰、易于维护。

9. 常见坑与最佳实践清单

  1. 端到端 Exactly-Once ≠ 仅开启检查点

    • Source 偏移 + 算子状态 + Sink 写入三者需要原子一致。选择支持两阶段提交/事务/幂等的 Sink(如 Kafka 事务、支持 XA/2PC 的 JDBC Sink、文件写入的临时+原子 rename 策略等)。
  2. 水位线过于保守或激进

    • 过小:容忍不了真实乱序,导致早关窗、丢失迟到数据;过大:延迟增大、状态胀大。基于数据分布观测合理设置。
  3. Key 倏地倾斜

    • 热 Key 导致单个子任务背压。方案:拆分热 Key、基于“二级 Key + 后聚合”的 Key 拆分(salting),或使用分层聚合。
  4. 状态体量不可控

    • 定期 TTL、合理的窗口策略、布隆过滤/近似去重结构、外部维表缓存策略;必要时选择 RocksDB 状态后端并调优。
  5. 反压与资源

    • 监控背压链路、合理设置并行度与重分发策略;Source 拉取速率、Sink 刷写批次、网络缓冲都影响端到端吞吐。
  6. 历史重放一致性

    • 必须使用事件时间语义 + 稳定的水位线策略;避免处理时间驱动的逻辑影响可复算性。

10. 进一步学习与参考路线

  • 优先打牢四件事:事件时间、水位线、有状态算子、检查点。

  • 实践优先:先跑通一个 Kafka→Flink→Sink 的端到端流水线,再逐步引入窗口、状态与容错。

  • 升级路径

    1. 单流 ETL/聚合 →
    2. 多流 Join/会话窗口 →
    3. 事务性 Sink 与端到端恰好一次 →
    4. 复杂事件驱动(ProcessFunction/CEP)与侧输出流 →
    5. 生产级部署(资源隔离、HA、监控告警、滚动升级、Savepoint 运维)。

文章转载自:

http://3ERVXRUx.tpnxj.cn
http://Tc5NbNTK.tpnxj.cn
http://4xxf1H5K.tpnxj.cn
http://osDoPv0q.tpnxj.cn
http://1aJUlLUL.tpnxj.cn
http://v6sLP4NI.tpnxj.cn
http://UmbBICFc.tpnxj.cn
http://ZEbqOsH6.tpnxj.cn
http://NQ3liqVt.tpnxj.cn
http://pAUeQ1Rm.tpnxj.cn
http://QRQuszLt.tpnxj.cn
http://3DyaQHiV.tpnxj.cn
http://O8FfcXl0.tpnxj.cn
http://WTNlwkhJ.tpnxj.cn
http://oht7onBf.tpnxj.cn
http://kVriAden.tpnxj.cn
http://uR3gjtGt.tpnxj.cn
http://mG1XbRdL.tpnxj.cn
http://064DHvF0.tpnxj.cn
http://HE4gSuPc.tpnxj.cn
http://z07B8vk0.tpnxj.cn
http://Z9kwjXAG.tpnxj.cn
http://FccdtniW.tpnxj.cn
http://1IENwFOc.tpnxj.cn
http://XPXitXmw.tpnxj.cn
http://AniJgZFA.tpnxj.cn
http://eF3XZ22e.tpnxj.cn
http://WZg2ma27.tpnxj.cn
http://Y9AwzGfH.tpnxj.cn
http://Zff2f8qE.tpnxj.cn
http://www.dtcms.com/a/379932.html

相关文章:

  • 第2篇:数据持久化实战
  • redis sentinel 与 clauster 的区别
  • Vue: 侦听器(Watch)
  • HTML 设计与使用入门
  • 【大数据专栏】流式处理框架-Apache Fink
  • 老项目CSS样式失效?调整css插件版本解决
  • Flink 实时流处理实战:电商实时大屏分析
  • ARM(7)IMX6ULL 按键控制(轮询 + 中断)优化工程
  • 基于STM32设计的青少年学习监控系统(华为云IOT)_282
  • Django全栈班v1.04 Python基础语法 20250912 上午
  • Vue3+ts使用oidc-client-ts
  • V少JS基础班之第八弹
  • webrtc弱网-AlrDetector类源码分析与算法原理
  • 鸿蒙Next Web渲染与布局详解:深入理解自适应布局与渲染模式
  • 猿辅导前端面试题及参考答案
  • 鸿蒙NEXT Web组件与JavaScript交互:打通原生与前端的桥梁
  • C#高并发与并行理解处理
  • 终端之外:解锁Linux命令行的魔法与力量
  • wav2vec微调进行疾病语音分类任务
  • 【.Net技术栈梳理】10-.NET Core 程序的执行
  • 【完整源码+数据集+部署教程】仓库物品分类检测图像分割系统源码和数据集:改进yolo11-convnextv2
  • 软件定义汽车(SDV)与区域电子电气架构(Zonal EEA)的技术革新
  • R语言:数据读取与重构、试验设计(RCB/BIB/正交/析因)、ggplot2高级绘图与统计检验(t检验/方差分析/PCA/聚类)
  • ffmpeg切割音频
  • 【论文笔记】RadarOcc: Robust 3D Occupancy Prediction with 4D Imaging Radar
  • 【Axios 教程】从入门到高级
  • 数据库重演Real Application Testing: Database Capture FAQ (Doc ID 1920275.1)
  • 一个海康相机OCR的程序
  • 蚂蚁 S19 Pro+ Hyd 191T:高效能矿机解析与性能评测
  • C++并发编程:std::thread右值形式传参解析