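A Pitfall When Using Random Numbers in Flink keyBy
The problem
In a Flink job, to keep null values from causing data skew, the KeySelector<IN, KEY> function passed to keyBy was written to return a random value.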
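
A minimal sketch of the kind of selector described above, not the original code. It assumes a hypothetical `Event` record with a nullable `userId`. The `KeySelector` interface declared here is only a local stand-in with the same shape as Flink's `org.apache.flink.api.java.functions.KeySelector`, so that the example compiles without Flink on the classpath.

```java
import java.util.concurrent.ThreadLocalRandom;

public class RandomKeySketch {

    /** Local stand-in for Flink's KeySelector<IN, KEY>; in a real job, import Flink's interface instead. */
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    /** Hypothetical record type; userId may be null. */
    public static final class Event {
        public final String userId;

        public Event(String userId) {
            this.userId = userId;
        }
    }

    /** The anti-pattern: a null id is replaced by a fresh random key on every call. */
    public static String keyOf(Event event) {
        if (event.userId != null) {
            return event.userId;
        }
        return "null-" + ThreadLocalRandom.current().nextInt(1000);
    }

    /** In a job, a selector like this would be passed to keyBy. */
    public static final KeySelector<Event, String> RANDOM_FOR_NULL = RandomKeySketch::keyOf;

    public static void main(String[] args) throws Exception {
        Event event = new Event(null);
        // Two calls on the same record will usually print two different keys.
        System.out.println(RANDOM_FOR_NULL.getKey(event));
        System.out.println(RANDOM_FOR_NULL.getKey(event));
    }
}
```

The point to notice is that the key for a null record is not a function of the record: asking twice gives two answers.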

Taking a savepoint then failed.

Error log:
2025-10-30 11:17:00.732 WARN [jobmanager-io-thread:o-thread-1] org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 1352 for job 16079917617403645111514515160799. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1066)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 1352 for operator LoadProgressCoLoadResultFlatMap (57/380)#0.
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
... 4 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:153)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:115)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:147)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642)
... 6 common frames omitted
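
The numbers in this log can be cross-checked by hand. The sketch below reproduces the integer arithmetic Flink uses to map subtasks to key group ranges (as in `KeyGroupRangeAssignment`), written out in plain Java so that it runs without Flink. The parallelism of 380 and the subtask index 56 come from `(57/380)` in the log. The max parallelism of 1024 is an assumption: it is what Flink would derive by default for a parallelism of 380, and it reproduces the logged range, but the job's actual setting is not stated here.

```java
public class KeyGroupRangeCheck {

    /** First key group owned by the subtask with the given 0-based index. */
    public static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given 0-based index. */
    public static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** 0-based index of the subtask that owns the given key group. */
    public static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 1024; // assumption, see the paragraph above
        int parallelism = 380;     // from "(57/380)" in the log
        int subtaskIndex = 56;     // the log counts subtasks from 1, so 57 is index 56

        System.out.println(rangeStart(maxParallelism, parallelism, subtaskIndex)); // prints 151
        System.out.println(rangeEnd(maxParallelism, parallelism, subtaskIndex));   // prints 153
        System.out.println(ownerOf(maxParallelism, parallelism, 0));               // prints 0
    }
}
```

Under these assumptions subtask 57 owns key groups 151 to 153, while key group 0 belongs to the first subtask. That is exactly the mismatch the exception reports: state for a key in key group 0 was found on a subtask that does not own it.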
Analysis
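- Returning a random number from the key selector makes the result of keyBy non-deterministic. Flink evaluates the key selector more than once for the same record: once when partitioning the record to a downstream subtask, and again inside the keyed operator when it sets the current key for state access. With a random selector the two calls can return different keys, so state ends up registered on a subtask under a key whose key group that subtask does not own. This breaks the rule that a given key must always be handled by the same subtask, and the key group check performed while writing the snapshot fails.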
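- Flink's stateful computation relies on records with the same key always being sent to the same task. With a random key, records that logically belong together get a different key each time and are scattered across tasks, so state cannot be maintained or aggregated correctly. In addition, a savepoint records which state belongs to which operator and writes keyed state grouped by key group (see the FullSnapshotAsyncWriter frames in the trace above), so that state can be matched to operators again on restore. This mechanism requires the value used as the key to be deterministic.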
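- If the downstream subtasks keep no state, that is, nothing after the keyBy uses keyed state based on this key and the keyBy only serves to spread records out, then a random number can be used in keyBy.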
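
The first two points can be illustrated with a small simulation that runs outside Flink. This is a sketch under stated assumptions, not Flink's implementation: the key group is computed as a plain `hashCode` modulo the max parallelism (Flink additionally applies a murmur hash to the hash code), the values 128 and 4 are arbitrary, and the class and method names are made up for the example. The selector is called twice per record, standing in for the call made during partitioning and the call made in the keyed operator.

```java
import java.util.Random;
import java.util.function.Function;

public class NonDeterministicKeyDemo {

    static final int MAX_PARALLELISM = 128; // arbitrary value for the demo
    static final int PARALLELISM = 4;       // arbitrary value for the demo

    /** Simplified key group assignment; Flink also murmur-hashes the hash code. */
    public static int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    /** Subtask that owns a key group. */
    public static int subtaskOf(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    /** Counts records whose routing-time key and state-access-time key map to different subtasks. */
    public static int countMismatches(Function<String, Integer> keySelector, int records) {
        int mismatches = 0;
        for (int i = 0; i < records; i++) {
            String record = "record-" + i;
            int routedTo = subtaskOf(keyGroupOf(keySelector.apply(record)));   // call during partitioning
            int stateOwner = subtaskOf(keyGroupOf(keySelector.apply(record))); // call in the keyed operator
            if (routedTo != stateOwner) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Random random = new Random(42);

        // Random key: a new value on every call, unrelated to the record.
        Function<String, Integer> randomKey = record -> random.nextInt(1000);

        // Deterministic key: derived only from the record's own content.
        Function<String, Integer> contentKey = record -> Math.floorMod(record.hashCode(), 1000);

        System.out.println("random key mismatches:  " + countMismatches(randomKey, 10_000));
        System.out.println("content key mismatches: " + countMismatches(contentKey, 10_000));
    }
}
```

With the random selector most records come out with two different owners, while the content-based selector never disagrees with itself. A key that still spreads null records out but is computed from the record's content (or from a salt written into the record in an earlier step) keeps that property.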
