当前位置: 首页 > news >正文

Flink keyby使用随机数踩坑记

发现问题

在flink开发过程中为了避免出现空值,导致数据倾斜,随在keyby过程中,KeySelector<IN, KEY> 函数使用随机值返回结果,如下:
在这里插入图片描述

做savepoint报错,如下:
在这里插入图片描述
日志报错


Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:153)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:115)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:147)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642)
... 6 common frames omitted
2025-10-30 11:17:00.732 WARN [jobmanager-io-thread:o-thread-1] org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 1352 for job 16079917617403645111514515160799. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1066)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:301)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 1352 for operator LoadProgressCoLoadResultFlatMap (57/380)#0.
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:279)
... 4 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:177)
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Key group 0 is not in KeyGroupRange{startKeyGroup=151, endKeyGroup=153}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:153)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:115)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeKVStateData(FullSnapshotAsyncWriter.java:147)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.writeSnapshotToOutputStream(FullSnapshotAsyncWriter.java:107)
at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:77)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642)
... 6 common frames omitted

分析问题

  1. Flink开发过程中使用随机数返回随机键,这样就会导致keyby过程中的结果不确定性,可能会在keyby中生成相同的随机数,相同的随机键使得这种验证无法通过。因为会在不同的subtask中出现相同的key,这就打破了相同的key必须被放到同一个subtask的原则。

  2. Flink的有状态计算依赖于相同键的数据持续发送到同一任务。随机数会使相同逻辑的数据因每次键值不同而分散到不同任务,导致状态无法正确维护和聚合。另外Savepoint包含完整的作业拓扑信息,在恢复时会验证状态与算子的对应关系。这种机制要求作为键的值必须具备‌确定性

  3. 如果下游subtask不涉及状态保存,后面没有基于这个key的Keyed State,只是单纯打散用,keyby中可以使用随机数。

http://www.dtcms.com/a/549493.html

相关文章:

  • 行业网站建设方案室内设计师联盟首页
  • JAVA中的堆和栈
  • A2A协议的多智能体投顾引擎架构, 智能体生成年化418%,回撤11%,夏普比5.19的规则策略,附python代码
  • 建设黑彩网站需要什么药理学网站建设方案
  • Linux本机ping虚机ip Network unreachable
  • 个体工商户可以备案哪些网站做一个同城便民信息网站怎么做
  • 队列——速成
  • 南京建设网站的公司网易企业邮箱登录入口手机
  • R语言基于Rselenium模拟浏览器抓取DatabaseCommons数据-连载NO.04
  • 对于一些MP4文件的压缩
  • 基于Selenium和AI的图像处理
  • Selenium Wire 网络拦截实现方案
  • 无锡手机网站制作费用网页设计与网站建设在线考试
  • 【Qt】【1. 版本特性介绍】
  • pyside6的历史发展、Qt 介绍、PyQt 和 pyside6对比
  • 做没用的网站建立个网站
  • numpy的random函数总结
  • ⸢ 拾-Ⅱ⸥⤳ 威胁感知与响应建设方案:威胁运营威胁响应
  • Auto Dark Mode,一款Windows 自动深浅色切换工具
  • 惠民县建设网站信宜网站设计公司
  • 论文对应项目复现教程
  • 第165期 无需提示词的微调:Bonepoke 与系统姿态的隐藏调控旋钮
  • 口腔种植中叠腮技术的适应证与考量
  • 原码、反码、补码与正数、负数的运算关系介绍
  • ShimetaPi丨事件相机新版SDK发布:支持Python调用,可降低使用门槛
  • 计算机图形学:【Games101】学习笔记03——光栅化(三角形的离散化、深度测试与抗锯齿)
  • 如何掌握【Java】 IO/NIO设计模式?工厂/适配器/装饰器/观察者模式全解析
  • C# 中的空条件运算符(?.)与空合并运算符(??)详解
  • 福建人力资源建设网站房地产销售技巧
  • 佳木斯 网站建设网页版qq登录入口版qq账号登录界面