当前位置: 首页 > news >正文

Ignite(Apache Ignite)中计算与数据共置的核心概念和编程实践

以下内容是关于 Ignite(Apache Ignite) 中“计算与数据共置(Colocating Computations with Data)”的核心概念和编程实践。下面我将用通俗易懂的方式,结合技术原理,为你全面解析这段内容,帮助你真正“理解”它。


🔍 一、为什么要“计算与数据共置”?

传统方式的问题:

想象一下,你要统计100万条员工记录的平均工资。如果数据分布在多个节点上(分布式系统),而你的计算任务在一个节点上运行:

  • 你需要从所有节点把这100万条数据拉到这个节点;
  • 网络传输开销巨大;
  • 内存压力大;
  • 速度慢。

这就是所谓的 “移动数据” —— 效率低。


共置计算的理念:“移动计算,而不是移动数据”

把计算任务发送到数据所在的位置去执行,只把结果传回来。

✅ 好处:

  • 减少网络传输
  • 提高执行速度
  • 更好地利用集群资源

这正是 Ignite 所支持的核心优化策略。


🧠 二、Ignite 如何知道“数据在哪个节点”?

答案:Affinity Function(亲和性函数)

  • Ignite 使用 affinity function 来决定某个 key 的数据应该存储在哪个节点。
  • 它基于 key 的哈希值,映射到特定的主节点(primary node)或备份节点(backup nodes)。
  • 所以,在执行任务前,Ignite 可以通过 affinity() 算出某 key 或某 partition 在哪个节点上。

⚙️ 这是实现“计算与数据共置”的基础!


🛠️ 三、两种共置方式

1️⃣ 按 Key 共置(Colocating by Key)

方法:
compute.affinityRun("myCache", key, runnable);
compute.affinityCall("myCache", key, callable);
  • affinityRun: 无返回值的任务(比如打印日志)
  • affinityCall: 有返回值的任务(比如计算总和)
示例解释:
IgniteCache<Integer, String> cache = ignite.cache("myCache");IgniteCompute compute = ignite.compute();int key = 1;// This closure will execute on the remote node where
// data for the given 'key' is located.
compute.affinityRun("myCache", key, () -> {// Peek is a local memory lookup.System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
});

✅ 执行过程:

  1. Ignite 调用 affinity("myCache", key) 找出 key 所在的节点;
  2. 把 lambda 函数(任务)发送到那个节点;
  3. 在该节点上直接访问本地缓存(.localPeek()),无需网络请求;
  4. 执行完后,任务结束。

📌 关键点:cache.localPeek()本地内存查找,非常快!


2️⃣ 按 Partition 共置(Colocating by Partition)

为什么需要?

有时候你要处理多个 key,但不想为每个 key 都发一次任务。如果你知道这些 key 属于同一个 partition,就可以一次性处理一批。

Ignite 默认有 1024 个 partitions。每个 partition 包含一组 keys。

方法:
compute.affinityCall(cacheNames, partId, job);
compute.affinityRun(cacheNames, partId, job);
实际例子:计算多个员工的平均工资
// this task sums up the values of the salary field for the given set of keys
private static class SumTask implements IgniteCallable<BigDecimal> {private Set<Long> keys;public SumTask(Set<Long> keys) {this.keys = keys;}@IgniteInstanceResourceprivate Ignite ignite;@Overridepublic BigDecimal call() throws Exception {IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();BigDecimal sum = new BigDecimal(0);for (long k : keys) {BinaryObject person = cache.localPeek(k, CachePeekMode.PRIMARY);if (person != null)sum = sum.add(new BigDecimal((float) person.field("salary")));}return sum;}
}public static void calculateAverage(Ignite ignite, Set<Long> keys) {// get the affinity function configured for the cacheAffinity<Long> affinityFunc = ignite.affinity("person");// this map stores collections of keys for each partitionHashMap<Integer, Set<Long>> partMap = new HashMap<>();keys.forEach(k -> {int partId = affinityFunc.partition(k);Set<Long> keysByPartition = partMap.computeIfAbsent(partId, key -> new HashSet<Long>());keysByPartition.add(k);});BigDecimal total = new BigDecimal(0);IgniteCompute compute = ignite.compute();List<String> caches = Arrays.asList("person");// iterate over all partitionsfor (Map.Entry<Integer, Set<Long>> pair : partMap.entrySet()) {// send a task that gets specific keys for the partitionBigDecimal sum = compute.affinityCall(caches, pair.getKey().intValue(), new SumTask(pair.getValue()));total = total.add(sum);}System.out.println("the average salary is " + total.floatValue() / keys.size());
}

✅ 优势:

  • 最多只有 1024 个任务(等于 partition 数量),而不是成千上万个 key 对应的任务;
  • 每个任务都在本地处理一批数据,高效且并行。

📌 SumTask 类必须能在远程节点上找到(见下文类加载问题)。


✅ 拓展:处理整个缓存的数据

如果你想扫描整个缓存做统计(如总工资),可以遍历所有 partitions:

for (int partId = 0; partId < affinity.partitions(); partId++) {compute.affinityCall(caches, partId, new SumByPartitionTask(partId));
}

每个任务处理一个 partition 的全部数据,使用 ScanQuery(partId).setLocal(true) 保证只查本地数据。

