当前位置: 首页 > news >正文

Flink Slot 不足导致任务Pending修复方案

当前有3个虚拟机节点,每个节点配置的slot节点数量是4,${FLINK_HOME}/conf/flink-conf.yaml 关于slot的配置如下:

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
# checkpoint的时间根据集群性能和数据量确定
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://cdh01:8020/chkpoints
state.backend.incremental: true

其中,${HADOOP_HOME}/etc/hadoop/yarn-site.xml中关于yarn单个容器允许分配的最大内存设置如下:

	<!-- yarn单个容器允许分配的最大最小内存 --><property><name>yarn.scheduler.minimum-allocation-mb</name><value>512</value></property><property><name>yarn.scheduler.maximum-allocation-mb</name><value>4096</value></property><!-- yarn容器允许管理的物理内存大小 --><property><name>yarn.nodemanager.resource.memory-mb</name><value>4096</value></property>

启动yarn-session以Flink session cluster供FLINK SQL以共享方式运行:
$FLINK_HOME/bin/yarn-session.sh -d

启动FLINK SQL Client:
$FLINK_HOME/bin/sql-client.sh embedded -i $FLINK_HOME/conf/sql_client_init.sql -s yarn-session

如图,提交23个 FlinkSQL任务(flink-cdc -> hudi-ods),由于slot数量不够,将导致12个任务之外的剩余任务处于pending的状态(黑紫任务)。
在这里插入图片描述

查看某个pending任务的log,看到报错如下:

2025-xx-xx 21:45:56
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:517)at akka.actor.Actor.aroundReceive$(Actor.scala:515)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeoutat org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)... 26 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms... 27 more

从报错来看,核心问题是 Flink集群总Slot(12个)不足以支撑23个任务的并发需求,导致11个任务因无法分配Slot而处于Pending状态,最终触发超时(300秒后仍未获取到资源)。解决思路围绕“增加可用Slot”和“优化资源分配”展开,具体方案如下:

一、核心原因拆解

Flink的Slot是任务执行的最小资源单元(每个Slot对应一个TaskManager的一个CPU核心,默认1个Slot/核心),当前12个Slot最多同时运行12个任务。23个任务并发时,超出的11个任务会进入Pending队列,若超过slot.request.timeout(默认300秒)仍未获取到Slot,就会报超时错误。

二、解决方案(按优先级排序)

方案1:增加TaskManager节点或每个节点的Slot数量(最直接)

通过扩展Flink集群的Slot总数,满足23个任务的并发需求(需至少23个Slot)。

1.1 增加每个TaskManager的Slot数量(无需新增虚拟机)

若现有3台虚拟机的CPU资源充足(如每台虚拟机有10核CPU,当前仅用4核),可通过修改配置增加单节点的Slot数:

  1. 停止Flink集群:
    ./bin/stop-cluster.sh
    
  2. 修改flink-conf.yaml配置文件:
    # 每个TaskManager的Slot数量(默认1,根据CPU核数调整,建议不超过物理核数)
    taskmanager.numberOfTaskSlots: 10  # 3台虚拟机 × 10个Slot = 30个Slot,满足23个任务
    

    注意:numberOfTaskSlots不能超过虚拟机的物理CPU核心数(如虚拟机是10核,最大设为10,留1核给系统)。

  3. 重启Flink集群:
    ./bin/start-cluster.sh
    
  4. 验证Slot总数:
    • 访问Flink Web UI(如http://hadoop103:8081),在左侧“Cluster Overview”中查看“Total Slots”是否为30(3×10)。
1.2 新增TaskManager节点(现有节点资源不足时)

若3台虚拟机的CPU已达上限,需新增1台虚拟机作为TaskManager节点:

  1. 在新虚拟机上安装Flink(与现有集群版本一致),并配置flink-conf.yaml
    # 指向现有JobManager的地址(如hadoop103)
    jobmanager.rpc.address: hadoop103
    # 新节点的Slot数量(如8个,3台旧节点×4 + 1台新节点×8 = 20,仍不够可设为11)
    taskmanager.numberOfTaskSlots: 11
    
  2. 在新节点上启动TaskManager:
    ./bin/taskmanager.sh start
    
  3. 验证:Web UI中“Total Slots”是否达到23+。
方案2:优化任务资源分配(避免资源浪费)

若无法增加Slot总数,可通过优化任务的资源配置,减少单个任务的Slot占用,提升Slot利用率。

2.1 降低单个任务的并行度(Parallelism)

Flink任务的并行度默认等于Slot数量,若部分任务无需高并行(如简单查询),可手动降低并行度,减少Slot占用:

-- 执行任务时指定并行度(如设为1,仅占用1个Slot)
Flink SQL> set execution.parallelism = 1;
-- 再执行任务(如查询)
Flink SQL> select * from order_info_cdc;

说明:默认情况下,每个任务的并行度 = parallelism.defaultflink-conf.yaml配置,默认1),若之前设为更高值(如2),会导致单个任务占用多个Slot。

