当前位置: 首页 > news >正文

17.TaskExecutor与ResourceManager交互

TaskExecutorResourceManager交互

  • TaskExecutorResourceManager 之间的交互机制较为复杂,核心包含首次发现与注册连接建立以及心跳维持三个阶段。后续会将这些内容拆解为两个部分详细说明。本文重点介绍领导者发现与注册阶段。

LeaderRetrievalService

  • LeaderRetrievalService(领导者检索服务)是专门用于监听领导者变更的组件,前文已有介绍。
  • Standalone 模式下,其实现类为 StandaloneLeaderRetrievalService。该服务启动后会持续监听指定组件的领导者信息变化。
  • 对于 TaskExecutor 而言,这一监听服务主要用于监控ResourceManager 的领导者变更,监听的回调对象为:
    ResourceManagerLeaderListener

ResourceManagerLeaderListener

  • ResourceManagerLeaderListenerLeaderRetrievalService 中的回调监听器。
  • 其职责是:
    • ResourceManager 领导者节点发生变更时,通知 TaskExecutor,以便后者与新的领导者节点重新建立连接与注册关系。
    • 换句话说,ResourceManagerLeaderListener 是**TaskExecutor 感知 ResourceManager 变动**的关键入口。
/** The listener for leader changes of the resource manager. */private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {@Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() ->//这一步通知,并建立连接notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}@Overridepublic void handleError(Exception exception) {onFatalError(exception);}}// ------------------------------------------------------------------------//  Internal resource manager connection methods// ------------------------------------------------------------------------private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {resourceManagerAddress =//解析获取对应的 akka下的 地址createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//连接 ResourceManagerreconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s",resourceManagerAddress)));}private void reconnectToResourceManager(Exception cause) {//取消之前的连接closeResourceManagerConnection(cause);//超时控制startRegistrationTimeout();//尝试建立连接tryConnectToResourceManager();}
  • 先看一下 startRegistrationTimeout方法的实现:

    • 用于控制注册过程的最大等待时间。如果在设定的超时时间内未完成注册,系统会触发超时回调,重新尝试或报错退出。
    • 超时机制采用 UUID 标记以避免并发问题,只有当前有效的请求会触发超时逻辑
     private void startRegistrationTimeout() {final Duration maxRegistrationDuration =taskManagerConfiguration.getMaxRegistrationDuration();if (maxRegistrationDuration != null) {//生成新的uuidfinal UUID newRegistrationTimeoutId = UUID.randomUUID();currentRegistrationTimeoutId = newRegistrationTimeoutId;scheduleRunAsync(//定时异步检查,如果 currentRegistrationTimeoutId 和 newRegistrationTimeoutId 相等,就说明没连上//说明在连接了以后,肯定有某个地方取消了 currentRegistrationTimeoutId。() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);}}
    
  • 再看一下 tryConnectToResourceManager方法

    • 前提条件:监听到领导者后,resourceManagerAddress 已被解析生成。
    • 核心动作:调用 connectToResourceManager() 方法建立实际连接。
private void tryConnectToResourceManager() {if (resourceManagerAddress != null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress != null);assert (establishedResourceManagerConnection == null);assert (resourceManagerConnection == null);log.info("Connecting to ResourceManager {}.", resourceManagerAddress);// 封装 TaskExecutor 自身的基本信息,作为注册请求的数据final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(getAddress(),                            // TaskExecutor RPC 地址getResourceID(),                         // TaskExecutor 唯一资源 IDunresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),         // JMX 端口(用于监控)hardwareDescription,                     // 硬件信息memoryConfiguration,                     // 内存资源信息taskManagerConfiguration.getDefaultSlotResourceProfile(),  // 默认 slot 配置taskManagerConfiguration.getTotalResourceProfile(),        // 总资源配置unresolvedTaskManagerLocation.getNodeId()                  // 节点 ID);// 构建与 ResourceManager 通信的专用连接管理类resourceManagerConnection = new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),                   // ResourceManager 地址resourceManagerAddress.getResourceManagerId(),         // ResourceManager session IDgetMainThreadExecutor(),new ResourceManagerRegistrationListener(),             // 注册结果监听taskExecutorRegistration                               // 注册数据);// 正式启动连接与注册流程resourceManagerConnection.start();
}

核心方法:connectToResourceManager

关键组件说明
TaskExecutorRegistration
  • 封装了当前 TaskExecutor 节点的所有关键信息。
  • 这些信息将在后续通过 RPC 发送给 ResourceManager,用于注册与资源上报。
TaskExecutorToResourceManagerConnection
  • 这是一个专门负责管理与 ResourceManager 通信的类。
  • 核心职责:
    • 负责发送注册请求;
    • 管理连接状态与生命周期;
    • 支持断线重连、注册重试;
    • 通过监听器获取注册结果(成功/失败)。

resourceManagerConnection.start()
  • 启动该连接管理类:
    • 立即发起首次注册请求
    • 如果失败,依据配置进行重试
    • 注册成功后,建立起稳定的通信会话
    • 后续的心跳机制slot 报告均通过此连接进行。

RegisteredRpcConnection

  • 这是一个用于封装两个组件之间 RPC 通信逻辑的通用工具类
  • 在本文的场景中,主要体现为 TaskExecutorToResourceManagerConnection,负责管理 TaskExecutorResourceManager 之间的连接生命周期和注册流程。

start方法

// ------------------------------------------------------------------------//  Life cycle// ------------------------------------------------------------------------public void start() {checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null,"The RPC connection is already started");// 创建一个重试注册策略对象,用于管理连接和注册的重试逻辑final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//启动该策略newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}}

关键点解析

  • start() 方法是连接生命周期的入口,负责启动注册和连接建立过程。
  • 通过原子更新器 (REGISTRATION_UPDATER) 保证注册策略只被启动一次,防止并发重复执行。
  • 注册过程封装为 RetryingRegistration,它实现了自动重试机制,确保在网络抖动或服务不可达时,能够持续尝试连接。
  • 如果已经有激活的注册流程,新的启动请求会被取消,避免资源浪费和竞态条件。

createNewRegistration()方法

  • 该方法负责生成一个带有回调机制的注册任务(RetryingRegistration),用以管理与目标组件(此处为 ResourceManager)的连接和注册流程。

    private RetryingRegistration<F, G, S, R> createNewRegistration() {RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =newRegistration.getFuture();future.whenCompleteAsync((RetryingRegistration.RetryingRegistrationResult<G, S, R> result, Throwable failure) -> {if (failure != null) {if (failure instanceof CancellationException) {// 忽略取消异常,表示注册任务被主动取消log.debug("Retrying registration towards {} was cancelled.", targetAddress);} else {// 非取消异常视为严重错误,触发失败处理onRegistrationFailure(failure);}} else {// 注册结果处理if (result.isSuccess()) {targetGateway = result.getGateway();onRegistrationSuccess(result.getSuccess());} else if (result.isRejection()) {onRegistrationRejection(result.getRejection());} else {throw new IllegalArgumentException(String.format("Unknown retrying registration response: %s.", result));}}},executor);return newRegistration;
    }
    

    核心流程说明:

    • 生成一个新的注册任务(RetryingRegistration)。

      • 该类中包含一个重要的成员变量:CompletableFuture<RetryingRegistrationResult<G, S, R>> completionFuture。

        • 这是整个注册过程的异步结果通知机制。
        • 通知的方法是 onRegistrationSuccess(result.getSuccess());
      • 若注册失败且非主动取消,触发失败处理逻辑。

      • 若注册成功,保存返回的 Gateway 并触发成功处理。

      • 若注册被拒绝,调用拒绝处理逻辑。

    • 整体通过异步回调执行,保证非阻塞执行。

startRegistration() 方法

  • 该方法是 RetryingRegistration 启动注册尝试的入口,完成目标地址解析及正式发起注册请求。
@SuppressWarnings("unchecked")
public void startRegistration() {if (canceled) {// 已被取消,直接返回return;}try {// 解析目标地址,获取 RPC Gateway 代理final CompletableFuture<G> rpcGatewayFuture;if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture =//接下来重点看  rpcService.connect 方法(CompletableFuture<G>) rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture = rpcService.connect(targetAddress, targetType);}// 成功解析后,开始执行注册流程CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync((G rpcGateway) -> {log.info("Resolved {} address, beginning registration", targetName);register(rpcGateway,1,retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());},rpcService.getScheduledExecutor());// 如果解析失败且未取消,则根据配置延迟重试rpcGatewayAcceptFuture.whenCompleteAsync((Void v, Throwable failure) -> {if (failure != null && !canceled) {final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);if (log.isDebugEnabled()) {log.debug("Could not resolve {} address {}, retrying in {} ms.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure);} else {log.info("Could not resolve {} address {}, retrying in {} ms: {}",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());}startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());}},rpcService.getScheduledExecutor());} catch (Throwable t) {// 出现异常,完成异常通知并取消注册completionFuture.completeExceptionally(t);cancel();}
}

重点解析

  • 首先判断是否已经取消,如果是则跳过。
  • 调用 rpcService.connect 解析目标地址,获取对应的 RPC Gateway。
    • 对支持 fencing 的 Gateway,会传入 fencingToken 进行鉴权。
  • 解析成功后,调用 register() 方法发起正式的注册请求,传入初始重试次数和超时时间。
  • 解析失败时,根据配置的延迟时间进行重试。
  • 异常捕获保证了在意外错误发生时,能正确地完成异常流程并取消注册。

总结:

  • 至此,TaskExecutorResourceManager注册阶段流程梳理完毕:
    • 监听 ResourceManager 领导者变更;
    • 建立连接并发送注册请求;
    • 注册过程支持自动重试与超时控制;
    • 注册成功后建立正式会话,后续的心跳与 slot 汇报基于该会话完成。
  • 后续章节将重点介绍:
    • 注册细节
      • 包括 TaskExecutorResourceManager 双方的网关对象生成机制;
    • 心跳机制
      • 注册成功后,如何通过心跳维持与 ResourceManager 的活跃连接;
http://www.dtcms.com/a/288810.html

相关文章:

  • 论文阅读:Instruct BLIP (2023.5)
  • 【Lua】多脚本引用
  • Java反射:打破静态限制的利器
  • 【笔记】Anaconda 重装后虚拟环境写入路径异常的完整排查与解决过程
  • MySQL—表设计和聚合函数以及正则表达式
  • LeetCode 1712.将数组分成三个子数组的方案数
  • ZooKeeper学习专栏(二):深入 Watch 机制与会话管理
  • BST(二叉搜索树)的笔试大题(C语言)
  • [硬件电路-59]:电源:电子存储的仓库,电能的发生地,电场的动力场所
  • 手推OpenGL相机的正交投影矩阵和透视投影矩阵(附源码)
  • 【AI】文生图文生视频
  • 第三章自定义检视面板_创建自定义编辑器类_编辑器操作的撤销与恢复(本章进度3/9)
  • 使用pnpm安装项目的生产依赖dependencies和开发依赖devDependies及pnpm工作空间等简单使用方法说明
  • Function
  • Qwen3-8B 与 ChatGPT-4o Mini 的 TTFT 性能对比与底层原理详解
  • Docker实战:使用Docker部署envlinks极简个人导航页
  • Springboot美食分享平台
  • 【Kafka】深入理解 Kafka MirrorMaker2 - 实战篇
  • Mac m系列 VMware Fusion虚拟机安装ARM contos
  • host.equiv,.rhosts,inetd.conf文件的作用
  • Python应用进阶DAY10--模块化编程概念(模块、包、导入)及常见系统模块总结和第三方模块管理
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘beautifulsoup4’问题
  • 响应式编程入门教程第九节:UniRx 高级特性与自定义
  • python doipclient库
  • 学习C++、QT---30(QT库中如何自定义控件(自定义按钮)讲解)
  • XSS知识总结
  • Ajax简单介绍及Axios请求方式的别名
  • MyBatis从浅入深
  • SQL中的EXPLAIN命令详解
  • python的pywebview库结合Flask和waitress开发桌面应用程序简介