当前位置: 首页 > news >正文

Flink Keyed State 详解之三

Apache Flink Raw State 与 Managed State 详解

1. 基本概念

在 Apache Flink 中,状态可以分为两种类型:Raw State(原始状态)和 Managed State(托管状态)。这两种状态类型在管理和使用方式上有显著的区别。

1.1 Managed State(托管状态)

Managed State 是由 Flink 框架管理的状态,具有以下特点:

  • 由 Flink 运行时自动管理序列化和反序列化
  • 提供高级别的抽象接口(如 ValueState、ListState 等)
  • 支持状态 Schema 演化
  • 自动处理状态的分区和重分布
  • 提供内置的状态后端支持

1.2 Raw State(原始状态)

Raw State 是由用户自己管理的状态,具有以下特点:

  • 用户需要自己处理序列化和反序列化
  • 提供低级别的字节数组接口
  • 需要用户自己处理状态的分区和重分布
  • 更加灵活但使用复杂

2. 适用场景

2.1 Managed State 适用场景

  1. 大多数常规状态管理场景

    • 用户自定义函数中的状态维护
    • 窗口操作中的状态管理
    • 聚合计算中的中间状态
  2. 需要 Schema 演化的场景

    • 状态数据结构需要随时间演进
    • 需要向后兼容的状态管理
  3. 复杂状态操作场景

    • 需要使用 ValueState、ListState 等高级状态接口
    • 需要框架自动处理状态分区

2.2 Raw State 适用场景

  1. 自定义状态后端实现

    • 开发新的状态后端
    • 需要特殊存储格式的场景
  2. 性能敏感场景

    • 需要极致性能优化
    • 需要特殊序列化方式
  3. 与外部系统集成

    • 需要与特定存储系统深度集成
    • 需要特殊的恢复机制

3. Managed State 详细说明

3.1 Keyed State 类型

Managed State 提供了多种 Keyed State 类型:

import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;/*** Managed State 各种类型使用示例*/
public class ManagedStateExample extends KeyedProcessFunction<String, String, String> {// ValueState - 存储单个值private ValueState<Integer> valueState;// ListState - 存储元素列表private ListState<String> listState;// MapState - 存储键值对映射private MapState<String, Integer> mapState;// ReducingState - 存储聚合值private ReducingState<Long> reducingState;// AggregatingState - 存储聚合值(支持不同输入输出类型)private AggregatingState<Double, Double> aggregatingState;@Overridepublic void open(Configuration parameters) {// 初始化各种状态// ValueStateValueStateDescriptor<Integer> valueDescriptor = new ValueStateDescriptor<>("value-state",Integer.class,0);valueState = getRuntimeContext().getState(valueDescriptor);// ListStateListStateDescriptor<String> listDescriptor = new ListStateDescriptor<>("list-state",String.class);listState = getRuntimeContext().getListState(listDescriptor);// MapStateMapStateDescriptor<String, Integer> mapDescriptor = new MapStateDescriptor<>("map-state",String.class,Integer.class);mapState = getRuntimeContext().getMapState(mapDescriptor);// ReducingStateReducingStateDescriptor<Long> reducingDescriptor = new ReducingStateDescriptor<>("reducing-state",Long::sum,  // 求和函数Long.class);reducingState = getRuntimeContext().getReducingState(reducingDescriptor);// AggregatingStateAggregatingStateDescriptor<Double, AverageAccumulator, Double> aggregatingDescriptor = new AggregatingStateDescriptor<>("aggregating-state",new AverageAggregateFunction(),new AverageAccumulator());aggregatingState = getRuntimeContext().getAggregatingState(aggregatingDescriptor);}@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 使用 ValueStateInteger currentValue = valueState.value();valueState.update(currentValue + 1);// 使用 ListStatelistState.add(value);// 使用 MapStatemapState.put(value, mapState.get(value) == null ? 1 : mapState.get(value) + 1);// 使用 ReducingStatereducingState.add(1L);// 使用 AggregatingStateaggregatingState.add(1.0);out.collect("Processed: " + value);}// 平均值聚合函数public static class AverageAggregateFunction implements AggregateFunction<Double, AverageAccumulator, Double> {@Overridepublic AverageAccumulator createAccumulator() {return new AverageAccumulator();}@Overridepublic AverageAccumulator add(Double value, AverageAccumulator accumulator) {accumulator.sum += value;accumulator.count++;return accumulator;}@Overridepublic Double getResult(AverageAccumulator accumulator) {return accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;}@Overridepublic AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {a.sum += b.sum;a.count += b.count;return a;}}// 平均值累加器public static class AverageAccumulator {public double sum = 0.0;public long count = 0;}
}

3.2 Operator State 类型

