Flink DataStream「上下文与状态处理」实战指南
1. 为什么需要“上下文(Context)”?
算子名称、并行度这种“静态配置”可以在构建拓扑时设置;但当前处理的 Key、任务并行度、执行模式、度量上报、定时器/水位线等运行时信息,只能在ProcessFunction 执行期间获取。
DataStream API 通过统一入口 Runtime Context 把执行引擎的能力暴露给算子。
1.1 Runtime Context 能拿到什么?
按职能可分为两类:
NonPartitionedContext(非分区上下文) — 与具体分区/Key 无关
- JobInfo:Job 名称、执行模式等
- TaskInfo:并行度、子任务索引等
- MetricGroup:度量注册/上报
- Watermark Manager:水位线相关
典型时机:
open()初始化、close()清理
PartitionedContext(分区上下文) — 与当前分区/Key 强相关
- State Manager:状态访问(
getState*)、当前 Key(getCurrentKey()) - ProcessingTime Manager:处理时间定时器、当前处理时间
典型时机:
processRecord()、定时器回调触发
1.2 例子:在 open() 里读取并行度和执行模式
new OneInputStreamProcessFunction<String, String>() {private transient int parallelism;private transient ExecutionMode executionMode;@Overridepublic void open(NonPartitionedContext<String> ctx) throws Exception {parallelism = ctx.getTaskInfo().getParallelism();executionMode = ctx.getJobInfo().getExecutionMode();}
}
2. 状态处理三步走:声明 → 注册 → 使用
原则:先声明(define & declare),后使用(get & update)。写一个有状态的 ProcessFunction 通常分 3 步:
- 定义 State:用
StateDeclaration定义状态的名称与RedistributionMode - 注册 State:在
ProcessFunction#usesStates()中显式声明所有要用的状态 - 获取/更新 State:在处理逻辑里通过
StateManager获取并读写
2.1 完整示例
private static class StatefulFunction implements OneInputStreamProcessFunction<Long, Long> {// Step 1: 定义一个 ListStatestatic final StateDeclaration.ListStateDeclaration<Long> LIST_STATE_DECL =StateDeclarations.listStateBuilder("example-list-state", TypeDescriptors.LONG).build();// Step 2: 在 usesStates 中注册@Overridepublic Set<StateDeclaration> usesStates() {return Collections.singleton(LIST_STATE_DECL);}// Step 3: 获取并更新@Overridepublic void processRecord(Long record, Collector<Long> out, RuntimeContext ctx) throws Exception {ListState<Long> state = ctx.getStateManager().getState(LIST_STATE_DECL);state.update(Collections.singletonList(record));}
}
3. 定义 State:名字 + 重分配(Redistribution)
StateDeclaration 需要两类信息:
-
Name:状态唯一标识
-
RedistributionMode:并行度/分区变化(尤其是 Non-Keyed)时,状态如何在分区间重新分布
NONE:不支持重分配(并行度变化将不安全)REDISTRIBUTABLE:该状态可以安全重分配,具体策略由状态定义决定IDENTICAL:保证不同分区中的状态恒等(一致),因此无需考虑重分配
经验法则:
- Keyed 流:状态天然绑定到 Key/分区,一般不需要 Redistribute
- Non-Keyed 流:分区随并行度变动而变化,强烈建议使用
REDISTRIBUTABLE或IDENTICAL的声明设计
3.1 支持的状态类型(对应 StateDeclaration)
- ValueState:单值,可
update(T)/value() - ListState:列表,可
add/addAll/get/update(List<T>) - ReducingState:基于
ReduceFunction的聚合单值 - AggregatingState<IN,OUT>:聚合单值,输入类型可与聚合结果不同,基于
AggregateFunction - MapState<UK,UV>:KV 映射,
put/putAll/get/entries/keys/values/isEmpty - BroadcastState<K,V>:广播流上的 KV 状态(所有并行实例接收相同元素)
3.2 快速构造:StateDeclarations 与 TypeDescriptors
// ValueState<Long>
ValueStateDeclaration<Long> v = StateDeclarations.valueState("example-value-state", TypeDescriptors.LONG);// MapState<Long, String>
MapStateDeclaration<Long, String> m =StateDeclarations.mapState("example-map-state", TypeDescriptors.LONG, TypeDescriptors.STRING);// ReducingState<Long>,聚合函数为 sum
ReducingStateDeclaration<Long> r =StateDeclarations.reducingState("example-reducing-state", TypeDescriptors.LONG, Long::sum);
TypeDescriptors提供常用类型:INT/LONG/BOOLEAN/STRING/LIST/MAP/...- 自定义类型:实现
TypeDescriptor描述自己的序列化信息
4. 声明 State:usesStates() 必须覆盖
@Override
public Set<StateDeclaration> usesStates() { return Set.of( /* 所有将被使用的声明 */ );
}
- 不在
usesStates()中声明的状态一律不可用 - Flink 会在作业编译期做合法性校验(详见 §6),非法声明会直接报错,更早暴露问题、避免运行期炸锅
5. 获取与更新:StateManager
StateManager 是状态入口,常用方法:
-
当前 Key:
<K> K getCurrentKey()(非 Keyed 流调用会抛UnsupportedOperationException) -
获取状态:
getState(...):未注册/不可用将抛异常getStateOptional(...):返回Optional,适合“存在即用”场景
示例要点:
@Override
public void processRecord(Event e, Collector<Result> out, RuntimeContext ctx) throws Exception {StateManager sm = ctx.getStateManager();String userId = sm.getCurrentKey(); // 仅在 Keyed 流合法ValueState<UserAgg> agg = sm.getState(USER_AGG_DECL);UserAgg cur = agg.value();agg.update(update(cur, e));
}
6. 合法性矩阵:哪些输入/状态组合会被拒绝?
不同 输入流类型(Global、Keyed、NonKeyed、Broadcast)对状态声明与访问的允许程度不同。文档给出了两张表(单输入 & 双输入),这里提炼核心规则:
- Broadcast 输入:只允许使用 BroadcastState;不允许按 Key 访问状态(没有 Key 的概念)
- Global 输入:不具备 Key 语义,不能访问依赖 Key 的状态(如
getCurrentKey()、Keyed State) - Non-Keyed 输入:可以使用 Non-Keyed 维度的状态(取决于声明的 Redistribute 能力);不允许读取当前 Key
- Keyed 输入:可以访问当前 Key,并使用各类 Keyed State(Value/List/Map/Reducing/Aggregating)
双输入(TwoInput):每个输入边各自遵循上述规则,以输入边维度判定合法性。例如:
TwoInputBroadcastStreamProcessFunction:广播边能用BroadcastState;另一个(Keyed/Non-Keyed)边按其自身规则取用状态TwoInputNonBroadcastStreamProcessFunction:两边均不为 Broadcast,则分别按其Keyed/NonKeyed/Global规则校验
编译期保护:
- Flink 会在编译期检查
usesStates()声明的合法性; - 若声明了与输入类型不兼容的状态,作业提交前就会报错。
7. ProcessingTime / Watermark:定时与时间语义补充
-
ProcessingTime Manager:
- 读取当前处理时间
- 注册/删除处理时间定时器(常用于“每 X 分钟清理”类逻辑)
-
Watermark Manager:
- 管理水位线推进
- 驱动事件时间相关定时器(如果你使用事件时间扩展 API)
选型建议:
- 业务时效以消息时间为准 → 用事件时间 + 水位线/事件定时
- 更关注“处理节点的墙钟” → 用处理时间定时
8. 典型设计范式
8.1 Keyed 计数器(ValueState)
static final ValueStateDeclaration<Long> COUNT_DECL =StateDeclarations.valueState("cnt", TypeDescriptors.LONG);@Override public Set<StateDeclaration> usesStates() { return Set.of(COUNT_DECL); }@Override
public void processRecord(Event e, Collector<Out> out, RuntimeContext ctx) throws Exception {ValueState<Long> cnt = ctx.getStateManager().getState(COUNT_DECL);long v = Optional.ofNullable(cnt.value()).orElse(0L) + 1;cnt.update(v);if (v % 1000 == 0) out.collect(new Out(ctx.getStateManager().getCurrentKey(), v));
}
8.2 Non-Keyed 聚合(ReducingState,支持重分配)
static final ReducingStateDeclaration<Long> SUM_DECL =StateDeclarations.reducingState("sum", TypeDescriptors.LONG, Long::sum);
// 建议把该声明设计为 REDISTRIBUTABLE(构建器中配置),以支持并行度变更时的安全迁移
8.3 广播规则 + 事件主流
-
广播边:
BroadcastState<RuleId, Rule>存规则 -
事件边:Keyed 处理,按用户/设备应用规则
-
合法性:
- 广播边只能访问
BroadcastState - 事件边可访问 Keyed State 与
getCurrentKey()
- 广播边只能访问
9. 工程化建议与避坑清单
- 声明即契约:所有状态都必须在
usesStates()中显式声明;否则不可获取 - Non-Keyed 状态要考虑并行度变化:优先选
REDISTRIBUTABLE,或保证IDENTICAL - Keyed vs Non-Keyed 分清语义边界:不要在 Non-Keyed 中调用
getCurrentKey() - Broadcast 只配合使用:广播输入只用
BroadcastState,不可直接“转成”其它流 - 类型描述一致性:
TypeDescriptors与业务类的序列化保持一致,避免反序列化失败 - 状态增长控制:必要时配置 TTL/定期清理;长列表/Map 建议分片或外溢
- 编译期报错要重视:非法状态声明/访问,在提交前就会失败,修掉才是王道
- 度量与命名:用
MetricGroup暴露关键指标;对每个处理节点withName,便于排障 - 时间语义对齐:处理时间定时器 ≠ 事件时间定时器,选错会出现“迟到/早到”异常
10. 一页速记(Cheat Sheet)
-
Context:
NonPartitionedContext:open/close场景(Job/Task/Metric/Watermark)PartitionedContext:processRecord/定时器回调(State/CurrentKey/ProcessingTime)
-
State 三步:定义(声明对象)→ 注册(
usesStates())→ 获取/更新(StateManager) -
Redistribution:Keyed 通常不需;Non-Keyed 选
REDISTRIBUTABLE/IDENTICAL -
合法性:按输入流类型(Global/Keyed/NonKeyed/Broadcast)判定;非法即编译期报错
-
Broadcast 限制:只能用
BroadcastState,且需与另一输入一同处理
