17.TaskExecutor与ResourceManager交互
TaskExecutor
与ResourceManager
交互
TaskExecutor
与ResourceManager
之间的交互机制较为复杂,核心包含首次发现与注册、连接建立以及心跳维持三个阶段。后续会将这些内容拆解为两个部分详细说明。本文重点介绍领导者发现与注册阶段。
LeaderRetrievalService
LeaderRetrievalService
(领导者检索服务)是专门用于监听领导者变更的组件,前文已有介绍。- 在
Standalone
模式下,其实现类为StandaloneLeaderRetrievalService
。该服务启动后会持续监听指定组件的领导者信息变化。 - 对于
TaskExecutor
而言,这一监听服务主要用于监控ResourceManager 的领导者变更,监听的回调对象为:
ResourceManagerLeaderListener
ResourceManagerLeaderListener
ResourceManagerLeaderListener
是LeaderRetrievalService
中的回调监听器。- 其职责是:
- 当
ResourceManager
领导者节点发生变更时,通知TaskExecutor
,以便后者与新的领导者节点重新建立连接与注册关系。 - 换句话说,
ResourceManagerLeaderListener
是**TaskExecutor
感知 ResourceManager 变动**的关键入口。
- 当
/** The listener for leader changes of the resource manager. */private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {@Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() ->//这一步通知,并建立连接notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}@Overridepublic void handleError(Exception exception) {onFatalError(exception);}}// ------------------------------------------------------------------------// Internal resource manager connection methods// ------------------------------------------------------------------------private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {resourceManagerAddress =//解析获取对应的 akka下的 地址createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//连接 ResourceManagerreconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s",resourceManagerAddress)));}private void reconnectToResourceManager(Exception cause) {//取消之前的连接closeResourceManagerConnection(cause);//超时控制startRegistrationTimeout();//尝试建立连接tryConnectToResourceManager();}
-
先看一下
startRegistrationTimeout
方法的实现:- 用于控制注册过程的最大等待时间。如果在设定的超时时间内未完成注册,系统会触发超时回调,重新尝试或报错退出。
- 超时机制采用 UUID 标记以避免并发问题,只有当前有效的请求会触发超时逻辑
private void startRegistrationTimeout() {final Duration maxRegistrationDuration =taskManagerConfiguration.getMaxRegistrationDuration();if (maxRegistrationDuration != null) {//生成新的uuidfinal UUID newRegistrationTimeoutId = UUID.randomUUID();currentRegistrationTimeoutId = newRegistrationTimeoutId;scheduleRunAsync(//定时异步检查,如果 currentRegistrationTimeoutId 和 newRegistrationTimeoutId 相等,就说明没连上//说明在连接了以后,肯定有某个地方取消了 currentRegistrationTimeoutId。() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);}}
-
再看一下
tryConnectToResourceManager
方法- 前提条件:监听到领导者后,
resourceManagerAddress
已被解析生成。 - 核心动作:调用
connectToResourceManager()
方法建立实际连接。
- 前提条件:监听到领导者后,
private void tryConnectToResourceManager() {if (resourceManagerAddress != null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress != null);assert (establishedResourceManagerConnection == null);assert (resourceManagerConnection == null);log.info("Connecting to ResourceManager {}.", resourceManagerAddress);// 封装 TaskExecutor 自身的基本信息,作为注册请求的数据final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(getAddress(), // TaskExecutor RPC 地址getResourceID(), // TaskExecutor 唯一资源 IDunresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1), // JMX 端口(用于监控)hardwareDescription, // 硬件信息memoryConfiguration, // 内存资源信息taskManagerConfiguration.getDefaultSlotResourceProfile(), // 默认 slot 配置taskManagerConfiguration.getTotalResourceProfile(), // 总资源配置unresolvedTaskManagerLocation.getNodeId() // 节点 ID);// 构建与 ResourceManager 通信的专用连接管理类resourceManagerConnection = new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(), // ResourceManager 地址resourceManagerAddress.getResourceManagerId(), // ResourceManager session IDgetMainThreadExecutor(),new ResourceManagerRegistrationListener(), // 注册结果监听taskExecutorRegistration // 注册数据);// 正式启动连接与注册流程resourceManagerConnection.start();
}
核心方法:connectToResourceManager
关键组件说明
TaskExecutorRegistration
- 封装了当前
TaskExecutor
节点的所有关键信息。 - 这些信息将在后续通过 RPC 发送给
ResourceManager
,用于注册与资源上报。
TaskExecutorToResourceManagerConnection
- 这是一个专门负责管理与 ResourceManager 通信的类。
- 核心职责:
- 负责发送注册请求;
- 管理连接状态与生命周期;
- 支持断线重连、注册重试;
- 通过监听器获取注册结果(成功/失败)。
resourceManagerConnection.start()
- 启动该连接管理类:
- 立即发起首次注册请求;
- 如果失败,依据配置进行重试;
- 注册成功后,建立起稳定的通信会话;
- 后续的心跳机制和slot 报告均通过此连接进行。
RegisteredRpcConnection
- 这是一个用于封装两个组件之间 RPC 通信逻辑的通用工具类。
- 在本文的场景中,主要体现为
TaskExecutorToResourceManagerConnection
,负责管理TaskExecutor
与ResourceManager
之间的连接生命周期和注册流程。
start方法
// ------------------------------------------------------------------------// Life cycle// ------------------------------------------------------------------------public void start() {checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null,"The RPC connection is already started");// 创建一个重试注册策略对象,用于管理连接和注册的重试逻辑final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//启动该策略newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}}
关键点解析
start()
方法是连接生命周期的入口,负责启动注册和连接建立过程。- 通过原子更新器 (
REGISTRATION_UPDATER
) 保证注册策略只被启动一次,防止并发重复执行。 - 注册过程封装为
RetryingRegistration
,它实现了自动重试机制,确保在网络抖动或服务不可达时,能够持续尝试连接。 - 如果已经有激活的注册流程,新的启动请求会被取消,避免资源浪费和竞态条件。
createNewRegistration()方法
-
该方法负责生成一个带有回调机制的注册任务(
RetryingRegistration
),用以管理与目标组件(此处为ResourceManager
)的连接和注册流程。private RetryingRegistration<F, G, S, R> createNewRegistration() {RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =newRegistration.getFuture();future.whenCompleteAsync((RetryingRegistration.RetryingRegistrationResult<G, S, R> result, Throwable failure) -> {if (failure != null) {if (failure instanceof CancellationException) {// 忽略取消异常,表示注册任务被主动取消log.debug("Retrying registration towards {} was cancelled.", targetAddress);} else {// 非取消异常视为严重错误,触发失败处理onRegistrationFailure(failure);}} else {// 注册结果处理if (result.isSuccess()) {targetGateway = result.getGateway();onRegistrationSuccess(result.getSuccess());} else if (result.isRejection()) {onRegistrationRejection(result.getRejection());} else {throw new IllegalArgumentException(String.format("Unknown retrying registration response: %s.", result));}}},executor);return newRegistration; }
核心流程说明:
-
生成一个新的注册任务(
RetryingRegistration
)。-
该类中包含一个重要的成员变量:CompletableFuture<RetryingRegistrationResult<G, S, R>> completionFuture。
- 这是整个注册过程的异步结果通知机制。
- 通知的方法是 onRegistrationSuccess(result.getSuccess());
-
若注册失败且非主动取消,触发失败处理逻辑。
-
若注册成功,保存返回的 Gateway 并触发成功处理。
-
若注册被拒绝,调用拒绝处理逻辑。
-
-
整体通过异步回调执行,保证非阻塞执行。
-
startRegistration()
方法
- 该方法是
RetryingRegistration
启动注册尝试的入口,完成目标地址解析及正式发起注册请求。
@SuppressWarnings("unchecked")
public void startRegistration() {if (canceled) {// 已被取消,直接返回return;}try {// 解析目标地址,获取 RPC Gateway 代理final CompletableFuture<G> rpcGatewayFuture;if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture =//接下来重点看 rpcService.connect 方法(CompletableFuture<G>) rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture = rpcService.connect(targetAddress, targetType);}// 成功解析后,开始执行注册流程CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync((G rpcGateway) -> {log.info("Resolved {} address, beginning registration", targetName);register(rpcGateway,1,retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());},rpcService.getScheduledExecutor());// 如果解析失败且未取消,则根据配置延迟重试rpcGatewayAcceptFuture.whenCompleteAsync((Void v, Throwable failure) -> {if (failure != null && !canceled) {final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);if (log.isDebugEnabled()) {log.debug("Could not resolve {} address {}, retrying in {} ms.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure);} else {log.info("Could not resolve {} address {}, retrying in {} ms: {}",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());}startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());}},rpcService.getScheduledExecutor());} catch (Throwable t) {// 出现异常,完成异常通知并取消注册completionFuture.completeExceptionally(t);cancel();}
}
重点解析
- 首先判断是否已经取消,如果是则跳过。
- 调用
rpcService.connect
解析目标地址,获取对应的 RPC Gateway。- 对支持 fencing 的 Gateway,会传入 fencingToken 进行鉴权。
- 解析成功后,调用
register()
方法发起正式的注册请求,传入初始重试次数和超时时间。 - 解析失败时,根据配置的延迟时间进行重试。
- 异常捕获保证了在意外错误发生时,能正确地完成异常流程并取消注册。
总结:
- 至此,
TaskExecutor
与ResourceManager
的注册阶段流程梳理完毕:- 监听
ResourceManager
领导者变更; - 建立连接并发送注册请求;
- 注册过程支持自动重试与超时控制;
- 注册成功后建立正式会话,后续的心跳与 slot 汇报基于该会话完成。
- 监听
- 后续章节将重点介绍:
- 注册细节
- 包括
TaskExecutor
与ResourceManager
双方的网关对象生成机制;
- 包括
- 心跳机制
- 注册成功后,如何通过心跳维持与
ResourceManager
的活跃连接;
- 注册成功后,如何通过心跳维持与
- 注册细节