当前位置: 首页 > news >正文

DolphinScheduler 3.2.0 Worker启动核心源码解析

目录

一、概览

二、Worker 启动入口 run()

三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

3.2 Netty RPC Client(发送结果)

四、加载 Task 插件

五、注册中心客户端启动

六、Worker 管理线程启动

七、消息重试线程启动

八、RPC 请求到任务执行的完整链路

九、总结


一、概览

在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:

  • Netty RPC 服务端(workerRpcServer)和客户端(workerRpcClient

  • 任务插件管理器(taskPluginManager

  • 注册中心客户端(workerRegistryClient

  • Worker 管理线程(调度任务分发)

  • 消息重试线程(保证消息可靠性)

最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。


二、Worker 启动入口 run()

Worker 的入口方法标记为 @PostConstruct,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:

@PostConstruct
public void run() {// 1. 启动 rpc 服务this.workerRpcServer.start();this.workerRpcClient.start();// 2. 加载插件this.taskPluginManager.loadPlugin();// 3. 启动注册中心客户端this.workerRegistryClient.setRegistryStoppable(this);this.workerRegistryClient.start();// 4. 启动管理线程this.workerManagerThread.start();// 5. 启动消息重试线程this.messageRetryRunner.start();
}

第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。


三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

public void start() {NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();serverConfig.setListenPort(workerConfig.getListenPort());nettyRemotingServer = new NettyRemotingServer(serverConfig);for (WorkerRpcProcessor processor : workerRpcProcessors) {nettyRemotingServer.registerProcessor(processor);}this.nettyRemotingServer.start();
}
  1. 读取配置:从 workerConfig 中获取 RPC 服务端配置,包括端口、线程池大小等。

  2. 构造 NettyRemotingServer:底层基于 Netty 封装的服务器类,用于接收网络消息。

  3. 注册 Processor:通过 nettyRemotingServer.registerProcessor 将各类消息处理器(如 WorkerTaskDispatchProcessor)绑定到不同的消息类型上。

  4. 启动服务器:调用 Netty 的 bind(port).sync(),启动 BossGroup 和 WorkerGroup,监听客户端连接。

底层实现重点在 NettyRemotingServer.start(),核心伪码如下:

if (isStarted.compareAndSet(false, true)) {serverBootstrap.group(bossGroup, workGroup).channel(getServerSocketChannelClass()).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {initNettyChannel(ch);}});serverBootstrap.bind(listenPort).sync();
}
  • BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。

  • 通过 ChannelInitializer 向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。

3.2 Netty RPC Client(发送结果)

public void start() {this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());for (WorkerRpcProcessor processor : workerRpcProcessors) {this.nettyRemotingClient.registerProcessor(processor);}
}

NettyRemotingClient 构造时会初始化:

  • EventLoopGroup(Epoll 或 NIO)

  • Callback Executor:处理响应回调

  • Response Future Executor:扫描超时的未回包请求

其启动逻辑:

bootstrap.group(workerGroup).channel(getSocketChannelClass()).option(TCP_NODELAY, clientConfig.isTcpNoDelay()).handler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(...)).addLast(new NettyDecoder(), clientHandler, encoder);}});
responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.set(true);
  • 连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。

  • 心跳机制:通过 IdleStateHandler 定期向 Master 发送心跳,保持连接活跃。

  • 超时重试ResponseFuture 定时扫描未收到回包的请求,触发超时失败或重试。


四、加载 Task 插件

Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:

public void loadPlugin() {PrioritySPIFactory<TaskChannelFactory> factoryLoader= new PrioritySPIFactory<>(TaskChannelFactory.class);for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) {String name = entry.getKey();TaskChannelFactory factory = entry.getValue();taskChannelFactoryMap.put(name, factory);taskChannelMap.put(name, factory.create());}
}
  1. PrioritySPIFactory 利用 ServiceLoader.load(spiClass) 扫描 classpath 下所有 META-INF/services/... 配置的实现。

  2. 冲突处理:若多个插件使用同一标识(t.getIdentify().getName()),则通过 resolveConflict 按优先级或版本选择。

  3. 实例化 Factory 并调用 create(),生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。

插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。


五、注册中心客户端启动

Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。

public void start() {registry();registryClient.addConnectionStateListener(new WorkerConnectionStateListener(workerConfig, strategy));
}
private void registry() {WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat();String path = workerConfig.getWorkerRegistryPath();registryClient.remove(path);registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb));workerHeartBeatTask.start();
}
  • persistEphemeral:将自身地址、可用线程数等写入 ${registryRoot}/workers/${workerAddress},并由 ZK 管理生命周期。

  • 心跳定时任务workerHeartBeatTask.start() 定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。

  • 连接监听:若与注册中心断开,WorkerConnectionStateListener 可触发重连或优雅退出。


