优化 Flink 基于状态的 ETL少 Shuffle、不膨胀、可落地的工程
一、背景与常见痛点
- 网络放大:每一步
keyBy
/shuffle
都在“复制-搬运-发散”,热点 key 甚至拖垮全链路。 - 状态无界:
ListState
存明细、窗口 allowed lateness 太大、TTL 没开,RocksDB 胀到不可控。 - Checkpoint 慢:状态太多 + 全量快照,导致 checkpoint/backpressure 恶性循环。
核心对策:先压缩再重分区,增量代替明细,生命周期严格管理。
二、原则与总体思路
- 能本地先压缩,就不要早 shuffle(预聚合、小批、去重)。
- 维表广播或异步查,事实流不改 key、不改并行度,消除重分区。
- 增量状态胜过原始元素(Aggregating/Reducing > List),配TTL + 空闲清理。
- 二阶段聚合 + 倾斜治理,热点拆散再合并。
- 窗口增量聚合,窗口结束立即清理;迟到数据侧输出别再占状态。
三、少 Shuffle 的 7 条硬招
-
同 Key、同并行度、同分区器
- 保持
keyBy
字段一致与并行度稳定;避免“二次 keyBy”。 - 能链的算子(map/filter → keyBy → process)尽量 chain,减少网络边界。
- 保持
-
维表 Broadcast-Hash Join(小表)
- 维度用
broadcast()
,事实流不重分区:广播只走一次网络,事实流全本地匹配。
- 维度用
-
本地预聚合(Map-side Combine / MiniBatch)→ 再全局聚合
- 在 map/flatMap 内做子任务 HashMap 累加,时间/条数触发 flush,把数据量先压一遍再
keyBy
。
- 在 map/flatMap 内做子任务 HashMap 累加,时间/条数触发 flush,把数据量先压一遍再
-
倾斜 key 治理:Salt + 两阶段
- 本地阶段给热点 key 加盐(如 8~64 份),二次聚合再去盐合并,防止单分区被打爆。
-
窗口增量聚合 + 合理触发
- 选用滚动/滑动窗口 +
aggregate
,避免保存原始元素。触发频率配合 MiniBatch 降边界交互。
- 选用滚动/滑动窗口 +
-
避免无谓宽依赖
- 提前
filter
/去重;同 schema 的流再union
;尽量把“压缩”放在keyBy
之前。
- 提前
-
Shuffle/网络参数
- 离线回填或批风格阶段优先 blocking shuffle;开启网络压缩;调好批大小和网络内存。
四、控住状态体积的 7 条硬招
-
增量状态替代明细
- 用
AggregatingState
/ReducingState
代替ListState
;去重用 Bloom/Count-Min 等近似结构。
- 用
-
TTL + 清理策略
- 为 MapState/ListState 开启 TTL(OnCreateAndWrite 或 OnReadAndWrite);
- 事件时间场景:窗口结束立刻 clear,
allowedLateness
尽量小,迟到走侧输出。
-
结构化与分段
- 大 MapState 按时间桶(小时/天)分段,整桶到期统一删除;维表广播用版本号整体替换旧桶。
-
幂等下游 + 窗口最小化
- Sink 用主键 Upsert;窗口尽量滚动/滑动/会话 + 增量函数,减少存明细的需求。
-
RocksDB/Changelog 与快照
- 调写缓冲/后台线程/目标文件尺寸;开启增量 checkpoint;必要时评估 Changelog State。
-
数据“入口即减肥”
- 采样/限流无价值事件;JSON 大字段只保参与计算的列或做短编码。
-
监控与守护
- 盯住
totalStateSize
、numStateEntries
、Checkpoint size/duration、RocksDB compaction backlog。
- 盯住
五、典型场景的模板化方案
A. 近 24 小时唯一去重 + 计数
- 做法:
map
内小批预聚合 →keyBy(userId)
→ 使用 Bloom + TTL(24h) 检查(userId, objId)
,首次命中再计数。 - 收益:先压缩再 shuffle,状态只存位图 + 累计值。
B. 事实流 + 维表 Join(百万行级)
- 做法:维表广播,事实流不重分区;广播算子里
MapState<key, dim>
,全量/增量带版本,旧版本整桶清理。 - 维表太大/更新频繁:改 Async I/O + 本地 LRU 缓存,限制缓存上限。
C. 倾斜订单聚合(热点商家)
- 做法:本地阶段
shopId#rand(0..63)
局部聚合;全局阶段去盐再合并。 - 配合:MiniBatch + TWO_PHASE;热点商家可单独旁路到更高并行度。
六、端到端参考实现
1) DataStream:本地小批 + 全局聚合(二阶段)
// 本地小批聚合(每 2s 或 2000 条 flush 一次)
SingleOutputStreamOperator<Agg> local =src.process(new LocalMiniBatchAggFunction(2000, 2000L)); // 自定义,小批内 HashMap 聚合// 全局 reduce/aggregate(状态极小)
SingleOutputStreamOperator<GlobalAgg> global =local.keyBy(a -> a.key).process(new GlobalReduceProcessFunction()); // 仅存累加值,不存明细
2) DataStream:布隆去重 + TTL
public class DedupFn extends KeyedProcessFunction<String, Event, Event> {private transient MapState<Long, RoaringBitmap> bloomByHour; // 按小时桶分段private transient AggregatingState<Event, Long> cnt;@Overridepublic void open(Configuration parameters) {StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).cleanupFullSnapshot() // 结合检查点清理.build();MapStateDescriptor<Long, RoaringBitmap> d =new MapStateDescriptor<>("bloom", LongSerializer.INSTANCE, new RoaringSer());d.enableTimeToLive(ttl);bloomByHour = getRuntimeContext().getMapState(d);// cnt: AggregatingState 初始化略}@Overridepublic void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {long hourBucket = e.ts / 3_600_000L;RoaringBitmap bm = bloomByHour.get(hourBucket);if (bm == null) bm = new RoaringBitmap();int h = hash(e.userId + "|" + e.objId);if (!bm.contains(h)) { // 首次出现bm.add(h);bloomByHour.put(hourBucket, bm);// cnt.add(e) 或输出增量out.collect(e);}// 空闲清理定时器(30 分钟)ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 30 * 60 * 1000);}@Overridepublic void onTimer(long ts, OnTimerContext ctx, Collector<Event> out) throws Exception {// 可在此按 hourBucket 删除过期桶(结合 watermark/当前时间)}
}
3) SQL:MiniBatch + 两阶段聚合 + DISTINCT 拆分
-- 打开小批和两阶段聚合
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';-- 维表广播(小表)示意
CREATE TEMPORARY VIEW dim WITH (... ) AS SELECT ... ;
-- Hint 方式(不同版本语法略有差异)
SELECT /*+ BROADCAST(dim) */f.user_id, f.ts, d.level
FROM fact f
LEFT JOIN dim d ON f.key = d.key;
七、RocksDB / Checkpoint 调优要点
- RocksDB Options(示例):写缓冲 64MB、3 个 memtable、目标文件 256MB;后台 compaction 线程 ≥2。
- 增量 checkpoint:大状态作业必须开,配合对象存储。
- 分层清理:TTL + 版本桶 + 窗口结束清理,共同把快照体积压小。
- 避免写放大:监控 compaction backlog;必要时调大 write buffer 或适当放宽 flush 频率。
小贴士:如果 checkpoint 仍慢,先确认是否有长尾 Key 导致状态不清;其次检查MiniBatch 粒度是否过细(批太小会放大网络与状态波动)。
八、监控与 SLO 建议
-
核心指标
totalStateSize
,numStateEntries
(总体状态规模)- Checkpoint 对齐时间 / 持续时间 / 大小 / 失败率
- RocksDB write stalls / compaction pending
- 任务级 busyTime / backPressure、网络 I/O 量、反压传播
-
SLO 示例
- 99p 端到端延迟 ≤ 5s;Checkpoint 成功率 ≥ 99.9%;Checkpoint 99p 时延 ≤ 30s;重启恢复 ≤ 60s。
九、压测方法(如何量化收益)
-
基线作业:不做预聚合/小批/广播,记录网络流量、状态大小、checkpoint 时延。
-
逐项启用策略:
- 先开 MiniBatch → 记录指标变化;
- 再改为广播维表 / Async+缓存;
- 最后上 Bloom/分桶 TTL。
-
记录三组数:Shuffle 字节数、平均/99p Checkpoint 时长、总状态体积。
-
目标(经验值,具体以数据分布为准)
- Shuffle 下降 30%~80%,Checkpoint 时长下降 30%+,状态体积下降 50%+。
十、常见反模式与“踩坑图鉴”
- 为了“方便”把所有明细都丢进
ListState
; - allowed lateness 设很大但没侧输出,窗口状态永不清;
- 维表 Join 一律事实流重分区(其实可以广播/Async);
- 无脑 DISTINCT 聚合,没开两阶段与 MiniBatch;
- 倾斜 key 不治理,单分区热点把 checkpoint 拖爆;
- 有 TTL 但没空闲清理定时器,冷 key 永远留在状态里。
十一、上线前自查清单
- 预压缩再 shuffle:MiniBatch、局部去重/聚合是否已在
keyBy
之前? - 维表处理:能广播的广播;不能广播的 Async+LRU+上限。
- 二阶段聚合:是否开启 TWO_PHASE / DISTINCT 拆分?
- 倾斜治理:热点 key 是否使用盐分片 + 二次聚合?
- 状态类型:尽量用 Aggregating/ReducingState;避免 List 存明细。
- TTL/清理:所有长生命周期状态都配 TTL;窗口结束
clear()
;空闲定时器已注册。 - RocksDB/Checkpoint:增量快照已开;OptionsFactory 已按量级压测过。
- 监控告警:state size、checkpoint、backpressure、compaction backlog 阈值齐全。
十二、结语
Flink 的“高吞吐 + 低延迟”不是白来的:算子链路要瘦身(少 shuffle)、状态要增量化(不存明细)、生命周期要严格(TTL + 及时清理)。把上面的策略按“模板化”落到每个环节,配合可观测与压测,很容易把成本打下来、把稳定性拉上去。