当前位置: 首页 > news >正文

Apache Ignite 核心组件:GridClosureProcessor解析

这是一个 Apache Ignite 中非常核心的组件 —— GridClosureProcessor,它是 分布式闭包(Closure)执行的调度中枢,负责在集群节点上异步执行用户提交的任务(如 RunnableClosure)。

我们来逐层深入理解它的设计思想、关键机制和代码逻辑。


🧱 一、类概览:GridClosureProcessor

public class GridClosureProcessor extends GridProcessorAdapter
  • 职责:处理所有基于闭包(函数式)的远程执行请求
  • 常见用途:
    • compute().run(Runnable)
    • compute().call(Closure)
    • compute().broadcast(Closure)
    • cache().affinity().run(...)
  • 它是 ComputeTask 的底层支撑模块

🔩 二、关键字段解析

字段类型作用
poolsPoolProcessor线程池管理器,用于获取执行任务的线程池
busyLockGridSpinReadWriteLock控制组件在 停止期间不接受新任务
stoppingboolean标记当前处理器是否正在停止

⚠️ 这三个字段共同实现了 “优雅关闭” 的核心逻辑。


🔒 三、busyLock:优雅关闭的关键机制

1. 什么是 GridSpinReadWriteLock

  • Ignite 自定义的 自旋读写锁
  • 特点:
    • 读锁可重入、允许多个线程同时持有
    • 写锁独占,用于“停止”阶段
    • 使用 自旋 + sleep 避免线程频繁阻塞唤醒

2. 读锁(readLock()):

  • 所有任务提交方法(runAsync, callAsync, broadcast)都先获取读锁
  • 表示:“我正在使用这个处理器”
  • 允许多个线程并发提交任务

3. 写锁(tryWriteLock(...)):

  • onKernalStop(...) 中使用
  • 目的:阻止任何新任务提交,并标记为“停止中”

🛑 四、onKernalStop(...):优雅关闭流程

@Override
public void onKernalStop(boolean cancel) {boolean interrupted = false;while (true) {try {if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))break;elseThread.sleep(200);}catch (InterruptedException ignore) {interrupted = true;}}try {if (interrupted)Thread.currentThread().interrupt();stopping = true; // 标记为停止状态}finally {busyLock.writeUnlock();}
}

🔍 流程详解:

  1. 尝试获取写锁

    • tryWriteLock(200ms):尝试在 200ms 内获取写锁
    • 如果有线程持有读锁(即正在提交任务),则失败
    • 失败后 Thread.sleep(200),然后重试
  2. 为什么是“Busy Wait”?

    • 注解 @SuppressWarnings("BusyWait") 表示这是有意为之的忙等待
    • 目的:尽快完成关闭,避免长时间阻塞
    • 每 200ms 尝试一次,不会过度消耗 CPU
  3. 处理中断

    • 如果等待期间被中断,记录 interrupted = true
    • 最后恢复中断状态(线程安全最佳实践)
  4. 设置 stopping = true

    • 获取写锁后,设置标志位
    • 之后所有 runAsync 等调用都会被拒绝
  5. 释放写锁

    • 即使发生异常,也确保释放锁

✅ 这是一个典型的 “关闭守卫”模式:先阻止新请求,再清理资源。


🚀 五、任务提交方法分析

所有任务提交方法都遵循统一模式:

busyLock.readLock();
try {if (stopping) reject();// 提交任务
} finally {busyLock.readUnlock();
}

我们以 runAsync(...) 为例:

runAsync(...):运行一批 Runnable

public ComputeTaskInternalFuture<?> runAsync(...) {assert mode != null;assert !F.isEmpty(jobs);busyLock.readLock(); // 获取读锁try {if (stopping) {return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));}if (F.isEmpty(nodes))return finishedFuture(U.emptyTopologyException());ctx.task().setThreadContext(TC_SUBGRID, nodes);return ctx.task().execute(new T1(mode, jobs), null, sys, execName);}finally {busyLock.readUnlock(); // 释放读锁}
}
关键点:
  • stopping 检查:如果正在停止,直接返回失败 future
  • nodes 检查:拓扑为空则返回空拓扑异常
  • ctx.task().execute(...):交给 TaskProcessor 执行(T1 是一个内部任务类型)
  • 使用 sys 参数决定使用 系统线程池 还是 公共线程池

callAsync(...):远程调用 Closure

