当前位置: 首页 > news >正文

Flink状态编程之算子状态(OperatorState)

在这里插## 标题入图片描述

> 						大家好,我是程序员小羊!

✨博客主页: https://blog.csdn.net/m0_63815035?type=blog

💗《博客内容》:大数据、Java、测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识
📢博客专栏: https://blog.csdn.net/m0_63815035/category_11954877.html
📢欢迎点赞 👍 收藏 ⭐留言 📝
📢本文为学习笔记资料,如有侵权,请联系我删除,疏漏之处还请指正🙉
📢大厦之成,非一木之材也;大海之阔,非一流之归也✨

在这里插入图片描述

Flink 算子状态前言

在 Flink 流处理中,状态(State) 是算子在处理数据过程中需要保存的中间结果或元数据(如累计计数、缓存的历史数据等)。算子状态(Operator State)是 Flink 状态的一种重要类型,它与算子实例绑定,仅由当前算子实例访问和管理。本文将详细讲解算子状态的核心概念、类型、使用场景及编程实践。
在这里插入图片描述

一、算子状态(Operator State)的核心概念

算子状态(Operator State)是与单个算子实例绑定的状态,即状态属于算子本身,而非输入数据中的特定键(Key)。所有流经该算子实例的数据都会共享这个状态。

状态(State)
状态 是指在数据流的计算过程中,Flink 为每个任务保留的中间信息或计算结果。这样的上下文可以实现例如累加器,窗口聚合等。
算子的分类:① 有状态的算子(计算不依赖于其他数据 如A转B 例如 map flatmp) ② 无状态的算子(当数据传入后先获取历史的状态信息,然后进行计算出新的值更新历史状态,最后把结果输出)
状态存在的意义:状态就是帮助 Flink 记住之前的数据,让它能够累加、统计、计算,而不仅仅处理每次到来的新数据。
例如用户的点击记录,状态就像一个笔记本,可以记下每个用户的点击次数。 计算过去一分钟内的销售额,状态就会记下每个订单的金额,并不断把它们加起来。
托管方式:Flink统一管理,故障恢复和重组等一系列问题都由Flink实现不需要我们手动管理,值桩头多样,内部支持的类型多样。(值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(Aggregatestate)等)我们需要做的只有调用接口即可。
当然如果你想要自己写也可以自定义,不过只默认存储byte类型。

  • 核心特点
  1. 状态与算子实例绑定,不依赖输入数据的 Key;

  2. 当算子并行度发生变化时,状态会在新的并行实例间重新分配(重分区);

  3. 适用于无 Key 分区的场景(如 Source 算子、Map 算子等)。

二、算子状态的类型

Flink 为算子状态提供了 4 种具体实现类型,分别对应不同的数据结构和分配策略,需根据业务场景选择:

状态类型数据结构适用场景重分区策略
ListState列表(List)存储多个元素,元素间无关联(如缓存的历史数据)均匀分配:将列表中的元素平均分配给新的并行实例(Round-Robin)
UnionListState列表(List)存储多个元素,重分区时需保留所有元素(如 Source 算子的偏移量)全量复制:每个新实例都获取完整的状态列表,由用户决定如何处理重复数据
BroadcastState键值对(Map)广播状态:将一个算子的状态广播到下游所有并行实例(如配置表、规则更新)全量复制:每个下游实例都持有完整的广播状态,确保所有实例状态一致
ReducingState单一值(通过 Reduce 聚合)需对数据持续聚合(如累计求和、求最大值)同 ListState,按聚合后的中间结果进行均匀分配

三、算子状态的使用场景

算子状态适用于无需按 Key 分区的场景,典型案例包括:

  1. Source 算子的偏移量管理:如 Kafka Source 需要保存每个分区的消费偏移量(offset),确保故障恢复后从断点继续消费(常用 UnionListState)。

  2. 无 Key 的聚合计算:如统计整个流的总条数(用 ReducingState 累计计数)。

  3. 广播配置 / 规则:如动态更新过滤规则,将规则广播到所有并行实例(用 BroadcastState)。

  4. 临时缓存数据:如缓存最近 10 条数据用于滑动窗口计算(用 ListState)。

四、算子状态的编程实践(以 ListState 为例)