2.2 合并小任务(减少任务总数)

若20个任务中有多个相似的轻量任务(如多个简单查询),可合并为1个任务,减少Slot需求:

  • 例:将多个select * from table where ...合并为union all查询:
    select * from table1 where type=1
    union all
    select * from table1 where type=2;
    
    合并后1个任务仅占用1个Slot,替代原来的2个任务。
方案3:配置Slot共享组(Share Group)

Flink支持“Slot共享组”,允许不同任务共享同一个Slot(仅适用于非密集型任务),提升Slot利用率。

  1. 在创建表或执行任务时,指定共享组名称:
    -- 执行任务时指定共享组(如所有查询共享“query_group”)
    Flink SQL> set execution.share-group = query_group;
    -- 执行多个任务,会共享同一个Slot(需确保任务非CPU密集型)
    Flink SQL> select * from table1;
    Flink SQL> select * from table2;
    
  2. 注意:共享组仅适合轻量任务(如简单查询),若任务是CPU/内存密集型(如大表聚合),共享会导致性能下降。
方案4:延长Slot请求超时时间(临时规避)

若任务可接受等待(如非实时任务),可延长slot.request.timeout,给Pending任务更多时间等待释放的Slot:

  1. 修改flink-conf.yaml
    # Slot请求超时时间(单位:毫秒,设为10分钟=600000ms)
    slot.request.timeout: 600000
    
  2. 重启Flink集群生效。

注意:这只是临时方案,若Slot总数始终不足,任务仍会最终失败,需配合方案1/2彻底解决。

三、验证与监控

  1. 查看任务状态:
    • 访问Flink Web UI,在“Jobs”页面查看Pending任务是否已分配Slot(状态从“Pending”变为“Running”)。
  2. 监控Slot使用:
    • 在Web UI“Cluster Overview”中查看“Used Slots”和“Available Slots”,确保Available Slots ≥ 0(无任务Pending)。

四、生产环境建议

  • 优先方案1:生产环境中,核心任务需保证资源充足,建议Slot总数预留20%-30%冗余(如20个任务,配置28个Slot),避免突发任务导致Pending。
  • 规范并行度配置:根据任务复杂度设置并行度(如大表处理设为4-8,简单查询设为1-2),避免盲目使用高并行度浪费资源。
  • 使用YARN/K8s动态资源:若Flink部署在YARN/K8s上,可开启动态资源分配(yarn.containers.dynamic/K8s的Pod动态扩缩容),自动根据任务需求增减Slot,无需手动配置。

通过以上方案,可彻底解决Slot不足导致的任务Pending问题,确保所有任务正常运行。

http://www.dtcms.com/a/347951.html

相关文章:

  • 互联网大厂Java面试实录:从Spring到微服务的全面考察
  • 【软件安全】ARM64、x86、32 位与 64 位架构的区别、定义、应用背景
  • 个人搭建小网站教程(云服务器Ubuntu版本)
  • 【数据结构】二叉树的顺序存储、堆的实现及其应用:堆排序与Top-K问题
  • 以国产IoTDB为代表的主流时序数据库架构与性能深度选型评测
  • kanass V1.1.4版本发布,支持Mysql数据库、ubuntu安装与Mantis数据导入
  • Thonny+MicroPython搭建ESP32芯片开发环境
  • 代码性能测试——benchmark库
  • Elasticsearch Ruby 客户端故障排查实战指南
  • AI与SEO关键词协同优化
  • DBeaver连接SQL Server集成认证问题解决方案
  • xxl-job 启动后导致pod内存使用率持续增加
  • 从 Unity UGUI 到 Unreal UMG 的交互与高效实践:UI 事件、坐标系适配与性能优化
  • MATLAB 与 Simulink 联合仿真:控制系统建模与动态性能优化
  • C#_gRPC
  • RabbitMQ--消费端异常处理与 Spring Retry
  • 阿里云拉取dockers镜像
  • 在JavaScript中,比较两个数组是否有相同元素(交集)的常用方法
  • 今日科技热点 | AI加速创新,5G与量子计算引领未来
  • wpf之DockPanel
  • 3D打印机管理后台与RabbitMQ集成的业务场景
  • RabbitMQ面试精讲 Day 29:版本升级与平滑迁移
  • 【图像处理基石】基于 Python 的图像行人删除技术:实现街景无干扰化处理
  • 性能比拼: .NET (C#) vs. Fiber (Go)
  • Kaggle项目:一次 Uber 出行数据分析的完整思路
  • 高空作业安全监控难题突破!陌讯自适应识别算法实现安全带穿戴检测准确率↑93%
  • 深度学习——详细教学:神经元、神经网络、感知机、激活函数、损失函数、优化算法(梯度下降)
  • 大数据管理与应用系列丛书《数据挖掘》读书笔记之集成学习(1)
  • 基于PHP服装租赁管理系统/基于php的服装管理系统的设计与实现
  • 基于电磁频谱地图的辐射源定位算法复现