当前位置: 首页 > news >正文

Flink Keyed State 详解之二

Flink Operator State 详解

1. 基本概念

Operator State(算子状态)是 Flink 中另一种重要的状态类型。与 Keyed State 不同,Operator State 与特定的算子实例相关联,而不是与特定的 key 相关联。Operator State 的特点是:

  • 作用域限定在算子实例上
  • 每个并行算子实例都有其独立的状态副本
  • 当算子并行度改变并从检查点/保存点恢复时,状态会按所选方式自动重新分布(ListState 均匀切分,UnionListState 每个实例获得全量合并结果,BroadcastState 每个实例持有一份完整副本)

2. 适用场景

Operator State 适用于以下场景:

  1. Kafka 消费者偏移量管理
  2. 需要维护算子级别状态的场景
  3. 广播状态模式
  4. 需要让所有并行实例持有相同状态的场景(如 BroadcastState;运行时各实例仍各自保存一份副本,而不是直接共享同一份状态)

3. Operator State 类型

Flink 提供了多种 Operator State 类型:

3.1 ListState

ListState 是 Operator State 中最常用的状态类型,用于存储元素列表。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;/*** Operator ListState 使用示例* 模拟一个带有状态的 SourceFunction*/
public class StatefulSourceFunction implements SourceFunction<String>, CheckpointedFunction {private volatile boolean isRunning = true;private ListState<Long> checkpointedCount;private long count = 0L;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning && count < 1000) {synchronized (ctx.getCheckpointLock()) {ctx.collect("Element: " + count);count++;Thread.sleep(100); // 模拟处理时间}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedCount.clear();checkpointedCount.add(count);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("count-state",Long.class);checkpointedCount = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Long countValue : checkpointedCount.get()) {count = countValue;}}}
}

3.2 UnionListState

UnionListState 与 ListState 类似,但在恢复时(包括改变并行度后恢复),每个并行实例都会收到所有并行实例状态合并后的完整列表,而不是像 ListState 那样只分到其中一部分。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.util.ArrayList;
import java.util.List;/*** UnionListState 使用示例* 维护所有并行实例处理的元素列表*/
public class UnionListStateSink extends RichSinkFunction<String> implements CheckpointedFunction {private List<String> processedElements;private ListState<String> checkpointedState;@Overridepublic void open(Configuration parameters) throws Exception {processedElements = new ArrayList<>();}@Overridepublic void invoke(String value, Context context) throws Exception {processedElements.add(value);System.out.println("Processed: " + value + " by subtask " + getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (String element : processedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("union-list-state",String.class);// 获取 UnionListStatecheckpointedState = context.getOperatorStateStore().getUnionListState(descriptor);if (context.isRestored()) {processedElements.clear();for (String element : checkpointedState.get()) {processedElements.add(element);}}}
}

3.3 BroadcastState<K, V>

BroadcastState 用于广播状态模式:一个流(如规则流)中的元素会被广播到下游算子的所有并行实例,每个实例据此维护一份相同的状态副本,供另一个流(数据流)的处理逻辑只读访问。

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;/*** BroadcastState 使用示例* 实现规则广播和数据处理*/
public class BroadcastStateExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建规则流(需要广播)DataStream<Rule> rulesStream = env.fromElements(new Rule("rule1", "pattern1"),new Rule("rule2", "pattern2"));// 创建数据流DataStream<Event> dataStream = env.fromElements(new Event("event1", "pattern1"),new Event("event2", "pattern2"),new Event("event3", "pattern1"));// 创建广播状态描述符MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",String.class,Rule.class);// 将规则流转换为广播流BroadcastStream<Rule> broadcastRules = rulesStream.broadcast(ruleStateDescriptor);// 连接数据流和广播流进行处理DataStream<String> output = dataStream.connect(broadcastRules).process(new RuleBroadcastProcessFunction());output.print();env.execute("Broadcast State Example");}/*** 广播状态处理函数*/public static class RuleBroadcastProcessFunction extends BroadcastProcessFunction<Event, Rule, String> {private final MapStateDescriptor<String, Rule> ruleStateDescriptor;public RuleBroadcastProcessFunction() {this.ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",String.class,Rule.class);}// 处理广播的规则元素@Overridepublic void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {BroadcastState<String, Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);// 将规则存储到广播状态中broadcastState.put(rule.getName(), rule);}// 处理数据流元素@Overridepublic void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从广播状态中获取规则ReadOnlyBroadcastState<String, Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);// 遍历所有规则并匹配事件for (Map.Entry<String, Rule> entry : broadcastState.immutableEntries()) {Rule rule = entry.getValue();if (rule.getPattern().equals(event.getPattern())) {out.collect("Event " + event.getName() + " matches rule " + rule.getName());}}}}// 规则类public static class Rule {private String name;private 
String pattern;public Rule() {}public Rule(String name, String pattern) {this.name = name;this.pattern = pattern;}public String getName() { return name; }public String getPattern() { return pattern; }}// 事件类public static class Event {private String name;private String pattern;public Event() {}public Event(String name, String pattern) {this.name = name;this.pattern = pattern;}public String getName() { return name; }public String getPattern() { return pattern; }}
}

4. 配置方法

4.1 状态描述符

Operator State 使用相应的描述符:

// Operator ListState descriptor: the same ListStateDescriptor type is handed to both
// getListState(...) and getUnionListState(...).
ListStateDescriptor<String> listDescriptor =
        new ListStateDescriptor<>("operator-list-state", String.class);

// Broadcast state has no dedicated descriptor class: it is described by a MapStateDescriptor
// (state name, key type, value type).
MapStateDescriptor<String, String> broadcastDescriptor =
        new MapStateDescriptor<>("broadcast-state", String.class, String.class);

4.2 状态后端配置

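状态后端决定状态在运行时的存储方式以及检查点的生成方式。需要注意:状态后端的选择主要影响 Keyed State;Flink 内置的状态后端(包括 EmbeddedRocksDBStateBackend)都把 Operator State 保存在 JVM 堆内存中,并在检查点时将其持久化。配置方式如下: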

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Option 1: HashMapStateBackend (the default).
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
HashMapStateBackend heapBackend = new HashMapStateBackend();
env1.setStateBackend(heapBackend);

// Option 2: EmbeddedRocksDBStateBackend, intended for large state.
// NOTE(review): RocksDB holds keyed state; operator state (ListState, BroadcastState) appears to
// stay on the JVM heap with every built-in backend — confirm against the Flink docs for your version.
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
env2.setStateBackend(rocksDbBackend);

5. CheckpointedFunction 接口

实现 Operator State 的主要方式是实现 CheckpointedFunction 接口:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;public class MyCheckpointedFunction implements CheckpointedFunction {private ListState<Integer> checkpointedState;private int count = 0;@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// 快照状态时调用checkpointedState.clear();checkpointedState.add(count);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 初始化状态时调用ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("checkpointed-count",Integer.class);checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {// 从检查点恢复状态for (Integer countValue : checkpointedState.get()) {count = countValue;}}}
}

6. 完整使用示例

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;/*** Operator State 完整使用示例* 实现一个带有状态的并行数据源*/
public class OperatorStateExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000); // 每5秒进行一次检查点DataStream<Integer> numbers = env.addSource(new ParallelNumberSource(100));numbers.print();env.execute("Operator State Example");}/*** 并行数字源函数* 每个并行实例维护自己的状态*/public static class ParallelNumberSource extends RichParallelSourceFunction<Integer> implements CheckpointedFunction {private final int maxCount;private volatile boolean isRunning = true;private int currentCount = 0;private ListState<Integer> checkpointedCount;private Random random = new Random();public ParallelNumberSource(int maxCount) {this.maxCount = maxCount;}@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (isRunning && currentCount < maxCount) {synchronized (ctx.getCheckpointLock()) {ctx.collect(currentCount);currentCount++;Thread.sleep(100 + random.nextInt(200)); // 随机延迟}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedCount.clear();checkpointedCount.add(currentCount);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("number-source-count",Integer.class);checkpointedCount = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {// 从检查点恢复状态for (Integer count : checkpointedCount.get()) {currentCount = count;}System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " restored count to " + currentCount);} else {System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " starting with count " + currentCount);}}}
}

7. 最佳实践建议

  1. 合理选择状态类型

    • 一般状态管理使用 ListState
    • 需要合并状态时使用 UnionListState
    • 广播场景使用 BroadcastState
  2. 状态命名规范

    • 使用有意义的名称
    • 避免重复名称
    • 建议使用小写字母和连字符
  3. 状态管理

    • 在 snapshotState 中正确保存状态
    • 在 initializeState 中正确恢复状态
    • 处理并行度变化时的状态重分布
  4. 性能优化

    • 避免在状态中存储大量数据
    • 合理设置检查点间隔
    • 考虑使用 RocksDB 状态后端处理大状态(仅针对 Keyed State;Operator State 即使使用 RocksDB 后端仍保存在堆内存中,应尽量保持较小)
  5. 容错处理

    • 确保状态操作的幂等性
    • 处理状态恢复时的异常情况
    • 定期检查点以保证状态一致性
  6. 广播状态注意事项

    • 广播状态只允许在 processBroadcastElement 中修改
    • 在 processElement 中只能读取广播状态
    • 注意广播状态的内存使用

通过合理使用 Operator State,可以有效地在 Flink 应用程序中维护和处理与特定算子实例相关联的状态信息,实现复杂的状态管理和计算逻辑。

http://www.dtcms.com/a/540202.html

相关文章:

  • AI IN ALL王炸霸屏|战神数科与腾讯字节等深度践行AI
  • 【技术干货】在Stimulsoft中使用Google Sheets作为数据源创建报表与仪表盘
  • PCIe协议之唤醒篇之 WAKE# 信号
  • 搜狗做网站怎么样做静态网站有什么用
  • 潍坊网站建设公司哪家好大庆+网站建设
  • 推理成本吞噬AI未来,云计算如何平衡速度与成本的难题?
  • 基于VaR模型的ETF日内动态止损策略实现与理论验证
  • Linux云计算基础篇(28)-Samba文件服务
  • 学习经验分享【42】数学建模大赛参赛经历
  • 5.3 大数据方法论与实践指南-存储成本优化(省钱)
  • 运营商网站服务密码搜索引擎优化seo信息
  • 【案例实战】鸿蒙元服务开发实战:从云原生到移动端,包大小压缩 96% 启动提速 75% 的轻量化设计
  • 网站开发人员介绍网络营销研究现状文献综述
  • html5制作网站一个网站建立团队大概要多少钱
  • AceContainer类中用于初始化任务执行系统的核心方法--AceContainer::InitializeTask
  • Ubuntu部署 Kubernetes1.23
  • 悟空 AI CRM 的回访功能:深化客户关系,驱动业务增长
  • Qt的.pro文件中INSTALLS的作用和用法
  • 我的项目该选LoRa还是RF超短波全数字加密传输?
  • vue3 实现记事本手机版01
  • 03_全连接神经网络
  • 生成式AI重塑教学生态:理论基础、核心特征与伦理边界
  • html5手机网站调用微信分享wordpress缩略图加载慢
  • 动环监控:数据中心机房的“智慧守护者”
  • 5.6对象
  • 生命线与黑箱:LIME和Anchor作为两种事后可解释性分析
  • VMware安装配置CentOS 7
  • 链表算法题
  • 织梦制作wap网站高端网站开发建设
  • 网站建设公司销售经理职责全网最大的精品网站