当前位置: 首页 > wzjs >正文

地产网站互动设计国际新闻今天最新消息

地产网站互动设计,国际新闻今天最新消息,网站线框图怎样做,wordpress更改链接后404目录 一、概览 二、Worker 启动入口 run() 三、启动 RPC 服务 3.1 Netty RPC Server(接收请求) 3.2 Netty RPC Client(发送结果) 四、加载 Task 插件 五、注册中心客户端启动 六、Worker 管理线程启动 七、消息重试线程启…

目录

一、概览

二、Worker 启动入口 run()

三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

3.2 Netty RPC Client(发送结果)

四、加载 Task 插件

五、注册中心客户端启动

六、Worker 管理线程启动

七、消息重试线程启动

八、RPC 请求到任务执行的完整链路

九、总结


一、概览

在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:

  • Netty RPC 服务端(workerRpcServer)和客户端(workerRpcClient

  • 任务插件管理器(taskPluginManager

  • 注册中心客户端(workerRegistryClient

  • Worker 管理线程(调度任务分发)

  • 消息重试线程(保证消息可靠性)

最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。


二、Worker 启动入口 run()

Worker 的入口方法标记为 @PostConstruct,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:

@PostConstruct
public void run() {// 1. 启动 rpc 服务this.workerRpcServer.start();this.workerRpcClient.start();// 2. 加载插件this.taskPluginManager.loadPlugin();// 3. 启动注册中心客户端this.workerRegistryClient.setRegistryStoppable(this);this.workerRegistryClient.start();// 4. 启动管理线程this.workerManagerThread.start();// 5. 启动消息重试线程this.messageRetryRunner.start();
}

第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。


三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

public void start() {NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();serverConfig.setListenPort(workerConfig.getListenPort());nettyRemotingServer = new NettyRemotingServer(serverConfig);for (WorkerRpcProcessor processor : workerRpcProcessors) {nettyRemotingServer.registerProcessor(processor);}this.nettyRemotingServer.start();
}
  1. 读取配置:从 workerConfig 中获取 RPC 服务端配置,包括端口、线程池大小等。

  2. 构造 NettyRemotingServer:底层基于 Netty 封装的服务器类,用于接收网络消息。

  3. 注册 Processor:通过 nettyRemotingServer.registerProcessor 将各类消息处理器(如 WorkerTaskDispatchProcessor)绑定到不同的消息类型上。

  4. 启动服务器:调用 Netty 的 bind(port).sync(),启动 BossGroup 和 WorkerGroup,监听客户端连接。

底层实现重点在 NettyRemotingServer.start(),核心伪码如下:

if (isStarted.compareAndSet(false, true)) {serverBootstrap.group(bossGroup, workGroup).channel(getServerSocketChannelClass()).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {initNettyChannel(ch);}});serverBootstrap.bind(listenPort).sync();
}
  • BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。

  • 通过 ChannelInitializer 向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。

3.2 Netty RPC Client(发送结果)

public void start() {this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());for (WorkerRpcProcessor processor : workerRpcProcessors) {this.nettyRemotingClient.registerProcessor(processor);}
}

NettyRemotingClient 构造时会初始化:

  • EventLoopGroup(Epoll 或 NIO)

  • Callback Executor:处理响应回调

  • Response Future Executor:扫描超时的未回包请求

其启动逻辑:

bootstrap.group(workerGroup).channel(getSocketChannelClass()).option(TCP_NODELAY, clientConfig.isTcpNoDelay()).handler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(...)).addLast(new NettyDecoder(), clientHandler, encoder);}});
responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.set(true);
  • 连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。

  • 心跳机制:通过 IdleStateHandler 定期向 Master 发送心跳,保持连接活跃。

  • 超时重试ResponseFuture 定时扫描未收到回包的请求,触发超时失败或重试。


四、加载 Task 插件

Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:

public void loadPlugin() {PrioritySPIFactory<TaskChannelFactory> factoryLoader= new PrioritySPIFactory<>(TaskChannelFactory.class);for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) {String name = entry.getKey();TaskChannelFactory factory = entry.getValue();taskChannelFactoryMap.put(name, factory);taskChannelMap.put(name, factory.create());}
}
  1. PrioritySPIFactory 利用 ServiceLoader.load(spiClass) 扫描 classpath 下所有 META-INF/services/... 配置的实现。

  2. 冲突处理:若多个插件使用同一标识(t.getIdentify().getName()),则通过 resolveConflict 按优先级或版本选择。

  3. 实例化 Factory 并调用 create(),生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。

插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。


五、注册中心客户端启动

Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。

public void start() {registry();registryClient.addConnectionStateListener(new WorkerConnectionStateListener(workerConfig, strategy));
}
private void registry() {WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat();String path = workerConfig.getWorkerRegistryPath();registryClient.remove(path);registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb));workerHeartBeatTask.start();
}
  • persistEphemeral:将自身地址、可用线程数等写入 ${registryRoot}/workers/${workerAddress},并由 ZK 管理生命周期。

  • 心跳定时任务workerHeartBeatTask.start() 定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。

  • 连接监听:若与注册中心断开,WorkerConnectionStateListener 可触发重连或优雅退出。