使用算子状态需通过CheckpointedFunction接口实现,该接口定义了状态的初始化、快照保存和恢复逻辑。以下是具体步骤和示例:

1. 实现CheckpointedFunction接口

算子需实现CheckpointedFunction,重写两个核心方法:

  • initializeState(FunctionInitializationContext context):初始化状态(首次创建或故障恢复时调用)。

  • snapshotState(FunctionSnapshotContext context):生成状态快照(Checkpoint 时调用)。

2. 示例:用 ListState 缓存最近 3 条数据

需求:实现一个算子,缓存输入的最近 3 条数据,并在每条新数据到来时输出缓存的所有数据。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.util.ArrayList;
import java.util.List;public class CacheSink implements SinkFunction<String>, CheckpointedFunction {// 定义ListState,用于缓存数据private transient ListState<String> cacheState;// 内存中的临时缓存(避免频繁操作状态)private List<String> cache = new ArrayList<>();// 缓存最大条数private static final int MAX_SIZE = 3;@Overridepublic void invoke(String value, Context context) throws Exception {// 新数据加入缓存cache.add(value);// 若缓存满3条,清空并输出if (cache.size() >= MAX_SIZE) {System.out.println("缓存数据:" + cache);cache.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// Checkpoint时,将内存缓存写入ListStatecacheState.clear();for (String s : cache) {cacheState.add(s);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 初始化ListState(定义状态描述符)ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("cache-state",  // 状态名称(唯一标识)String.class    // 状态数据类型);// 从上下文获取ListState(区分算子状态和键控状态)cacheState = context.getOperatorStateStore().getListState(descriptor);// 故障恢复时,从状态中恢复数据到内存缓存if (context.isRestored()) {  // 判断是否从故障中恢复for (String s : cacheState.get()) {cache.add(s);}}}
}
3. 代码解析
  • 状态定义:通过ListStateDescriptor描述状态的名称和数据类型,确保状态可序列化。

  • 状态初始化initializeState中从OperatorStateStore获取ListState,并在恢复时加载历史数据。

  • 状态快照snapshotState在 Checkpoint 时将内存中的缓存写入ListState,确保故障后可恢复。

  • 业务逻辑invoke方法处理输入数据,维护内存缓存,满 3 条时输出并清空。

五、算子状态的重分区策略

当算子并行度调整时,Flink 会对算子状态进行重分区,不同类型的状态采用不同策略:

  1. ListState:将状态列表中的元素均匀分配给新的并行实例(如原并行度 2,状态列表有 [1,2,3,4],新并行度 2 则分为 [1,3] 和 [2,4])。

  2. UnionListState:每个新实例都获取完整的状态列表(如原状态 [1,2,3],新并行度 2 则每个实例都持有 [1,2,3],需用户手动去重或处理)。

  3. BroadcastState:状态会自动广播到所有新实例,确保每个实例状态一致。

六、算子状态与键控状态(Keyed State)的区别

对比维度算子状态(Operator State)键控状态(Keyed State)
绑定对象与算子实例绑定,所有数据共享状态与输入数据的 Key 绑定,每个 Key 独立维护状态
适用算子无 KeyBy 的算子(如 Source、Map、Sink)有 KeyBy 的算子(如 KeyedStream 上的聚合、窗口)
重分区策略按实例均匀分配或全量复制按 Key 的哈希值重新分配(与 KeyBy 的分区一致)
典型场景偏移量管理、广播配置按 Key 统计(如每个用户的累计消费)

七、总结&综合练习

算子状态是 Flink 中与算子实例绑定的状态类型,适用于无 Key 分区的场景,通过CheckpointedFunction接口实现状态的管理。核心要点:

  1. 按数据结构和场景选择合适的状态类型(ListState、UnionListState 等);

  2. 重分区策略需根据状态类型和业务需求设计,避免数据丢失或重复;

  3. 与键控状态配合使用,可覆盖绝大多数流处理场景(如先 KeyBy 做分桶聚合,再用算子状态做全局统计)。

掌握算子状态的使用,能有效解决流处理中的状态管理问题,提升程序的容错性和灵活性。