Managed State 也支持 Operator State:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;/*** Managed Operator State 使用示例*/
public class ManagedOperatorStateExample extends RichParallelSourceFunction<String> implements CheckpointedFunction {private volatile boolean isRunning = true;private ListState<Integer> checkpointedCount;private int count = 0;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {synchronized (ctx.getCheckpointLock()) {ctx.collect("Element-" + count);count++;Thread.sleep(1000);}}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedCount.clear();checkpointedCount.add(count);}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("managed-operator-state",Integer.class);checkpointedCount = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Integer countValue : checkpointedCount.get()) {count = countValue;}}}
}

4. Raw State 详细说明

4.1 Raw State 接口

Raw State 提供了较低级别的接口,需要用户自己处理序列化:

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;/*** Raw State 使用示例* 注意:这只是一个概念示例,实际使用中需要更复杂的实现*/
public class RawStateExample {/*** 自定义 Raw State 实现*/public static class CustomRawState implements InternalKvState<Object, Object, Object> {private Map<Object, Object> stateMap = new HashMap<>();@Overridepublic void setCurrentNamespace(Object namespace) {// 设置命名空间}@Overridepublic Object getCurrentNamespace() {return null;}@Overridepublic byte[] getSerializedValue(byte[] serializedKeyAndNamespace,KeyGroupRange keyGroupRange,ClassLoader userCodeClassLoader,StateSnapshotTransformer.StateSnapshotFilter stateSnapshotFilter) throws Exception {// 获取序列化的状态值return new byte[0];}@Overridepublic StateIncrementalVisitor<Object, Object, Object> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {return null;}// 其他必要的方法实现...}/*** 自定义状态序列化器*/public static class CustomStateSerializer implements IOReadableWritable {private Object state;public CustomStateSerializer(Object state) {this.state = state;}@Overridepublic void write(DataOutputStream out) throws IOException {// 自定义序列化逻辑// 这里只是一个示例,实际实现会更复杂if (state instanceof String) {out.writeUTF((String) state);} else if (state instanceof Integer) {out.writeInt((Integer) state);}}@Overridepublic void read(DataInputStream in) throws IOException {// 自定义反序列化逻辑}}
}

4.2 Raw State 使用场景示例

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import java.util.HashMap;
import java.util.Map;/*** Raw State 使用场景示例* 实现一个自定义的状态管理器*/
public class RawStateManager extends RichMapFunction<String, String> implements CheckpointedFunction {// 模拟 Raw State 存储private Map<String, String> rawState = new HashMap<>();private Map<String, String> checkpointedState = new HashMap<>();@Overridepublic void open(Configuration parameters) throws Exception {// 初始化 Raw StateSystem.out.println("Initializing Raw State");}@Overridepublic String map(String value) throws Exception {// 使用 Raw StateString key = "key-" + value;String currentValue = rawState.get(key);if (currentValue == null) {currentValue = "0";}int count = Integer.parseInt(currentValue) + 1;rawState.put(key, String.valueOf(count));return value + ": " + count;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// 快照 Raw StatecheckpointedState.clear();checkpointedState.putAll(rawState);System.out.println("Snapshot Raw State: " + checkpointedState.size() + " entries");}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {// 恢复 Raw Stateif (context.isRestored()) {rawState.clear();rawState.putAll(checkpointedState);System.out.println("Restored Raw State: " + rawState.size() + " entries");}}
}

5. 配置方法

5.1 状态后端配置

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 状态后端配置示例*/
public class StateBackendConfiguration {public static void configureStateBackends() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. MemoryStateBackend(默认,仅用于测试)// 适用于小状态和本地开发测试env.setStateBackend(new MemoryStateBackend());// 2. HashMapStateBackend(托管状态)// 适用于中小规模状态,存储在堆内存中env.setStateBackend(new HashMapStateBackend());// 3. EmbeddedRocksDBStateBackend(托管状态)// 适用于大规模状态,存储在 RocksDB 中EmbeddedRocksDBStateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();// 可以配置 RocksDB 参数// rocksDBBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);env.setStateBackend(rocksDBBackend);}
}

5.2 状态 TTL 配置

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;/*** 状态 TTL 配置示例*/
public class StateTTLConfiguration {public static ValueStateDescriptor<String> configureTTL() {// 配置状态 TTLStateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))  // 设置 TTL 为 1 小时.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // 更新类型.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // 状态可见性.cleanupFullSnapshot()  // 清理策略.build();// 应用 TTL 配置到状态描述符ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("ttl-state",String.class,"default-value");descriptor.enableTimeToLive(ttlConfig);return descriptor;}
}

6. 完整使用示例

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;/*** Managed State 与 Raw State 对比示例*/
public class StateComparisonExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000); // 10秒检查点// 创建输入数据流DataStream<String> input = env.fromElements("user1", "user2", "user1", "user3", "user2", "user1");// 使用 Managed State 进行用户访问计数DataStream<String> managedStateResult = input.keyBy(user -> user).map(new ManagedStateCounter());// 使用模拟的 Raw State 进行用户访问计数DataStream<String> rawStateResult = input.map(new RawStateCounter());System.out.println("=== Managed State Results ===");managedStateResult.print();System.out.println("=== Raw State Results ===");rawStateResult.print();env.execute("State Comparison Example");}/*** 使用 Managed State 的计数器*/public static class ManagedStateCounter extends RichMapFunction<String, String> {private ValueState<Integer> countState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("managed-count-state",Integer.class,0);countState = getRuntimeContext().getState(descriptor);}@Overridepublic String map(String user) throws Exception {Integer currentCount = countState.value();currentCount++;countState.update(currentCount);return "Managed State - User " + user + ": " + currentCount;}}/*** 使用模拟 Raw State 的计数器*/public static class RawStateCounter extends RichMapFunction<String, String> {// 模拟 Raw State 存储private java.util.Map<String, Integer> rawState = new java.util.HashMap<>();@Overridepublic String map(String user) throws Exception {Integer currentCount = rawState.getOrDefault(user, 0);currentCount++;rawState.put(user, currentCount);return "Raw State - User " + user + ": " + currentCount;}}
}

7. 最佳实践建议

7.1 Managed State 最佳实践