public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
  • 执行一个带返回值的函数(Closure<T,R>
  • 返回 ComputeTaskInternalFuture<R>,可获取结果

broadcast(...):广播到所有节点

public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
  • nodes 列表中的每个节点上执行 job
  • 返回一个 Future<Collection<R>>,包含所有节点的返回值

affinityRun(...):基于数据亲和性执行

public ComputeTaskInternalFuture<?> affinityRun(...)
  • 关键用途:将任务发送到 特定缓存分区(partition)的主节点
  • 流程:
    1. 获取当前拓扑版本 readyAffinityVersion()
    2. 使用 ctx.affinity().mapPartitionToNode(...) 找到负责该分区的节点
    3. 只在那个节点上执行任务
  • 优势:本地化执行,避免数据移动,性能极高

💡 这是 Ignite 实现“移动计算而非数据”的核心机制之一。


🧩 六、T1, T8, T11, T4 是什么?

这些是 内部任务类(定义在 GridTaskInternalFuture 或内部类中),用于包装用户任务:

任务类包装的任务类型
T1GridClosureCallMode + Collection<Runnable>
T8IgniteClosure<T,R>
T11Broadcast 任务
T4Affinity 任务

它们都继承自 ComputeTaskAdapter,由 TaskProcessor 调度执行。


🎯 七、整体架构图(简化)

+---------------------+
|  User Code          |
|  compute().run(...) |
+----------+----------+|v
+---------------------+
| GridClosureProcessor|
| - busyLock          |
| - stopping          |
+----------+----------+|v
+---------------------+
|  TaskProcessor      |
|  execute(Task)      |
+----------+----------+|v
+---------------------+
|  PoolProcessor      |
|  系统/公共线程池     |
+---------------------+

✅ 八、设计亮点总结

特性说明
读写锁控制关闭读锁允许多任务并发提交,写锁确保关闭时原子性
优雅拒绝新任务stopping 标志 + finishedFuture 快速失败
支持多种执行模式单节点、广播、亲和性执行
与 Task 子系统集成复用 TaskProcessor 的调度能力
线程安全所有提交路径都受锁保护
可观测性调试日志、异常信息清晰

📌 九、一句话总结

GridClosureProcessor 是 Ignite 的 分布式任务调度入口,它通过 读写锁机制 实现了 高并发提交 + 优雅关闭,并支持 普通执行、广播、数据亲和性执行 等多种模式,是 Compute 子系统的核心引擎。


💡 十、你可以借鉴的设计模式

1. 关闭守卫模式(Shutdown Guard)

private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;public void submit(Runnable task) {shutdownLock.readLock().lock();try {if (shuttingDown) throw new RejectedExecutionException();// 执行任务} finally {shutdownLock.readLock().unlock();}
}public void shutdown() {shutdownLock.writeLock().lock();try {shuttingDown = true;} finally {shutdownLock.writeLock().unlock();}
}

2. 快速失败(Fail-Fast)

  • 不让任务进入队列,而是在入口就拒绝
  • 返回一个“已完成的失败 Future”,避免资源浪费

🏁 结语

GridClosureProcessor 虽然代码量不大,但它体现了分布式系统中 资源管理、并发控制、生命周期管理 的最佳实践。理解它,有助于你设计自己的 高可用、可扩展的任务调度系统

http://www.dtcms.com/a/325803.html

相关文章:

  • ChatML vs Harmony:深度解析OpenAI全新对话结构格式的变化
  • 基于Spring Boot房源信息推荐系统的设计与实现 -项目分享
  • Maven <pom.xml> 标签详尽教程
  • perl notes【1】
  • 云原生环境Prometheus企业级监控
  • 【Node.js从 0 到 1:入门实战与项目驱动】1.3 Node.js 的应用场景(附案例与代码实现)
  • 论文阅读:Aircraft Trajectory Prediction Model Based on Improved GRU Structure
  • 《开源标准推动Linux驱动生态繁荣》
  • 实现分页功能【jQuery】
  • GDB调试 core dump 文件与栈溢出分析
  • 《Python入门:从零到Hello World的极简指南》
  • 板子 7.20--8.11
  • Spring Boot 参数校验 Validation 入门
  • 华为云计算的行业趋势:迈向智能、融合与绿色的未来
  • 【工控】线扫相机小结 第六篇
  • 用vscode 里docker显示不出有容器和镜像 ?
  • 通用 maven 私服 settings.xml 多源配置文件(多个仓库优先级配置)
  • SQL179 每个6/7级用户活跃情况
  • 十一、Linux Shell脚本:函数与模块化
  • 逃离城市与喧嚣,拥抱新的生活方式
  • 开博尔雷电5数据线:120Gbps“闪电传输”,以Intel硬核基因从容优化数字生活
  • 【SpringBoot】持久层 sql 注入问题
  • C/C++练习面试题
  • PyTorch基础(使用Numpy实现机器学习)
  • PyTorch基础(使用Tensor及Antograd实现机器学习)
  • OCSSA-VMD-Transformer轴承故障诊断,特征提取+编码器!
  • cs的搭建和使用
  • 力扣-153.寻找旋转排序数组中的最小值
  • Kubernetes-核心概念
  • 2438. 二的幂数组中查询范围内的乘积