六、Worker 管理线程启动

Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (!ServerLifeCycleManager.isRunning()) {Thread.sleep(SLEEP_TIME);}if (getThreadPoolQueueSize() <= workerExecThreads) {WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take();workerExecService.submit(task);} else {incOverloadCount();Thread.sleep(SLEEP_TIME);}}
}
  • waitSubmitQueue:阻塞队列,存放待执行的 WorkerDelayTaskExecuteRunnable 实例。

  • 背压机制:当线程池队列已满(> workerExecThreads)时,先统计过载指标,再稍后重试。

  • workerExecService:底层是带监听能力的 ListeningExecutorService,执行完毕后可注册回调处理日志和结果汇总。


七、消息重试线程启动

Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (needToRetryMessages.isEmpty()) {Thread.sleep(MESSAGE_RETRY_WINDOW);}long now = System.currentTimeMillis();for (entry in needToRetryMessages) {for (message in entry.getValue()) {if (now - message.getSendTime() > RETRY_WINDOW) {message.setSendTime(now);messageSenderMap.get(type).sendMessage(message);}}}Thread.sleep(SLEEP_TIME);}
}
  • needToRetryMessages:按 TaskInstanceId 分类的待重试消息列表。

  • 定时扫描:以 MESSAGE_RETRY_WINDOW(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的 messageSender 去向 Master 派发。

  • 幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。


八、RPC 请求到任务执行的完整链路

  1. Master 通过 RPC 调用 WorkerRpcClient.sendMessageTaskDispatchRequest 发往指定 Worker。

  2. Worker Netty Server 的 NettyServerHandler.channelRead() 接收消息,调用 processReceived()

    pair = processors.get(msg.getType());
    pair.getRight().submit(() -> processor.process(channel, msg));
  3. WorkerTaskDispatchProcessor.process() 反序列化 TaskDispatchRequest,构建 TaskExecutionContext 并缓存。

  4. 检查是否需要延时执行(delayTime);若需延时,则先发送延时执行消息给 Master,跳过入队。

  5. 通过 WorkerTaskExecuteRunnableFactoryBuilder 构造 WorkerDelayTaskExecuteRunnable,并调用:

    if (!workerManager.offer(runnable)) {sendDispatchRejectResult();
    } else {sendDispatchSuccessResult();
    }
  6. workerManager.offer() 根据满载策略将 Runnable 放入 waitSubmitQueue,等待管理线程消费。

  7. 管理线程取出后提交给线程池执行,进入 WorkerDelayTaskExecuteRunnable.run(),真正执行 TaskChannel 的 execute()

  8. Task 执行过程中通过 workerRpcClient 向 Master 发送日志、状态更新等消息;若发送失败,加入 needToRetryMessages 由重试线程处理。

  9. Task 完成后,最终调用 messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage),Master 收到后更新实例状态。


九、总结

DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。

  • 可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。

  • 高并发:Netty + 线程池,实现主从分离、异步解耦。

  • 可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。

  • 可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。

http://www.dtcms.com/wzjs/397757.html

相关文章:

  • 周口规划建设局网站网络广告代理
  • 教育类网站首页设计模板沧州网站建设推广
  • 网页制作淘宝网站建设现在做网络推广好做吗
  • 合肥网站建设培训机构设计网站一般多少钱
  • 公司品牌网站建设网上宣传方法有哪些
  • 精湛的中山网站建设seo的工作内容
  • 优惠券网站是不是很难做营销策划思路
  • 企业公司网站建设国外新闻最新消息
  • 网上赚钱游戏天津seo优化公司哪家好
  • 金融培训网站源码如何注册网站怎么注册
  • 加强主流网站建设岳阳网站设计
  • 高端网站报价热狗seo优化外包
  • 推广策略方案佛山旺道seo
  • 有专门做ppt的网站搜什么关键词能找到网站
  • 企业网站建设空间网络软文写作
  • 松岗网站建设怎么做网站推广多少钱
  • 如何修改网站后台代码关键词优化营销
  • 东莞设计企业网站的有哪些成都seo优化推广
  • 用vs做音乐网站seo排名系统
  • .net core 做网站学网络运营在哪里学比较好
  • 网站维护费怎么做会计分录seo关键词排名优化
  • 网站改标题降权seo销售好做吗
  • 南京网站开发南京乐识正规搜索引擎优化策略包括
  • 做网站准备的资料优化步骤
  • 什么是网站建设的三次点击原则公司网站开发费用
  • 网站建设项目国内外分析报告年度关键词
  • 长沙制作网站公司吗怎么看百度关键词的搜索量
  • 深圳市营销型网站建设销售课程培训视频教程
  • 海口网站建设方案报价优化大师的优化项目有哪7个
  • 网上做分销代销哪个网站好优化方案的格式及范文