Flink状态编程之算子状态(OperatorState)


✨博客主页: https://blog.csdn.net/m0_63815035?type=blog
💗《博客内容》:大数据、Java、测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识
📢博客专栏: https://blog.csdn.net/m0_63815035/category_11954877.html
📢欢迎点赞 👍 收藏 ⭐留言 📝
📢本文为学习笔记资料,如有侵权,请联系我删除,疏漏之处还请指正🙉
📢大厦之成,非一木之材也;大海之阔,非一流之归也✨

Flink 算子状态前言
在 Flink 流处理中,状态(State) 是算子在处理数据过程中需要保存的中间结果或元数据(如累计计数、缓存的历史数据等)。算子状态(Operator State)是 Flink 状态的一种重要类型,它与算子实例绑定,仅由当前算子实例访问和管理。本文将详细讲解算子状态的核心概念、类型、使用场景及编程实践。

一、算子状态(Operator State)的核心概念
算子状态(Operator State)是与单个算子实例绑定的状态,即状态属于算子本身,而非输入数据中的特定键(Key)。所有流经该算子实例的数据都会共享这个状态。
状态(State)
状态 是指在数据流的计算过程中,Flink 为每个任务保留的中间信息或计算结果。这样的上下文可以实现例如累加器,窗口聚合等。
算子的分类:① 有状态的算子(计算不依赖于其他数据 如A转B 例如 map flatmp) ② 无状态的算子(当数据传入后先获取历史的状态信息,然后进行计算出新的值更新历史状态,最后把结果输出)
状态存在的意义:状态就是帮助 Flink 记住之前的数据,让它能够累加、统计、计算,而不仅仅处理每次到来的新数据。
例如用户的点击记录,状态就像一个笔记本,可以记下每个用户的点击次数。 计算过去一分钟内的销售额,状态就会记下每个订单的金额,并不断把它们加起来。
托管方式:Flink统一管理,故障恢复和重组等一系列问题都由Flink实现不需要我们手动管理,值桩头多样,内部支持的类型多样。(值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(Aggregatestate)等)我们需要做的只有调用接口即可。
当然如果你想要自己写也可以自定义,不过只默认存储byte类型。
- 核心特点:
-
状态与算子实例绑定,不依赖输入数据的 Key;
-
当算子并行度发生变化时,状态会在新的并行实例间重新分配(重分区);
-
适用于无 Key 分区的场景(如 Source 算子、Map 算子等)。
二、算子状态的类型
Flink 为算子状态提供了 4 种具体实现类型,分别对应不同的数据结构和分配策略,需根据业务场景选择:
| 状态类型 | 数据结构 | 适用场景 | 重分区策略 |
|---|---|---|---|
| ListState | 列表(List) | 存储多个元素,元素间无关联(如缓存的历史数据) | 均匀分配:将列表中的元素平均分配给新的并行实例(Round-Robin) |
| UnionListState | 列表(List) | 存储多个元素,重分区时需保留所有元素(如 Source 算子的偏移量) | 全量复制:每个新实例都获取完整的状态列表,由用户决定如何处理重复数据 |
| BroadcastState | 键值对(Map) | 广播状态:将一个算子的状态广播到下游所有并行实例(如配置表、规则更新) | 全量复制:每个下游实例都持有完整的广播状态,确保所有实例状态一致 |
| ReducingState | 单一值(通过 Reduce 聚合) | 需对数据持续聚合(如累计求和、求最大值) | 同 ListState,按聚合后的中间结果进行均匀分配 |
三、算子状态的使用场景
算子状态适用于无需按 Key 分区的场景,典型案例包括:
-
Source 算子的偏移量管理:如 Kafka Source 需要保存每个分区的消费偏移量(offset),确保故障恢复后从断点继续消费(常用 UnionListState)。
-
无 Key 的聚合计算:如统计整个流的总条数(用 ReducingState 累计计数)。
-
广播配置 / 规则:如动态更新过滤规则,将规则广播到所有并行实例(用 BroadcastState)。
-
临时缓存数据:如缓存最近 10 条数据用于滑动窗口计算(用 ListState)。
四、算子状态的编程实践(以 ListState 为例)
使用算子状态需通过CheckpointedFunction接口实现,该接口定义了状态的初始化、快照保存和恢复逻辑。以下是具体步骤和示例:
1. 实现CheckpointedFunction接口
算子需实现CheckpointedFunction,重写两个核心方法:
-
initializeState(FunctionInitializationContext context):初始化状态(首次创建或故障恢复时调用)。 -
snapshotState(FunctionSnapshotContext context):生成状态快照(Checkpoint 时调用)。
2. 示例:用 ListState 缓存最近 3 条数据
需求:实现一个算子,缓存输入的最近 3 条数据,并在每条新数据到来时输出缓存的所有数据。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.util.ArrayList;
import java.util.List;public class CacheSink implements SinkFunction<String>, CheckpointedFunction {// 定义ListState,用于缓存数据private transient ListState<String> cacheState;// 内存中的临时缓存(避免频繁操作状态)private List<String> cache = new ArrayList<>();// 缓存最大条数private static final int MAX_SIZE = 3;@Overridepublic void invoke(String value, Context context) throws Exception {// 新数据加入缓存cache.add(value);// 若缓存满3条,清空并输出if (cache.size() >= MAX_SIZE) {System.out.println("缓存数据:" + cache);cache.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// Checkpoint时,将内存缓存写入ListStatecacheState.clear();for (String s : cache) {cacheState.add(s);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 初始化ListState(定义状态描述符)ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("cache-state", // 状态名称(唯一标识)String.class // 状态数据类型);// 从上下文获取ListState(区分算子状态和键控状态)cacheState = context.getOperatorStateStore().getListState(descriptor);// 故障恢复时,从状态中恢复数据到内存缓存if (context.isRestored()) { // 判断是否从故障中恢复for (String s : cacheState.get()) {cache.add(s);}}}
}
3. 代码解析
-
状态定义:通过
ListStateDescriptor描述状态的名称和数据类型,确保状态可序列化。 -
状态初始化:
initializeState中从OperatorStateStore获取ListState,并在恢复时加载历史数据。 -
状态快照:
snapshotState在 Checkpoint 时将内存中的缓存写入ListState,确保故障后可恢复。 -
业务逻辑:
invoke方法处理输入数据,维护内存缓存,满 3 条时输出并清空。
五、算子状态的重分区策略
当算子并行度调整时,Flink 会对算子状态进行重分区,不同类型的状态采用不同策略:
-
ListState:将状态列表中的元素均匀分配给新的并行实例(如原并行度 2,状态列表有 [1,2,3,4],新并行度 2 则分为 [1,3] 和 [2,4])。
-
UnionListState:每个新实例都获取完整的状态列表(如原状态 [1,2,3],新并行度 2 则每个实例都持有 [1,2,3],需用户手动去重或处理)。
-
BroadcastState:状态会自动广播到所有新实例,确保每个实例状态一致。
六、算子状态与键控状态(Keyed State)的区别
| 对比维度 | 算子状态(Operator State) | 键控状态(Keyed State) |
|---|---|---|
| 绑定对象 | 与算子实例绑定,所有数据共享状态 | 与输入数据的 Key 绑定,每个 Key 独立维护状态 |
| 适用算子 | 无 KeyBy 的算子(如 Source、Map、Sink) | 有 KeyBy 的算子(如 KeyedStream 上的聚合、窗口) |
| 重分区策略 | 按实例均匀分配或全量复制 | 按 Key 的哈希值重新分配(与 KeyBy 的分区一致) |
| 典型场景 | 偏移量管理、广播配置 | 按 Key 统计(如每个用户的累计消费) |
七、总结&综合练习
算子状态是 Flink 中与算子实例绑定的状态类型,适用于无 Key 分区的场景,通过CheckpointedFunction接口实现状态的管理。核心要点:
-
按数据结构和场景选择合适的状态类型(ListState、UnionListState 等);
-
重分区策略需根据状态类型和业务需求设计,避免数据丢失或重复;
-
与键控状态配合使用,可覆盖绝大多数流处理场景(如先 KeyBy 做分桶聚合,再用算子状态做全局统计)。
掌握算子状态的使用,能有效解决流处理中的状态管理问题,提升程序的容错性和灵活性。
package com.example.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.util.Collector;import java.time.Duration;public class FlinkPersistentWordCount {public static void main(String[] args) throws Exception {// 1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 设置状态后端和检查点存储位置为 HDFS(持久化到 node01:8020 中)env.setStateBackend(new org.apache.flink.runtime.state.filesystem.FsStateBackend("hdfs://node01:8020/flink/checkpoints"));env.enableCheckpointing(5000); // 设置检查点间隔为 5 秒env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置检查点超时时间为 60 秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 两次检查点之间最小间隔为 1 秒env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 3. 读取来自网络套接字的数据流DataStream<String> sourceStream = env.socketTextStream("localhost", 19523);// 4. 处理数据,将字符串分割为单词并标记初始计数DataStream<Tuple2<String, Integer>> wordStream = sourceStream.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {for (String word : line.split("\\s+")) {if (!word.isEmpty()) {out.collect(new Tuple2<>(word, 1));}}}).returns(Tuple2.class);// 5. 设置水位线,允许 2 秒延迟,处理乱序数据WatermarkStrategy<Tuple2<String, Integer>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());DataStream<Tuple2<String, Integer>> timestampedStream = wordStream.assignTimestampsAndWatermarks(watermarkStrategy);// 6. 应用滑动窗口(每 10 秒统计一次,每 5 秒滑动一次)并计算词频DataStream<String> slidingWindowResult = timestampedStream.keyBy(event -> event.f0).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口:窗口大小为 10 秒,滑动间隔为 5 秒.trigger(CountTrigger.of(5)) // 当窗口中有 5 条数据时,提前触发计算.evictor(TimeEvictor.of(Time.seconds(2), true)) // 计算前剔除 2 秒之前的数据.sum(1).map(result -> "滑动窗口结果: 单词 = " + result.f0 + ",次数 = " + result.f1);// 7. 应用滚动窗口(每 15 秒统计一次)并计算词频DataStream<String> tumblingWindowResult = timestampedStream.keyBy(event -> event.f0).window(TumblingEventTimeWindows.of(Time.seconds(15))) // 滚动窗口:窗口大小为 15 秒.sum(1).map(result -> "滚动窗口结果: 单词 = " + result.f0 + ",次数 = " + result.f1);// 8. 合并结果并打印slidingWindowResult.union(tumblingWindowResult).print();// 9. 启动 Flink 程序env.execute("Flink Persistent Word Count with Checkpoints");}
}
今天这篇文章就到这里了,大厦之成,非一木之材也;大海之阔,非一流之归也。感谢大家观看本文

