当前位置: 首页 > wzjs >正文

网站到期查询wordpress商品展示模板下载

网站到期查询,wordpress商品展示模板下载,关键词林俊杰在线听免费,在哪个网站可以免费做广告目录 一、概览 二、Worker 启动入口 run() 三、启动 RPC 服务 3.1 Netty RPC Server(接收请求) 3.2 Netty RPC Client(发送结果) 四、加载 Task 插件 五、注册中心客户端启动 六、Worker 管理线程启动 七、消息重试线程启…

目录

一、概览

二、Worker 启动入口 run()

三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

3.2 Netty RPC Client(发送结果)

四、加载 Task 插件

五、注册中心客户端启动

六、Worker 管理线程启动

七、消息重试线程启动

八、RPC 请求到任务执行的完整链路

九、总结


一、概览

在 DolphinScheduler 中,Worker 负责从 Master 接收调度命令、执行具体的 Task,并将执行结果通过消息回传给 Master。Worker 在 JVM 中启动后,需要依次完成以下关键组件的初始化和启动:

  • Netty RPC 服务端(workerRpcServer)和客户端(workerRpcClient

  • 任务插件管理器(taskPluginManager

  • 注册中心客户端(workerRegistryClient

  • Worker 管理线程(调度任务分发)

  • 消息重试线程(保证消息可靠性)

最终,Worker 进入“消息接收 → 任务封装 → 入队等待 → 线程池执行 → 结果回传”的循环流程,持续响应 Master 的分发请求并执行任务。


二、Worker 启动入口 run()

Worker 的入口方法标记为 @PostConstruct,即在 Spring 容器完成 Bean 注入后自动调用。其源码如下:

@PostConstruct
public void run() {// 1. 启动 rpc 服务this.workerRpcServer.start();this.workerRpcClient.start();// 2. 加载插件this.taskPluginManager.loadPlugin();// 3. 启动注册中心客户端this.workerRegistryClient.setRegistryStoppable(this);this.workerRegistryClient.start();// 4. 启动管理线程this.workerManagerThread.start();// 5. 启动消息重试线程this.messageRetryRunner.start();
}

第1步:启动 RPC 服务(服务端 + 客户端),用于接收 Master 下发的分发消息以及向 Master 发送执行结果。 – 第2步:通过 SPI 加载 User 自定义或内置的 TaskChannelFactory,实现对不同 Task 类型的支持。 – 第3步:启动注册中心客户端,将自己注册到 Zookeeper(或其他注册中心)上,以便 Master 可发现并下发任务。 – 第4步:启动 Worker 管理线程,用于不断从等待队列取出任务并提交给线程池执行。 – 第5步:启动消息重试线程,负责对由于网络抖动或 Master 未及时 ACK 而需要重试发送的消息进行重发。


三、启动 RPC 服务

3.1 Netty RPC Server(接收请求)

public void start() {NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig();serverConfig.setListenPort(workerConfig.getListenPort());nettyRemotingServer = new NettyRemotingServer(serverConfig);for (WorkerRpcProcessor processor : workerRpcProcessors) {nettyRemotingServer.registerProcessor(processor);}this.nettyRemotingServer.start();
}
  1. 读取配置:从 workerConfig 中获取 RPC 服务端配置,包括端口、线程池大小等。

  2. 构造 NettyRemotingServer:底层基于 Netty 封装的服务器类,用于接收网络消息。

  3. 注册 Processor:通过 nettyRemotingServer.registerProcessor 将各类消息处理器(如 WorkerTaskDispatchProcessor)绑定到不同的消息类型上。

  4. 启动服务器:调用 Netty 的 bind(port).sync(),启动 BossGroup 和 WorkerGroup,监听客户端连接。

底层实现重点在 NettyRemotingServer.start(),核心伪码如下:

if (isStarted.compareAndSet(false, true)) {serverBootstrap.group(bossGroup, workGroup).channel(getServerSocketChannelClass()).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {initNettyChannel(ch);}});serverBootstrap.bind(listenPort).sync();
}
  • BossGroup 负责接收连接,WorkerGroup 负责处理 I/O 读写。

  • 通过 ChannelInitializer 向 Pipeline 中注入解码器、编码器、心跳检测等 Handler。

3.2 Netty RPC Client(发送结果)

public void start() {this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig());for (WorkerRpcProcessor processor : workerRpcProcessors) {this.nettyRemotingClient.registerProcessor(processor);}
}