六、Worker 管理线程启动

Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (!ServerLifeCycleManager.isRunning()) {Thread.sleep(SLEEP_TIME);}if (getThreadPoolQueueSize() <= workerExecThreads) {WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take();workerExecService.submit(task);} else {incOverloadCount();Thread.sleep(SLEEP_TIME);}}
}
  • waitSubmitQueue:阻塞队列,存放待执行的 WorkerDelayTaskExecuteRunnable 实例。

  • 背压机制:当线程池队列已满(> workerExecThreads)时,先统计过载指标,再稍后重试。

  • workerExecService:底层是带监听能力的 ListeningExecutorService,执行完毕后可注册回调处理日志和结果汇总。


七、消息重试线程启动

Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (needToRetryMessages.isEmpty()) {Thread.sleep(MESSAGE_RETRY_WINDOW);}long now = System.currentTimeMillis();for (entry in needToRetryMessages) {for (message in entry.getValue()) {if (now - message.getSendTime() > RETRY_WINDOW) {message.setSendTime(now);messageSenderMap.get(type).sendMessage(message);}}}Thread.sleep(SLEEP_TIME);}
}
  • needToRetryMessages:按 TaskInstanceId 分类的待重试消息列表。

  • 定时扫描:以 MESSAGE_RETRY_WINDOW(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的 messageSender 去向 Master 派发。

  • 幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。


八、RPC 请求到任务执行的完整链路

  1. Master 通过 RPC 调用 WorkerRpcClient.sendMessageTaskDispatchRequest 发往指定 Worker。

  2. Worker Netty Server 的 NettyServerHandler.channelRead() 接收消息,调用 processReceived()

    pair = processors.get(msg.getType());
    pair.getRight().submit(() -> processor.process(channel, msg));
  3. WorkerTaskDispatchProcessor.process() 反序列化 TaskDispatchRequest,构建 TaskExecutionContext 并缓存。

  4. 检查是否需要延时执行(delayTime);若需延时,则先发送延时执行消息给 Master,跳过入队。

  5. 通过 WorkerTaskExecuteRunnableFactoryBuilder 构造 WorkerDelayTaskExecuteRunnable,并调用:

    if (!workerManager.offer(runnable)) {sendDispatchRejectResult();
    } else {sendDispatchSuccessResult();
    }
  6. workerManager.offer() 根据满载策略将 Runnable 放入 waitSubmitQueue,等待管理线程消费。

  7. 管理线程取出后提交给线程池执行,进入 WorkerDelayTaskExecuteRunnable.run(),真正执行 TaskChannel 的 execute()

  8. Task 执行过程中通过 workerRpcClient 向 Master 发送日志、状态更新等消息;若发送失败,加入 needToRetryMessages 由重试线程处理。

  9. Task 完成后,最终调用 messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage),Master 收到后更新实例状态。


九、总结

DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。

  • 可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。

  • 高并发:Netty + 线程池,实现主从分离、异步解耦。

  • 可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。

  • 可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。

http://www.dtcms.com/a/270932.html

相关文章:

  • 一天一道Sql题(day05)
  • IntelliJ IDEA 2025.1.3创建不了java8的项目
  • 初识MySQL(三)之主从配置与读写分离实战
  • Mac电脑,休眠以后,发现电量一直在减少,而且一个晚上,基本上是没了,开机都需要插电源的简单处理
  • Hive MetaStore的实现和优化
  • 在 macOS 上安装与自定义 Oh My Zsh:让终端美观又高效 [特殊字符]
  • 如何使用Pytest进行测试?
  • 基于大模型的窦性心动过速全周期预测与诊疗方案研究报告
  • 【linux】ssh使用-X参数后报错:X11 forwarding request failed on channel 0
  • [GICP] 点云预处理 | 近似最近邻搜索结构(ANN) | KdTree构建 vs 体素地图shi管理
  • 宇树 G1 部署(一)——综述
  • 6N70-ASEMI开关电源核心元件6N70
  • Go语言教程-环境搭建
  • [Vroom] 位置与矩阵 | 路由集成 | 抽象,解耦与通信
  • VScode SSH远程连接Ubuntu(通过SSH密钥对的方式)
  • LLM的表征做减法的是什么,自然语言是一个矩阵,怎么进行减法的
  • 爬虫-正则表达式
  • 【HarmonyOS6】获取华为用户信息
  • 出圈or出局?AI汽车“急速驶来”,市场淘汰赛一触即发
  • leetcode 每日一题 3439. 重新安排会议得到最多空余时间 I
  • 二刷 黑马点评 部署
  • 大模型MoE模型技术详解
  • 专题一_双指针_查找总价格为目标值的两个商品
  • 小程序主体变更全攻略:流程、资料与异常处理方案
  • WPF学习笔记(27)科学计算器
  • 李宏毅NLP-9-语音转换
  • 无人机报警器频段模块设计与运行要点
  • 小米路由器3C刷OpenWrt,更换系统/变砖恢复 指南
  • 在 Spring Boot 中如何使用 Assert 进行断言校验
  • 安卓设备信息查看器 - 功能介绍