当前位置: 首页 > news >正文

Flink DataStream「上下文与状态处理」实战指南

1. 为什么需要“上下文(Context)”?

算子名称、并行度这种“静态配置”可以在构建拓扑时设置;但当前处理的 Key、任务并行度、执行模式、度量上报、定时器/水位线等运行时信息,只能在ProcessFunction 执行期间获取。
DataStream API 通过统一入口 Runtime Context 把执行引擎的能力暴露给算子。

1.1 Runtime Context 能拿到什么?

按职能可分为两类:

NonPartitionedContext(非分区上下文) — 与具体分区/Key 无关

  • JobInfo:Job 名称、执行模式等
  • TaskInfo:并行度、子任务索引等
  • MetricGroup:度量注册/上报
  • Watermark Manager:水位线相关

典型时机:open() 初始化、close() 清理

PartitionedContext(分区上下文) — 与当前分区/Key 强相关

  • State Manager:状态访问(getState*)、当前 Key(getCurrentKey()
  • ProcessingTime Manager:处理时间定时器、当前处理时间

典型时机:processRecord()、定时器回调触发

1.2 例子:在 open() 里读取并行度和执行模式

new OneInputStreamProcessFunction<String, String>() {private transient int parallelism;private transient ExecutionMode executionMode;@Overridepublic void open(NonPartitionedContext<String> ctx) throws Exception {parallelism   = ctx.getTaskInfo().getParallelism();executionMode = ctx.getJobInfo().getExecutionMode();}
}

2. 状态处理三步走:声明 → 注册 → 使用

原则先声明(define & declare),后使用(get & update)。写一个有状态的 ProcessFunction 通常分 3 步:

  1. 定义 State:用 StateDeclaration 定义状态的名称RedistributionMode
  2. 注册 State:在 ProcessFunction#usesStates()显式声明所有要用的状态
  3. 获取/更新 State:在处理逻辑里通过 StateManager 获取并读写

2.1 完整示例

private static class StatefulFunction implements OneInputStreamProcessFunction<Long, Long> {// Step 1: 定义一个 ListStatestatic final StateDeclaration.ListStateDeclaration<Long> LIST_STATE_DECL =StateDeclarations.listStateBuilder("example-list-state", TypeDescriptors.LONG).build();// Step 2: 在 usesStates 中注册@Overridepublic Set<StateDeclaration> usesStates() {return Collections.singleton(LIST_STATE_DECL);}// Step 3: 获取并更新@Overridepublic void processRecord(Long record, Collector<Long> out, RuntimeContext ctx) throws Exception {ListState<Long> state = ctx.getStateManager().getState(LIST_STATE_DECL);state.update(Collections.singletonList(record));}
}

3. 定义 State:名字 + 重分配(Redistribution)

StateDeclaration 需要两类信息:

  • Name:状态唯一标识

  • RedistributionMode:并行度/分区变化(尤其是 Non-Keyed)时,状态如何在分区间重新分布

    • NONE:不支持重分配(并行度变化将不安全)
    • REDISTRIBUTABLE:该状态可以安全重分配,具体策略由状态定义决定
    • IDENTICAL:保证不同分区中的状态恒等(一致),因此无需考虑重分配

经验法则:

  • Keyed 流:状态天然绑定到 Key/分区,一般不需要 Redistribute
  • Non-Keyed 流:分区随并行度变动而变化,强烈建议使用 REDISTRIBUTABLEIDENTICAL 的声明设计

3.1 支持的状态类型(对应 StateDeclaration

  • ValueState:单值,可 update(T) / value()
  • ListState:列表,可 add/addAll/get/update(List<T>)
  • ReducingState:基于 ReduceFunction 的聚合单值
  • AggregatingState<IN,OUT>:聚合单值,输入类型可与聚合结果不同,基于 AggregateFunction
  • MapState<UK,UV>:KV 映射,put/putAll/get/entries/keys/values/isEmpty
  • BroadcastState<K,V>:广播流上的 KV 状态(所有并行实例接收相同元素)

3.2 快速构造:StateDeclarationsTypeDescriptors

// ValueState<Long>
ValueStateDeclaration<Long> v = StateDeclarations.valueState("example-value-state", TypeDescriptors.LONG);// MapState<Long, String>
MapStateDeclaration<Long, String> m =StateDeclarations.mapState("example-map-state", TypeDescriptors.LONG, TypeDescriptors.STRING);// ReducingState<Long>,聚合函数为 sum
ReducingStateDeclaration<Long> r =StateDeclarations.reducingState("example-reducing-state", TypeDescriptors.LONG, Long::sum);
  • TypeDescriptors 提供常用类型:INT/LONG/BOOLEAN/STRING/LIST/MAP/...
  • 自定义类型:实现 TypeDescriptor 描述自己的序列化信息

4. 声明 State:usesStates() 必须覆盖

@Override
public Set<StateDeclaration> usesStates() { return Set.of( /* 所有将被使用的声明 */ );
}
  • 不在 usesStates() 中声明的状态一律不可用
  • Flink 会在作业编译期做合法性校验(详见 §6),非法声明会直接报错,更早暴露问题、避免运行期炸锅

5. 获取与更新:StateManager

StateManager 是状态入口,常用方法:

  • 当前 Key<K> K getCurrentKey()(非 Keyed 流调用会抛 UnsupportedOperationException

  • 获取状态

    • getState(...):未注册/不可用将抛异常
    • getStateOptional(...):返回 Optional,适合“存在即用”场景

示例要点:

@Override
public void processRecord(Event e, Collector<Result> out, RuntimeContext ctx) throws Exception {StateManager sm = ctx.getStateManager();String userId = sm.getCurrentKey(); // 仅在 Keyed 流合法ValueState<UserAgg> agg = sm.getState(USER_AGG_DECL);UserAgg cur = agg.value();agg.update(update(cur, e));
}

6. 合法性矩阵:哪些输入/状态组合会被拒绝?

不同 输入流类型(Global、Keyed、NonKeyed、Broadcast)对状态声明与访问的允许程度不同。文档给出了两张表(单输入 & 双输入),这里提炼核心规则:

  • Broadcast 输入:只允许使用 BroadcastState不允许按 Key 访问状态(没有 Key 的概念)
  • Global 输入:不具备 Key 语义,不能访问依赖 Key 的状态(如 getCurrentKey()、Keyed State)
  • Non-Keyed 输入:可以使用 Non-Keyed 维度的状态(取决于声明的 Redistribute 能力);不允许读取当前 Key
  • Keyed 输入:可以访问当前 Key,并使用各类 Keyed State(Value/List/Map/Reducing/Aggregating)

双输入(TwoInput):每个输入边各自遵循上述规则,以输入边维度判定合法性。例如:

  • TwoInputBroadcastStreamProcessFunction:广播边能用 BroadcastState;另一个(Keyed/Non-Keyed)边按其自身规则取用状态
  • TwoInputNonBroadcastStreamProcessFunction:两边均不为 Broadcast,则分别按其 Keyed/NonKeyed/Global 规则校验

编译期保护

  • Flink 会在编译期检查 usesStates() 声明的合法性
  • 若声明了与输入类型不兼容的状态,作业提交前就会报错

7. ProcessingTime / Watermark:定时与时间语义补充

  • ProcessingTime Manager

    • 读取当前处理时间
    • 注册/删除处理时间定时器(常用于“每 X 分钟清理”类逻辑)
  • Watermark Manager

    • 管理水位线推进
    • 驱动事件时间相关定时器(如果你使用事件时间扩展 API)

选型建议:

  • 业务时效以消息时间为准 → 用事件时间 + 水位线/事件定时
  • 更关注“处理节点的墙钟” → 用处理时间定时

8. 典型设计范式

8.1 Keyed 计数器(ValueState)

static final ValueStateDeclaration<Long> COUNT_DECL =StateDeclarations.valueState("cnt", TypeDescriptors.LONG);@Override public Set<StateDeclaration> usesStates() { return Set.of(COUNT_DECL); }@Override
public void processRecord(Event e, Collector<Out> out, RuntimeContext ctx) throws Exception {ValueState<Long> cnt = ctx.getStateManager().getState(COUNT_DECL);long v = Optional.ofNullable(cnt.value()).orElse(0L) + 1;cnt.update(v);if (v % 1000 == 0) out.collect(new Out(ctx.getStateManager().getCurrentKey(), v));
}

8.2 Non-Keyed 聚合(ReducingState,支持重分配)

static final ReducingStateDeclaration<Long> SUM_DECL =StateDeclarations.reducingState("sum", TypeDescriptors.LONG, Long::sum);
// 建议把该声明设计为 REDISTRIBUTABLE(构建器中配置),以支持并行度变更时的安全迁移

8.3 广播规则 + 事件主流

  • 广播边:BroadcastState<RuleId, Rule> 存规则

  • 事件边:Keyed 处理,按用户/设备应用规则

  • 合法性:

    • 广播边只能访问 BroadcastState
    • 事件边可访问 Keyed State 与 getCurrentKey()

9. 工程化建议与避坑清单

  • 声明即契约:所有状态都必须在 usesStates() 中显式声明;否则不可获取
  • Non-Keyed 状态要考虑并行度变化:优先选 REDISTRIBUTABLE,或保证 IDENTICAL
  • Keyed vs Non-Keyed 分清语义边界:不要在 Non-Keyed 中调用 getCurrentKey()
  • Broadcast 只配合使用:广播输入只用 BroadcastState,不可直接“转成”其它流
  • 类型描述一致性TypeDescriptors 与业务类的序列化保持一致,避免反序列化失败
  • 状态增长控制:必要时配置 TTL/定期清理;长列表/Map 建议分片或外溢
  • 编译期报错要重视:非法状态声明/访问,在提交前就会失败,修掉才是王道
  • 度量与命名:用 MetricGroup 暴露关键指标;对每个处理节点 withName,便于排障
  • 时间语义对齐:处理时间定时器 ≠ 事件时间定时器,选错会出现“迟到/早到”异常

10. 一页速记(Cheat Sheet)

  • Context

    • NonPartitionedContextopen/close 场景(Job/Task/Metric/Watermark)
    • PartitionedContextprocessRecord/定时器回调(State/CurrentKey/ProcessingTime)
  • State 三步:定义(声明对象)→ 注册(usesStates())→ 获取/更新(StateManager

  • Redistribution:Keyed 通常不需;Non-Keyed 选 REDISTRIBUTABLE/IDENTICAL

  • 合法性:按输入流类型(Global/Keyed/NonKeyed/Broadcast)判定;非法即编译期报错

  • Broadcast 限制:只能用 BroadcastState,且需与另一输入一同处理

http://www.dtcms.com/a/545540.html

相关文章:

  • MLP(Multilayer Perceptron,多层感知机)怎么解决异或问题
  • 惠普DL380服务器安装系统以后无法读取到系统盘启动解决方案(其他品牌服务器类似解决思路)
  • 做个网站的价格长荣建设深圳公司网站
  • 公司要做个网站吗成都短视频运营
  • Linux C/C++ 学习日记(37):协程(六):总结
  • NVIDIA Jetson Orin NX安装graspnet失败解决方案
  • 网站后台有些不显示自己怎么做外贸网站
  • 有几个网站可以做代发的制作小诗集
  • 迷你电脑主机哪个牌子好?有哪些源头OEM/ODM定制厂商
  • 微信小程序开发案例 | 通讯录小程序(下)
  • 大模型算法面试笔记——多头潜在注意力(MLA)
  • 常州城投建设工程招标有限公司网站泰安网站建设策划方案
  • 南通公司网站建设湖南网站优化公司
  • 做图标的网站广州海珠发布
  • 2022/12 JLPT听力原文 问题四
  • openEuler安装mysql8,流程详细
  • 【Linux】库制作与原理 从生成使用到 ELF 文件与链接原理解析
  • 【开题答辩全过程】以 儿童疫苗接种提醒系统的设计与实现为例,包含答辩的问题和答案
  • 【linux】基础开发工具(2)vim
  • 宁波找网站建设企业如何使用网络营销策略
  • 关于进一步做好网络安全等级保护有关工作的问题释疑-【二级以上系统重新备案】、【备案证明有效期三年】
  • Flink Keyed State 详解之三
  • LangChain4j学习3:模型参数
  • 驻马店做网站哪家好常州微网站建设
  • 深圳网站建设报价网站开发客户来源
  • 仓颉开发鸿蒙应用:深入理解组件生命周期的设计哲学与实践
  • Java 启动脚本-简介版
  • CFX Manager下载安装教程
  • 基于STM32HAL库判断传感器数据和系统定时器外部中断
  • 仓颉语言中的成员变量与方法:深入剖析与工程实践