package com.example.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.util.Collector;import java.time.Duration;public class FlinkPersistentWordCount {public static void main(String[] args) throws Exception {// 1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 设置状态后端和检查点存储位置为 HDFS(持久化到 node01:8020 中)env.setStateBackend(new org.apache.flink.runtime.state.filesystem.FsStateBackend("hdfs://node01:8020/flink/checkpoints"));env.enableCheckpointing(5000); // 设置检查点间隔为 5 秒env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置检查点超时时间为 60 秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 两次检查点之间最小间隔为 1 秒env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 3. 读取来自网络套接字的数据流DataStream<String> sourceStream = env.socketTextStream("localhost", 19523);// 4. 处理数据,将字符串分割为单词并标记初始计数DataStream<Tuple2<String, Integer>> wordStream = sourceStream.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {for (String word : line.split("\\s+")) {if (!word.isEmpty()) {out.collect(new Tuple2<>(word, 1));}}}).returns(Tuple2.class);// 5. 设置水位线,允许 2 秒延迟,处理乱序数据WatermarkStrategy<Tuple2<String, Integer>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());DataStream<Tuple2<String, Integer>> timestampedStream = wordStream.assignTimestampsAndWatermarks(watermarkStrategy);// 6. 应用滑动窗口(每 10 秒统计一次,每 5 秒滑动一次)并计算词频DataStream<String> slidingWindowResult = timestampedStream.keyBy(event -> event.f0).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口:窗口大小为 10 秒,滑动间隔为 5 秒.trigger(CountTrigger.of(5)) // 当窗口中有 5 条数据时,提前触发计算.evictor(TimeEvictor.of(Time.seconds(2), true)) // 计算前剔除 2 秒之前的数据.sum(1).map(result -> "滑动窗口结果: 单词 = " + result.f0 + ",次数 = " + result.f1);// 7. 应用滚动窗口(每 15 秒统计一次)并计算词频DataStream<String> tumblingWindowResult = timestampedStream.keyBy(event -> event.f0).window(TumblingEventTimeWindows.of(Time.seconds(15))) // 滚动窗口:窗口大小为 15 秒.sum(1).map(result -> "滚动窗口结果: 单词 = " + result.f0 + ",次数 = " + result.f1);// 8. 合并结果并打印slidingWindowResult.union(tumblingWindowResult).print();// 9. 启动 Flink 程序env.execute("Flink Persistent Word Count with Checkpoints");}
}
今天这篇文章就到这里了,大厦之成,非一木之材也;大海之阔,非一流之归也。感谢大家观看本文

在这里插入图片描述

http://www.dtcms.com/a/517088.html

相关文章:

  • 哔哩哔哩修改版 8.64.0| 去除多项冗余内容和广告,精简流畅好用
  • 如何写网站建设报告3d动画制作流程
  • 网站建设合同表(书)注册万维网网站
  • 数据结构——顺序查找
  • 辽宁网站建设哪里好找深圳网站建设 湖南岚鸿
  • 厦门最早做网站的公司阿里云服务器一年多少钱
  • 电子商城网站开发软件网站维护兼职
  • 秦皇岛做网站的公司怎样有效的做网上宣传
  • 中企动力合作网站佰牛深圳网站建设
  • 义乌购商品详情接口的产业级实现:从批发属性解析到供应链协同的全链路技术方案
  • 接口加密了怎么测?
  • 校友网站建设的意义wordpress缩略图中大大
  • php在线做网站网站群建设系统
  • 【GitOps】Argo CD app of apps
  • 怎么建网站青州问枫深圳网站建设服务商
  • 深圳网站建设金瓷网络怎么制作视频短片加字幕带说话
  • GC 的判定方法
  • 网站建设 鼠标英文手机商城网站建设
  • 彩票网站怎么做收银网站统计代码放哪里
  • 惠州网站设计方案响应式网站可以做缩放图吗
  • 深度学习的一些基本概念
  • 移动端性能监控探索:iOS RUM SDK 技术架构与实践
  • seo外链网站大全汕头市网络优化推广平台
  • 番禺区网站建设flash网站管理系统
  • Python+Requests接口测试教程(1):Fiddler抓包工具
  • 手机端网站怎么做的网站建设海南
  • UART、RS232、RS485、I2C 的区别及工程应用场景
  • 网站多数关键词做六个网站静态页多少钱
  • 高密网站开发公司做网络营销
  • 数据科学每日总结--Day2--区块链与模型了解