  1. 优先使用 Managed State

    • 对于大多数应用场景,Managed State 是更好的选择
    • 提供更好的抽象和易用性
  2. 合理选择状态类型

    • 单个值使用 ValueState
    • 列表数据使用 ListState
    • 键值对数据使用 MapState
    • 需要聚合的数据使用 ReducingState 或 AggregatingState
  3. 状态命名规范

    • 使用有意义的名称
    • 避免重复名称
    • 建议使用小写字母和连字符
  4. 性能优化

    • 避免在状态中存储大量数据
    • 合理设置状态后端
    • 考虑使用 RocksDB 状态后端处理大状态

7.2 Raw State 最佳实践

  1. 谨慎使用 Raw State

    • 只在必要时使用 Raw State
    • 需要有丰富的 Flink 内部机制知识
  2. 正确的序列化处理

    • 实现高效的序列化和反序列化逻辑
    • 确保序列化格式的兼容性
  3. 状态管理

    • 正确处理状态的分区和重分布
    • 实现可靠的检查点和恢复机制
  4. 测试和验证

    • 充分测试状态的持久化和恢复
    • 验证在不同并行度下的状态重分布

7.3 通用最佳实践

  1. 状态后端选择

    • 小状态使用 HashMapStateBackend
    • 大状态使用 EmbeddedRocksDBStateBackend
    • 避免在生产环境中使用 MemoryStateBackend
  2. 检查点配置

    • 合理设置检查点间隔
    • 配置适当的检查点超时时间
    • 启用增量检查点以提高性能
  3. 状态监控

    • 监控状态大小和增长趋势
    • 设置适当的告警机制
    • 定期分析状态使用模式

通过合理选择和使用 Managed State 与 Raw State,可以有效地在 Flink 应用程序中实现高效、可靠的状态管理,满足不同场景下的需求。

http://www.dtcms.com/a/545517.html

相关文章:

  • LangChain4j学习3:模型参数
  • 驻马店做网站哪家好常州微网站建设
  • 深圳网站建设报价网站开发客户来源
  • 仓颉开发鸿蒙应用:深入理解组件生命周期的设计哲学与实践
  • Java 启动脚本-简介版
  • CFX Manager下载安装教程
  • 基于STM32HAL库判断传感器数据和系统定时器外部中断
  • 仓颉语言中的成员变量与方法:深入剖析与工程实践
  • JavaScript是如何执行的——V8引擎的执行
  • GEO:AI 时代流量新入口,四川嗨它科技如何树立行业标杆? (2025年10月最新版)
  • 【牛客刷题-剑指Offer】BM24 二叉树的中序遍历:左根右的奇妙之旅(递归+迭代双解法详解)
  • 宝山网站建设哪家好平面设计免费模板网站
  • 腾讯云 怎样建设网站免费自助建站工具
  • elasticsearch中文分词器插件下载
  • 【开题答辩全过程】以 叮叮网上图书销售管理系统为例,包含答辩的问题和答案
  • 2025—2028年教育部面47项白名单赛事汇总表(正式版)
  • IPython.display 显示网页
  • Excel怎么根据身份证号码来计算年龄?
  • 江阴网站网站建设免费的舆情网站
  • 服务间的通信之gRPC
  • php做电商网站开题报告wordpress输密码访问
  • Mybatis中# 和 $的区别
  • IDEA开发常用快捷键总结
  • SAP HANA数据库HA双机架构概念及运维
  • Blender 4K渲染背后的技术挑战
  • 镇江建设集团网站扁平化wordpress
  • 测试开发话题05---用例篇(2)
  • 做个网站多少费用asp学习网站
  • 基于电鱼 ARM 工控机的井下设备运行状态监测方案——实时采集电机、电泵、皮带机等关键设备运行数据
  • 【代码审计】Doufox v0.1.1 任意文件读取 分析