// this task sums up the value of the 'salary' field for all objects stored in
// the given partition
public static class SumByPartitionTask implements IgniteCallable<BigDecimal> {private int partId;public SumByPartitionTask(int partId) {this.partId = partId;}@IgniteInstanceResourceprivate Ignite ignite;@Overridepublic BigDecimal call() throws Exception {// use binary objects to avoid deserializationIgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();BigDecimal total = new BigDecimal(0);try (QueryCursor<Cache.Entry<Long, BinaryObject>> cursor = cache.query(new ScanQuery<Long, BinaryObject>(partId).setLocal(true))) {for (Cache.Entry<Long, BinaryObject> entry : cursor) {total = total.add(new BigDecimal((float) entry.getValue().field("salary")));}}return total;}
}

⚙️ 四、类加载问题:远程任务如何被执行?

你在本地提交了一个 SumTask 对象,但它要在远程节点上运行 —— 那个节点怎么知道 SumTask 是什么?

两种解决方案:

方式说明
1. 加入 classpathSumTask.class 放到所有节点的启动 classpath 中(如 jar 包)
2. Peer Class Loading启用对等类加载(默认开启),Ignite 自动把类从客户端传到服务端节点

⚠️ 生产环境建议关闭 peer class loading,使用稳定的部署包。


💡 五、Entry Processor:更轻量的本地操作

除了 affinityCall/Run,Ignite 还提供另一种共置机制:Entry Processor

特点:

  • 直接在缓存 entry 上执行操作(比如更新、计数)
  • 不需要序列化整个对象
  • 原子性:基于 key 加锁,线程安全

示例:原子递增计数器

cache.invoke("mykey", (entry, args) -> {Integer val = entry.getValue();entry.setValue(val == null ? 1 : val + 1);return null;
});

✅ 执行流程:

  1. 定位 "mykey" 所在节点;
  2. 在那个节点上获取该 entry 的锁;
  3. 执行 lambda(更新值);
  4. 释放锁。

📌 适合:小粒度、高频更新操作(如点赞数、库存扣减)


📊 六、性能考虑(Performance Considerations)

场景推荐方式
数据量小(< 1000 条)Scan Query 可能更快
数据量大优先使用 affinityCall/Run 分区处理
高频单 key 更新使用 Entry Processor
批量处理大量 key按 partition 分组 + affinityCall

✅ 核心思想:越靠近数据,性能越高


✅ 总结:一句话理解“计算与数据共置”

不要把大象搬到厨房,而是把厨房搬到大象旁边。

在分布式系统中:

  • “大象” = 大量数据
  • “厨房” = 计算逻辑
  • 正确做法 = 把计算代码发到数据所在的节点执行

🧩 类比理解(生活例子)

比喻对应技术
图书馆查资料你要写论文(计算任务)
把所有书搬到你家看传统方式:拉取数据到客户端处理
你去图书馆,在阅览室当场查阅并记下结论共置计算:任务去数据所在节点执行
多个分馆(分区)每个分馆处理一部分书(partition)
管理员帮你定位哪本书在哪个馆Affinity Function
你只带回笔记(结果)只传输结果,不传原始数据

📚 延伸建议

  1. 学习 Affinity Function 配置:可自定义数据分布规则(如按地区分片)
  2. 结合 MapReduce 模式affinityCall 是 map 阶段的理想选择
  3. 监控任务执行位置:确保任务确实运行在预期节点上
  4. 避免大任务阻塞:长时间任务会影响节点响应,考虑拆分或异步

http://www.dtcms.com/a/305092.html

相关文章:

  • 小程序视频播放,与父视图一致等样式设置
  • Electron将视频文件单独打包成asar并调用
  • 如何在Linux系统下进行C语言程序的编写和debug测试
  • 解锁全球数据:Bright Data MCP 智能解决代理访问难题
  • 三极管、MOS 管、CMOS 管的特点、属性及综合对比
  • DAY27 函数专题2:装饰器
  • 【算法训练营Day18】二叉树part8
  • BOSMA博冠推出8K广播级讯道摄像机DC0300 EFP
  • 项目开发需求管理
  • 项目目标如何设定?遵循的主要原则分析
  • unity 使用PropertyDrawer 在Inspector 面板上自定义字段的显示方式
  • Android User版本默认用test-keys,如何改用release-keys
  • IDDR原语基本使用
  • 【三桥君】AI技术发展下,单智能体局限性凸显,如何通过MCP和A2A协议实现智能体团队协作转变?
  • Day 25:异常处理
  • GitLab的安装及使用
  • 嵌入式第十四课!!!指针在字符数组的应用与数组指针
  • 【Lua】题目小练3
  • 13、select_points_object_model_3d解析
  • Excel制作滑珠图、哑铃图
  • HCIP--MGRE综合实验
  • 从0到1学PHP(五):PHP 数组:高效存储与处理数据
  • C#_ArrayList动态数组
  • 【C#获取高精度时间】
  • 同一个局域网段,如何实现所有用户都可以访问本地mysql数据库?
  • 理解Transformer解码器
  • 【三桥君】企业级AI应用需要系统工程支撑,如何通过MCP大模型架构实现全链路实战解构?
  • 1 RAG三问
  • Javaweb————HTTP请求头属性讲解
  • ​第七篇:Python数据库编程与ORM实践