NettyRemotingClient 构造时会初始化:

  • EventLoopGroup(Epoll 或 NIO)

  • Callback Executor:处理响应回调

  • Response Future Executor:扫描超时的未回包请求

其启动逻辑:

bootstrap.group(workerGroup).channel(getSocketChannelClass()).option(TCP_NODELAY, clientConfig.isTcpNoDelay()).handler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(...)).addLast(new NettyDecoder(), clientHandler, encoder);}});
responseFutureExecutor.scheduleWithFixedDelay(ResponseFuture::scanFutureTable, 0, 1, TimeUnit.SECONDS);
isStarted.set(true);
  • 连接池与重连策略:客户端在发送消息时会根据连接状态自动重连或重建 Channel。

  • 心跳机制:通过 IdleStateHandler 定期向 Master 发送心跳,保持连接活跃。

  • 超时重试ResponseFuture 定时扫描未收到回包的请求,触发超时失败或重试。


四、加载 Task 插件

Worker 支持多种 Task 类型(Shell、Spark、MapReduce……),这些逻辑都通过 SPI 插件化管理:

public void loadPlugin() {PrioritySPIFactory<TaskChannelFactory> factoryLoader= new PrioritySPIFactory<>(TaskChannelFactory.class);for (Map.Entry<String, TaskChannelFactory> entry : factoryLoader.getSPIMap().entrySet()) {String name = entry.getKey();TaskChannelFactory factory = entry.getValue();taskChannelFactoryMap.put(name, factory);taskChannelMap.put(name, factory.create());}
}
  1. PrioritySPIFactory 利用 ServiceLoader.load(spiClass) 扫描 classpath 下所有 META-INF/services/... 配置的实现。

  2. 冲突处理:若多个插件使用同一标识(t.getIdentify().getName()),则通过 resolveConflict 按优先级或版本选择。

  3. 实例化 Factory 并调用 create(),生成具体 TaskChannel,用于执行阶段构建执行命令、日志采集等。

插件加载完成后,Worker 便可根据 Task 类型动态路由到对应的执行实现。


五、注册中心客户端启动

Worker 需要向统一的注册中心(如 Zookeeper)注册自己的可用性信息,Master 才能发现并调度任务给它。

public void start() {registry();registryClient.addConnectionStateListener(new WorkerConnectionStateListener(workerConfig, strategy));
}
private void registry() {WorkerHeartBeat hb = workerHeartBeatTask.getHeartBeat();String path = workerConfig.getWorkerRegistryPath();registryClient.remove(path);registryClient.persistEphemeral(path, JSONUtils.toJsonString(hb));workerHeartBeatTask.start();
}
  • persistEphemeral:将自身地址、可用线程数等写入 ${registryRoot}/workers/${workerAddress},并由 ZK 管理生命周期。

  • 心跳定时任务workerHeartBeatTask.start() 定时刷新节点数据/TTL,保证长连接下的可用性信息及时更新。

  • 连接监听:若与注册中心断开,WorkerConnectionStateListener 可触发重连或优雅退出。


六、Worker 管理线程启动

Worker 启动后,用一个独立线程不断从“等待提交队列”中取出任务并提交给线程池。

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (!ServerLifeCycleManager.isRunning()) {Thread.sleep(SLEEP_TIME);}if (getThreadPoolQueueSize() <= workerExecThreads) {WorkerDelayTaskExecuteRunnable task = waitSubmitQueue.take();workerExecService.submit(task);} else {incOverloadCount();Thread.sleep(SLEEP_TIME);}}
}
  • waitSubmitQueue:阻塞队列,存放待执行的 WorkerDelayTaskExecuteRunnable 实例。

  • 背压机制:当线程池队列已满(> workerExecThreads)时,先统计过载指标,再稍后重试。

  • workerExecService:底层是带监听能力的 ListeningExecutorService,执行完毕后可注册回调处理日志和结果汇总。


七、消息重试线程启动

Worker 执行过程中需要向 Master 回传任务执行结果、日志位点、心跳等消息。若网络抖动导致消息发送失败,需重试:

public void run() {while (!ServerLifeCycleManager.isStopped()) {if (needToRetryMessages.isEmpty()) {Thread.sleep(MESSAGE_RETRY_WINDOW);}long now = System.currentTimeMillis();for (entry in needToRetryMessages) {for (message in entry.getValue()) {if (now - message.getSendTime() > RETRY_WINDOW) {message.setSendTime(now);messageSenderMap.get(type).sendMessage(message);}}}Thread.sleep(SLEEP_TIME);}
}
  • needToRetryMessages:按 TaskInstanceId 分类的待重试消息列表。

  • 定时扫描:以 MESSAGE_RETRY_WINDOW(如 30 秒)为周期,判断消息是否超时,若超时则重新调用对应的 messageSender 去向 Master 派发。

  • 幂等与日志:每次重试都会刷新发送时间,并输出重试日志,保证 Master 端能最终收到消息或记录重试失败。


八、RPC 请求到任务执行的完整链路

  1. Master 通过 RPC 调用 WorkerRpcClient.sendMessageTaskDispatchRequest 发往指定 Worker。

  2. Worker Netty Server 的 NettyServerHandler.channelRead() 接收消息,调用 processReceived()

    pair = processors.get(msg.getType());
    pair.getRight().submit(() -> processor.process(channel, msg));
  3. WorkerTaskDispatchProcessor.process() 反序列化 TaskDispatchRequest,构建 TaskExecutionContext 并缓存。

  4. 检查是否需要延时执行(delayTime);若需延时,则先发送延时执行消息给 Master,跳过入队。

  5. 通过 WorkerTaskExecuteRunnableFactoryBuilder 构造 WorkerDelayTaskExecuteRunnable,并调用:

    if (!workerManager.offer(runnable)) {sendDispatchRejectResult();
    } else {sendDispatchSuccessResult();
    }
  6. workerManager.offer() 根据满载策略将 Runnable 放入 waitSubmitQueue,等待管理线程消费。

  7. 管理线程取出后提交给线程池执行,进入 WorkerDelayTaskExecuteRunnable.run(),真正执行 TaskChannel 的 execute()

  8. Task 执行过程中通过 workerRpcClient 向 Master 发送日志、状态更新等消息;若发送失败,加入 needToRetryMessages 由重试线程处理。

  9. Task 完成后,最终调用 messageSenderMap.get(TASK_EXECUTE_RESULT).sendMessage(resultMessage),Master 收到后更新实例状态。


九、总结

DolphinScheduler Worker 的启动流程,实际上是一套“轻量级调度节点”从初始化、服务注册、插件加载,到任务接收、入队执行、结果回传以及消息可靠性保障的完整闭环。

  • 可插拔:SPI 机制加载 TaskChannel 工厂,支持按需扩展。

  • 高并发:Netty + 线程池,实现主从分离、异步解耦。

  • 可靠性:注册中心心跳、RPC 心跳、消息重试,保证节点可用性和消息可靠交互。

  • 可监控:丰富的度量指标(队列长度、过载次数、重试次数),有助于运维监控和自动扩缩容。

http://www.dtcms.com/wzjs/792555.html

相关文章:

  • 好的 做网站的软件公司苏州app开发定制
  • 大连模板网站制作推荐深圳品牌创意网站建设
  • 网站建设的网络公司wordpress淘口令插件
  • 网站群集建设建网站公司 优帮云
  • 网站建设通有的网站网速慢
  • 微信视频网站怎么做的好浙江省建设厅网站地址
  • 盐城企业做网站多少钱星链友店
  • 做包装设计的网站有哪些网页版梦幻西游36天罡攻略
  • 用爱奇艺会员做视频网站违法吗wordpress网站速度检测
  • 哪里查网站备案信息远象建设 网站
  • 遵义网站建设哪家强淘宝美工与网站开发
  • 泉州建设网站公司哪家好药店网站源码
  • 关于做网站的ppt温州网站制作价格
  • 快手秒赞秒评网站推广微信怎么做推广
  • phpstudy怎么做网站购买域名之后怎么做网站
  • 深圳 做公司网站东莞seo托管
  • mysql 网站开发 问好网站建设承诺
  • 长春网站建设手机版网站导入链接
  • 网站与网络的区别网站在建设中模板
  • 网站自己怎么制作郑州艾特网站建设
  • 免费h5模板网站模板网站快慢由什么决定
  • 自己网站做第三方支付临沂市建设官方网站
  • 网站建设自查自评xml网站地图生成器
  • 榆林做网站获客平台有哪些
  • 深圳网站建设电话济南网站建设加q479185700
  • 网站建设的要点是什么建筑网建设通
  • 门窗设计软件免费版论坛与网站做优化哪个更好
  • 网站源码论坛有什么做外贸的网站
  • 大连做网站哪家好一点2018网站建设高考成绩查询
  • 网站设计风格有哪几种网站建设合同封面模板