Flink Keyed State Deep Dive, Part 3
Apache Flink Raw State and Managed State Explained
1. Basic Concepts
In Apache Flink, state falls into two categories: Raw State and Managed State. The two differ significantly in how they are managed and used.
1.1 Managed State
Managed State is state that the Flink framework manages on your behalf. Its main characteristics (a short sketch follows the list):
- Serialization and deserialization are handled automatically by the Flink runtime
- High-level abstractions are provided (ValueState, ListState, and so on)
- State schema evolution is supported
- State partitioning and redistribution are handled automatically
- Built-in state backends are supported out of the box
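A minimal sketch of what "framework managed" means in practice: you describe the state with a descriptor, and Flink derives a serializer, checkpoints the value, and scopes it to the current key. The class name ManagedCount and the counting logic are illustrative, not from the original article.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Must run on a keyed stream, e.g. stream.keyBy(...).map(new ManagedCount())
public class ManagedCount extends RichMapFunction<String, Long> {

    private ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // Flink derives a serializer from Long.class and handles persistence.
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public Long map(String value) throws Exception {
        Long current = count.value();          // scoped to the current key
        long next = (current == null) ? 1 : current + 1;
        count.update(next);
        return next;
    }
}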
1.2 Raw State
Raw State is state that the user manages entirely on their own. Its main characteristics (a short sketch of what this means in practice follows the list):
- The user is responsible for serialization and deserialization
- Only a low-level byte-array interface is exposed
- The user must handle state partitioning and redistribution
- More flexible, but more complex to use
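By contrast, with raw state the bytes are yours end to end. A minimal sketch of the kind of manual (de)serialization this implies, using Flink's DataOutputSerializer/DataInputDeserializer utilities; the class name RawBytesExample and the (userId, count) layout are made up for illustration.

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import java.io.IOException;

public class RawBytesExample {

    // Encode a (userId, count) pair into a byte[] that the user owns end to end.
    public static byte[] encode(String userId, long count) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(32);
        out.writeUTF(userId);
        out.writeLong(count);
        return out.getCopyOfBuffer();
    }

    // Decode the count back out of the byte[]; the user must know the layout.
    public static long decodeCount(byte[] bytes) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(bytes);
        in.readUTF();          // skip userId
        return in.readLong();
    }
}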
2. Applicable Scenarios
2.1 Managed State Scenarios
- Most routine state management scenarios:
  - Maintaining state in user-defined functions
  - State management in window operations
  - Intermediate state in aggregations
- Scenarios that need schema evolution:
  - State data structures that must evolve over time
  - Backward-compatible state management
- Complex state operations:
  - Using high-level state interfaces such as ValueState and ListState
  - Letting the framework handle state partitioning automatically
2.2 Raw State Scenarios
- Custom state backend implementations:
  - Developing a new state backend
  - Scenarios that require a special storage format
- Performance-critical scenarios:
  - Extreme performance tuning
  - Special serialization schemes
- Integration with external systems:
  - Deep integration with a specific storage system
  - Special recovery mechanisms
3. Managed State in Detail
3.1 Keyed State Types
Managed State offers several keyed state types:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Usage examples for the different managed keyed state types.
 */
public class ManagedStateExample extends KeyedProcessFunction<String, String, String> {

    // ValueState - stores a single value
    private ValueState<Integer> valueState;
    // ListState - stores a list of elements
    private ListState<String> listState;
    // MapState - stores key/value mappings
    private MapState<String, Integer> mapState;
    // ReducingState - stores an aggregated value (same input and output type)
    private ReducingState<Long> reducingState;
    // AggregatingState - stores an aggregated value (input and output types may differ)
    private AggregatingState<Double, Double> aggregatingState;

    @Override
    public void open(Configuration parameters) {
        // ValueState
        ValueStateDescriptor<Integer> valueDescriptor =
                new ValueStateDescriptor<>("value-state", Integer.class);
        valueState = getRuntimeContext().getState(valueDescriptor);

        // ListState
        ListStateDescriptor<String> listDescriptor =
                new ListStateDescriptor<>("list-state", String.class);
        listState = getRuntimeContext().getListState(listDescriptor);

        // MapState
        MapStateDescriptor<String, Integer> mapDescriptor =
                new MapStateDescriptor<>("map-state", String.class, Integer.class);
        mapState = getRuntimeContext().getMapState(mapDescriptor);

        // ReducingState
        ReducingStateDescriptor<Long> reducingDescriptor =
                new ReducingStateDescriptor<>("reducing-state", Long::sum, Long.class);
        reducingState = getRuntimeContext().getReducingState(reducingDescriptor);

        // AggregatingState
        AggregatingStateDescriptor<Double, AverageAccumulator, Double> aggregatingDescriptor =
                new AggregatingStateDescriptor<>(
                        "aggregating-state",
                        new AverageAggregateFunction(),
                        AverageAccumulator.class);
        aggregatingState = getRuntimeContext().getAggregatingState(aggregatingDescriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // ValueState
        Integer currentValue = valueState.value();
        valueState.update(currentValue == null ? 1 : currentValue + 1);

        // ListState
        listState.add(value);

        // MapState
        Integer mapCount = mapState.get(value);
        mapState.put(value, mapCount == null ? 1 : mapCount + 1);

        // ReducingState
        reducingState.add(1L);

        // AggregatingState
        aggregatingState.add(1.0);

        out.collect("Processed: " + value);
    }

    // Aggregate function that computes an average
    public static class AverageAggregateFunction
            implements AggregateFunction<Double, AverageAccumulator, Double> {

        @Override
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        @Override
        public AverageAccumulator add(Double value, AverageAccumulator accumulator) {
            accumulator.sum += value;
            accumulator.count++;
            return accumulator;
        }

        @Override
        public Double getResult(AverageAccumulator accumulator) {
            return accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
        }

        @Override
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.sum += b.sum;
            a.count += b.count;
            return a;
        }
    }

    // Accumulator used by the average aggregate function
    public static class AverageAccumulator {
        public double sum = 0.0;
        public long count = 0;
    }
}
3.2 Operator State Types
Managed State also covers operator state:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Managed operator state example.
 */
public class ManagedOperatorStateExample extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    private volatile boolean isRunning = true;
    private ListState<Integer> checkpointedCount;
    private int count = 0;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // Emit and update the counter under the checkpoint lock so that
            // snapshots always see a consistent value.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("Element-" + count);
                count++;
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> descriptor =
                new ListStateDescriptor<>("managed-operator-state", Integer.class);
        checkpointedCount = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Integer countValue : checkpointedCount.get()) {
                count = countValue;
            }
        }
    }
}
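For managed operator state, the redistribution scheme used on rescaling depends on which accessor you call: getListState splits the list evenly across subtasks, while getUnionListState hands every subtask the complete list on restore. A minimal sketch of the alternative accessor; the class name UnionStateInitialization is illustrative and the method could replace initializeState(...) in the example above.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;

public class UnionStateInitialization {

    private ListState<Integer> unionState;

    public void initializeUnionState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> descriptor =
                new ListStateDescriptor<>("managed-operator-state", Integer.class);

        // getListState: even-split redistribution, each subtask gets a slice.
        // getUnionListState: union redistribution, each subtask gets the full
        // list on restore and must pick out the entries it is responsible for.
        unionState = context.getOperatorStateStore().getUnionListState(descriptor);
    }
}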
4. Raw State in Detail
4.1 Raw State Interfaces
Raw State exposes much lower-level interfaces, and the user handles serialization themselves:
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Raw state example.
 * Note: this is only a conceptual sketch. A real raw-state integration plugs into
 * Flink's internal state interfaces (for example InternalKvState) or writes directly
 * into the raw keyed/operator state streams during checkpointing, which requires
 * considerably more code.
 */
public class RawStateExample {

    /**
     * A conceptual raw state holder: the user keeps already-serialized values
     * per key and is responsible for every byte that goes in or out.
     */
    public static class CustomRawState {

        private final Map<String, byte[]> stateMap = new HashMap<>();

        public void put(String key, byte[] serializedValue) {
            stateMap.put(key, serializedValue);
        }

        public byte[] getSerializedValue(String key) {
            return stateMap.get(key);
        }
    }

    /**
     * A custom serializer based on Flink's IOReadableWritable interface.
     * The user decides how the state is laid out as bytes.
     */
    public static class CustomStateSerializer implements IOReadableWritable {

        private Object state;

        public CustomStateSerializer(Object state) {
            this.state = state;
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            // Custom serialization logic; a real implementation would also
            // encode a type tag and handle versioning.
            if (state instanceof String) {
                out.writeUTF((String) state);
            } else if (state instanceof Integer) {
                out.writeInt((Integer) state);
            }
        }

        @Override
        public void read(DataInputView in) throws IOException {
            // Custom deserialization logic (symmetric to write()).
        }
    }
}
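Where raw state actually surfaces in the DataStream runtime is in custom stream operators, which may write arbitrary bytes into the raw operator/keyed state streams of a checkpoint. The following is a minimal conceptual sketch, assuming recent Flink 1.x internal APIs (StateSnapshotContext, StateInitializationContext and the raw stream accessors); these are internal interfaces that can change between versions, and the class name RawStateOperator is illustrative. Such an operator would be attached with DataStream.transform(...) rather than a regular map/process call.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class RawStateOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private int count;

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        count++;
        output.collect(element);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // Write the counter as raw bytes into the operator's raw state stream.
        OperatorStateCheckpointOutputStream rawOut = context.getRawOperatorStateOutput();
        rawOut.startNewPartition();
        new DataOutputStream(rawOut).writeInt(count);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Read the bytes back; with parallelism > 1 each subtask only sees the
        // partitions assigned to it and must merge them itself.
        for (StatePartitionStreamProvider provider : context.getRawOperatorStateInputs()) {
            count += new DataInputStream(provider.getStream()).readInt();
        }
    }
}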
4.2 A Raw State Usage Scenario
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import java.util.HashMap;
import java.util.Map;

/**
 * Raw state usage scenario: a hand-rolled state manager.
 * The "state" lives entirely in user code; Flink knows nothing about its contents.
 */
public class RawStateManager extends RichMapFunction<String, String> implements CheckpointedFunction {

    // Simulated raw state storage
    private Map<String, String> rawState = new HashMap<>();
    private Map<String, String> checkpointedState = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("Initializing raw state");
    }

    @Override
    public String map(String value) throws Exception {
        String key = "key-" + value;
        String currentValue = rawState.get(key);
        if (currentValue == null) {
            currentValue = "0";
        }
        int count = Integer.parseInt(currentValue) + 1;
        rawState.put(key, String.valueOf(count));
        return value + ": " + count;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // "Snapshot" the raw state. In this simulation the copy stays on the heap;
        // a real implementation would serialize it into a checkpoint stream.
        checkpointedState.clear();
        checkpointedState.putAll(rawState);
        System.out.println("Snapshot raw state: " + checkpointedState.size() + " entries");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // "Restore" the raw state from the in-memory copy.
        if (context.isRestored()) {
            rawState.clear();
            rawState.putAll(checkpointedState);
            System.out.println("Restored raw state: " + rawState.size() + " entries");
        }
    }
}
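Keep in mind that this class only simulates the mechanics: both maps live on the heap of a single subtask, so nothing here would survive an actual failure or a rescaling. A real raw-state implementation would serialize the map into the raw checkpoint streams sketched in section 4.1, or simply switch to managed operator state as shown in section 3.2.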
5. Configuration
5.1 State Backend Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * State backend configuration examples.
 */
public class StateBackendConfiguration {

    public static void configureStateBackends() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. MemoryStateBackend (legacy, deprecated; only for local testing with tiny state)
        env.setStateBackend(new MemoryStateBackend());

        // 2. HashMapStateBackend (the default since Flink 1.13)
        //    Keeps state as objects on the JVM heap; suited to small and medium state.
        env.setStateBackend(new HashMapStateBackend());

        // 3. EmbeddedRocksDBStateBackend
        //    Keeps state in an embedded RocksDB instance; suited to large state.
        EmbeddedRocksDBStateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
        // RocksDB options can be tuned, e.g.:
        // rocksDBBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        env.setStateBackend(rocksDBBackend);
    }
}
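Since Flink 1.13, checkpoint storage (where checkpoint data is written) is configured separately from the state backend. A short sketch; the class name CheckpointStorageConfiguration and the HDFS path are placeholders.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointStorageConfiguration {

    public static void configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Where checkpoint data is written (placeholder path).
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}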
5.2 State TTL Configuration
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

/**
 * State TTL configuration example (TTL is only available for managed state).
 */
public class StateTTLConfiguration {

    public static ValueStateDescriptor<String> configureTTL() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))                                              // TTL of 1 hour
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)              // when the TTL timer is refreshed
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // expired values are never returned
                .cleanupFullSnapshot()                                                  // drop expired entries when taking full snapshots
                .build();

        // Attach the TTL configuration to a state descriptor.
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("ttl-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);

        return descriptor;
    }
}
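Besides full-snapshot cleanup, StateTtlConfig also offers incremental cleanup for heap backends and compaction-filter cleanup for RocksDB. A short sketch of both variants; the class name TtlCleanupVariants and the numeric parameters are illustrative.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCleanupVariants {

    public static StateTtlConfig incrementalCleanup() {
        // Heap backends: lazily clean a few entries per state access.
        return StateTtlConfig.newBuilder(Time.hours(1))
                .cleanupIncrementally(10, false)
                .build();
    }

    public static StateTtlConfig rocksdbCompactionCleanup() {
        // RocksDB backend: filter out expired entries during compaction.
        return StateTtlConfig.newBuilder(Time.hours(1))
                .cleanupInRocksdbCompactFilter(1000)
                .build();
    }
}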
6. A Complete Example
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Managed State vs. (simulated) Raw State comparison example.
 */
public class StateComparisonExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // checkpoint every 10 seconds

        // Input stream
        DataStream<String> input = env.fromElements(
                "user1", "user2", "user1", "user3", "user2", "user1");

        // Count visits per user with managed keyed state
        DataStream<String> managedStateResult = input
                .keyBy(user -> user)
                .map(new ManagedStateCounter());

        // Count visits per user with a simulated raw state
        DataStream<String> rawStateResult = input
                .map(new RawStateCounter());

        managedStateResult.print("managed");
        rawStateResult.print("raw");

        env.execute("State Comparison Example");
    }

    /** Counter backed by managed ValueState. */
    public static class ManagedStateCounter extends RichMapFunction<String, String> {

        private ValueState<Integer> countState;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("managed-count-state", Integer.class);
            countState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public String map(String user) throws Exception {
            Integer currentCount = countState.value();
            currentCount = (currentCount == null) ? 1 : currentCount + 1;
            countState.update(currentCount);
            return "Managed State - User " + user + ": " + currentCount;
        }
    }

    /** Counter backed by a simulated raw state (a plain per-subtask HashMap). */
    public static class RawStateCounter extends RichMapFunction<String, String> {

        private final java.util.Map<String, Integer> rawState = new java.util.HashMap<>();

        @Override
        public String map(String user) throws Exception {
            int currentCount = rawState.getOrDefault(user, 0) + 1;
            rawState.put(user, currentCount);
            return "Raw State - User " + user + ": " + currentCount;
        }
    }
}
7. Best Practices
7.1 Managed State Best Practices
- Prefer managed state:
  - For most application scenarios, managed state is the better choice
  - It offers better abstractions and is easier to use correctly
- Pick the right state type:
  - ValueState for a single value
  - ListState for lists
  - MapState for key/value data
  - ReducingState or AggregatingState for data that needs aggregation
- Follow naming conventions:
  - Use meaningful state names
  - Avoid duplicate names
  - Prefer lowercase letters and hyphens
- Optimize for performance:
  - Avoid storing very large amounts of data in a single state entry
  - Choose the state backend deliberately
  - Consider the RocksDB state backend for large state
7.2 Raw State Best Practices
- Use raw state sparingly:
  - Only reach for raw state when it is genuinely needed
  - It requires solid knowledge of Flink's internals
- Get serialization right:
  - Implement efficient serialization and deserialization logic
  - Keep the serialized format compatible across versions
- Manage the state explicitly:
  - Handle state partitioning and redistribution correctly
  - Implement reliable checkpointing and recovery
- Test and verify:
  - Thoroughly test state persistence and recovery
  - Verify state redistribution under different parallelism settings
7.3 General Best Practices
- State backend selection:
  - HashMapStateBackend for small state
  - EmbeddedRocksDBStateBackend for large state
  - Do not use MemoryStateBackend in production
- Checkpoint configuration (see the sketch after this list):
  - Choose a sensible checkpoint interval
  - Configure an appropriate checkpoint timeout
  - Enable incremental checkpoints to improve performance
- State monitoring:
  - Monitor state size and its growth over time
  - Set up appropriate alerting
  - Periodically review how state is being used
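A short sketch of the checkpoint tuning mentioned above; the class name CheckpointTuning and the concrete numbers are illustrative and should be adjusted to the job's state size and latency requirements.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {

    public static void configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s.
        env.enableCheckpointing(60_000);

        // Fail a checkpoint if it does not complete within 2 minutes.
        env.getCheckpointConfig().setCheckpointTimeout(120_000);

        // Leave at least 30 s between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

        // Only one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Incremental checkpoints require the RocksDB state backend.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    }
}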
By choosing between Managed State and Raw State deliberately and using each properly, you can implement efficient, reliable state management in Flink applications and cover the needs of very different scenarios.
