鸿蒙分布式计算实战:用 ArkTS+Worker 池落地可运行任务管理 Demo,从单设备到跨设备全方案
摘要(介绍目前的背景和现状)
端侧算力越来越强,设备也越来越多:手机、平板、手表、车机、智慧屏……把这些设备“串成一台分布式计算机”,在边缘完成更低时延、更高隐私的数据处理,是很多应用的刚需(比如大图处理、语音转写、日志分析、模型微调)。但一旦上分布式,任务调度、负载均衡、失败重试、状态监控、结果合并就变成工程核心。本文提供一套“能跑”的参考实现,并给出扩展到跨设备(SoftBus / DeviceManager / RPC)的落地思路。
引言(介绍目前的发展情况和场景应用)
鸿蒙具备分布式能力:跨设备发现与连接(DeviceManager/SoftBus)、跨端数据(分布式数据对象/分布式文件)、跨端调用(RPC/跨设备拉起)。这些原语让我们可以在应用层实现一套自定义的分布式调度器。本文先在单设备里用 Worker 池模拟“多个计算节点”,把核心调度/容错/监控机制做扎实;随后给出如何替换为跨设备通信与执行的对接点。
总体设计
功能目标与模块划分
- 任务调度器:基于优先级 + 资源需求 + 节点状态,动态分配任务
- 任务分解与合并:Map-Reduce 风格,把大任务拆成子任务并行,完成后合并
- 负载均衡:加权轮询 + 实时负载反馈,避免热节点过载
- 状态监控:任务生命周期(待分配/执行中/已完成/失败/迁移),节点心跳
- 故障恢复:节点超时/失败自动回收任务并迁移
- 结果收集与整合:流式归并,边算边合并,降低尾延迟
关键数据结构
- Task:描述一个可拆分的大任务,包含优先级、拆分器、合并器等
- SubTask:子任务元数据(输入分片、资源需求、超时、重试次数)
- Node:一个计算节点(Demo 用 Worker 模拟;真实环境可映射到设备或远端 Ability)
- Scheduler:维护任务优先队列、分配策略、监控与容错
核心代码示例(可运行 Demo)
说明:下面是一个最小可运行的 ArkTS Demo 结构。它在单设备内用 Worker 池模拟多个“分布式节点”,支持任务拆分、调度、监控、故障迁移和结果合并。你可以在 DevEco Studio 新建 Stage 工程,把这些文件放进对应目录即可运行。
真跨设备时,只需把 WorkerNode 的通信层替换为 SoftBus/RPC,即可对接真实节点。
类型与工具(/common/types.ts
)
// /common/types.ts

/** Scheduling priority of a task and its sub-tasks. */
export type Priority = 'HIGH' | 'MEDIUM' | 'LOW';

/** Rough resource requirements used by the scheduler when placing a sub-task. */
export interface ResourceHint {
  cpuCost?: number;    // estimated CPU cost (relative units)
  memCost?: number;    // estimated memory cost (relative units)
  deadlineMs?: number; // soft deadline for the whole task
  timeoutMs?: number;  // per-sub-task timeout
}

/** One shard of a task, dispatched to a single node. */
export interface SubTask<Input, Output> {
  id: string;
  taskId: string;
  shardIndex: number;
  input: Input;
  priority: Priority;
  resource: ResourceHint;
  retries: number;
  maxRetries: number;
  createdAt: number;
}

/** Lifecycle states of a sub-task. */
export type SubTaskStatus = 'PENDING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'CANCELLED';

/** Outcome reported by a node after executing one sub-task. */
export interface SubTaskResult<Output> {
  subTaskId: string;
  status: SubTaskStatus;
  output?: Output;
  error?: string;
  durationMs?: number;
  nodeId?: string;
}

/** Full description of a splittable, map-reduce style task. */
export interface TaskSpec<Input, MapOut, ReduceOut> {
  id: string;
  name: string;
  priority: Priority;
  input: Input;
  // Splitter: break the big input into per-sub-task inputs.
  splitter: (input: Input) => Input[];
  // Name of the mapper function executed on the node side.
  // In a real distributed setup this logic lives in the node process / device.
  mapperFuncName: string;
  // Reducer: merge partial results into the final result.
  reducer: (partials: MapOut[]) => ReduceOut;
  resource?: ResourceHint;
}

/** Snapshot of a node's health and load, used for balancing decisions. */
export interface NodeMetrics {
  nodeId: string;
  running: number;
  capacity: number;
  avgLatencyMs: number;
  lastHeartbeat: number;
  healthy: boolean;
}

/** Abstraction over a compute node (local Worker, or a remote device later). */
export interface NodeLike {
  id: string;
  capacity: number; // number of sub-tasks the node can run concurrently
  post(subTask: any): Promise<SubTaskResult<any>>;
  healthy(): boolean;
  metrics(): NodeMetrics;
  stop(): void;
}
简单事件总线(/common/bus.ts
)
// /common/bus.ts
type Handler = (...args: any[]) => void;

/**
 * Minimal synchronous pub/sub bus used for task/sub-task lifecycle events.
 * Handlers for one event live in a Set, so registering the same function
 * twice is a no-op and off() removes it in O(1).
 */
export class EventBus {
  private listeners = new Map<string, Set<Handler>>();

  /** Subscribe `fn` to event `evt`. */
  on(evt: string, fn: Handler) {
    let set = this.listeners.get(evt);
    if (set === undefined) {
      set = new Set();
      this.listeners.set(evt, set);
    }
    set.add(fn);
  }

  /** Unsubscribe `fn` from event `evt`; unknown events are ignored. */
  off(evt: string, fn: Handler) {
    this.listeners.get(evt)?.delete(fn);
  }

  /** Invoke every handler registered for `evt`, in registration order. */
  emit(evt: string, ...args: any[]) {
    const set = this.listeners.get(evt);
    if (set === undefined) return;
    for (const fn of set) fn(...args);
  }
}

/** Process-wide shared bus instance. */
export const bus = new EventBus();
Worker 节点实现(/workers/node.ts
)
// /workers/node.ts
// 这是 Worker 线程脚本,负责实际执行“子任务的 mapper 函数”
// 注意:Worker 文件需在 module.json5 里声明(见文末说明)import worker from '@ohos.worker';const globalMapperRegistry: Record<string, (input: any) => any> = {// 在这里注册可执行的 mapper 函数(Demo 里放几个例子)'heavyPrimeCount': (n: number) => {// 计算 <= n 的素数个数,模拟 CPU 密集任务const isPrime = (x: number) => {if (x < 2) return false;for (let i=2;i*i<=x;i++) if (x % i === 0) return false;return true;};let cnt = 0;for (let i=2;i<=n;i++) if (isPrime(i)) cnt++;return cnt;},'wordCount': (text: string) => {const map = new Map<string, number>();text.split(/\s+/).forEach(w => {if (!w) return;const key = w.toLowerCase();map.set(key, (map.get(key) || 0) + 1);});// 返回普通对象,避免 Map 的结构化拷贝问题const obj: Record<string, number> = {};map.forEach((v,k) => obj[k] = v);return obj;},'matrixRowMul': (payload: { row: number[], col: number[] }) => {const { row, col } = payload;let sum = 0;for (let i = 0; i < row.length; i++) sum += row[i] * col[i];return sum;}
};const parent = worker.parentPort;parent?.onmessage = async (evt) => {const { subTaskId, mapperFuncName, input, startedAt } = evt.data;const start = Date.now();try {const fn = globalMapperRegistry[mapperFuncName];if (!fn) throw new Error(`Mapper not found: ${mapperFuncName}`);const output = fn(input);parent?.postMessage({subTaskId,status: 'SUCCEEDED',output,durationMs: Date.now() - start,});} catch (err: any) {parent?.postMessage({subTaskId,status: 'FAILED',error: String(err?.message || err),durationMs: Date.now() - start,});}
};
Worker 节点包装(/common/nodePool.ts
)
// /common/nodePool.ts
import worker from '@ohos.worker';
import { NodeLike, NodeMetrics, SubTask, SubTaskResult } from './types';let idSeq = 0;export class WorkerNode implements NodeLike {id: string;capacity: number;private running = 0;private histLatency: number[] = [];private lastOkTs = Date.now();private workerInst: worker.ThreadWorker;constructor(capacity: number, workerScript: string) {this.id = `node-${++idSeq}`;this.capacity = capacity;this.workerInst = new worker.ThreadWorker(workerScript);this.workerInst.on('error', (e) => {// Worker 异常当作故障console.error(`[${this.id}] worker error:`, e?.message);});}healthy(): boolean {// 简化:最近10秒有成功返回就算健康return (Date.now() - this.lastOkTs) < 10_000;}async post(subTask: SubTask<any, any>): Promise<SubTaskResult<any>> {this.running++;const startedAt = Date.now();const res: SubTaskResult<any> = await new Promise((resolve) => {const onMessage = (msg: any) => {const result: SubTaskResult<any> = {subTaskId: msg.subTaskId,status: msg.status,output: msg.output,error: msg.error,durationMs: msg.durationMs,nodeId: this.id};this.workerInst.off('message', onMessage as any);resolve(result);};const onError = (err: any) => {this.workerInst.off('error', onError as any);resolve({subTaskId: subTask.id,status: 'FAILED',error: String(err?.message || err),durationMs: Date.now() - startedAt,nodeId: this.id});};this.workerInst.on('message', onMessage as any);this.workerInst.on('error', onError as any);this.workerInst.postMessage({subTaskId: subTask.id,mapperFuncName: subTask.resource?.deadlineMs ? 'heavyPrimeCount' : subTask.taskId, // 仅示例// 实际请传 subTask 对应的 mapper 名称,这里由 Scheduler 负责});});this.running--;if (res.status === 'SUCCEEDED') {this.lastOkTs = Date.now();this.histLatency.push(res.durationMs || 0);if (this.histLatency.length > 20) this.histLatency.shift();}return res;}metrics(): NodeMetrics {const avg =this.histLatency.length === 0? 
0: Math.round(this.histLatency.reduce((a, b) => a + b, 0) / this.histLatency.length);return {nodeId: this.id,running: this.running,capacity: this.capacity,avgLatencyMs: avg,lastHeartbeat: this.lastOkTs,healthy: this.healthy()};}stop() {try { this.workerInst.terminate(); } catch {}}
}export class NodePool {private nodes: WorkerNode[] = [];register(node: WorkerNode) { this.nodes.push(node); }all(): WorkerNode[] { return this.nodes; }healthy(): WorkerNode[] { return this.nodes.filter(n => n.healthy()); }stopAll() { this.nodes.forEach(n => n.stop()); }
}
提示:真实跨设备时,你可以写一个 RemoteNode implements NodeLike,在 post() 里通过 SoftBus/RPC 把子任务发到远端执行,再把结果回传;metrics() 则可来自远端的心跳与统计数据。
调度器(/common/scheduler.ts
)
// /common/scheduler.ts
import { bus } from './bus';
import {NodeLike, NodeMetrics, SubTask, SubTaskResult, TaskSpec, Priority
} from './types';
import { NodePool } from './nodePool';function now() { return Date.now(); }
function uid() { return Math.random().toString(36).slice(2); }interface InternalTaskState<MapOut> {spec: TaskSpec<any, MapOut, any>;subTasks: SubTask<any, MapOut>[];pending: Set<string>;running: Map<string, { nodeId: string; startedAt: number }>;results: Map<string, MapOut>;failed: Map<string, string>;cancelled: boolean;
}export class Scheduler {private pool: NodePool;private queue: InternalTaskState<any>[] = [];private timer?: number;private nodeSelectorIdx = 0;constructor(pool: NodePool) {this.pool = pool;}submit<Input, MapOut, ReduceOut>(spec: TaskSpec<Input, MapOut, ReduceOut>): Promise<ReduceOut> {const splits = spec.splitter(spec.input);const subTasks: SubTask<Input, MapOut>[] = splits.map((piece, i) => ({id: uid(),taskId: spec.id,shardIndex: i,input: piece,priority: spec.priority,resource: spec.resource || {},retries: 0,maxRetries: 2,createdAt: now(),}));const state: InternalTaskState<MapOut> = {spec,subTasks,pending: new Set(subTasks.map(s => s.id)),running: new Map(),results: new Map(),failed: new Map(),cancelled: false,};this.queue.push(state);bus.emit('task.submitted', { taskId: spec.id, total: subTasks.length });return new Promise<ReduceOut>((resolve, reject) => {const onProgress = () => {// 所有子任务完成if (state.pending.size === 0 && state.running.size === 0) {bus.off(`task.${spec.id}.progress`, onProgress);if (state.failed.size > 0) {reject(new Error(`Task ${spec.id} failed: ${JSON.stringify([...state.failed.values()])}`));} else {const merged = spec.reducer([...state.results.values()]);resolve(merged);}}};bus.on(`task.${spec.id}.progress`, onProgress);this.ensureLoop();});}pause(taskId: string) {// 简化:把 pending 清空到一个 shadow 队列,这里直接置 cancelledconst t = this.queue.find(q => q.spec.id === taskId);if (t) t.cancelled = true;}resume(taskId: string) {const t = this.queue.find(q => q.spec.id === taskId);if (t) t.cancelled = false;this.ensureLoop();}cancel(taskId: string) {const idx = this.queue.findIndex(q => q.spec.id === taskId);if (idx >= 0) this.queue.splice(idx, 1);}private ensureLoop() {if (this.timer) return;// 简单调度循环:每 50ms 拉一次队列this.timer = setInterval(() => this.tick(), 50) as unknown as number;}private pickNode(priority: Priority): NodeLike | undefined {const nodes = this.pool.healthy();if (nodes.length === 0) return undefined;// 加权轮询(按空闲容量)const sorted = nodes.map(n 
=> ({ n, free: Math.max(n.metrics().capacity - n.metrics().running, 0) })).sort((a, b) => b.free - a.free || a.n.id.localeCompare(b.n.id));const pick = sorted[this.nodeSelectorIdx % sorted.length]?.n;this.nodeSelectorIdx++;return pick;}private async dispatchSubTask(state: InternalTaskState<any>, sub: SubTask<any, any>) {const node = this.pickNode(sub.priority);if (!node) return; // 没有可用节点,等下一轮state.pending.delete(sub.id);state.running.set(sub.id, { nodeId: node.id, startedAt: now() });bus.emit('subtask.started', { taskId: state.spec.id, subTaskId: sub.id, nodeId: node.id });// 传 mapper 名称给节点(在 Demo 下我们让 mapper 名称=spec.mapperFuncName)const result: SubTaskResult<any> = await node.post({...sub,taskId: state.spec.mapperFuncName, // 让 worker 能拿到 mapper 名称(简化处理)} as any);state.running.delete(sub.id);if (result.status === 'SUCCEEDED') {state.results.set(sub.id, result.output);bus.emit('subtask.succeeded', { subTaskId: sub.id, nodeId: node.id, durationMs: result.durationMs });} else {state.failed.set(sub.id, result.error || 'unknown');// 故障恢复:重试与迁移if (sub.retries < sub.maxRetries) {sub.retries++;state.pending.add(sub.id);state.failed.delete(sub.id);bus.emit('subtask.retried', { subTaskId: sub.id, retries: sub.retries });} else {bus.emit('subtask.failed', { subTaskId: sub.id, error: result.error });}}bus.emit(`task.${state.spec.id}.progress`);}private tick() {for (const t of this.queue) {if (t.cancelled) continue;// 避免一轮派发太多:每个任务每轮最多派发 min(剩余, 全局空闲)const freeSlots = this.pool.healthy().map(n => Math.max(n.metrics().capacity - n.metrics().running, 0)).reduce((a, b) => a + b, 0);if (freeSlots <= 0) continue;// 高优先级先派发(这里队列已按提交顺序,若要更严格可以改为多级队列)const readySubs = [...t.pending].slice(0, freeSlots);readySubs.forEach(subId => {const sub = t.subTasks.find(s => s.id === subId)!;this.dispatchSubTask(t, sub);});}// 全空闲则停表const hasPending = this.queue.some(t => t.pending.size > 0 || t.running.size > 0);if (!hasPending) {clearInterval(this.timer as unknown as number);this.timer = 
undefined;}}
}
简单 UI 页面(/pages/Index.ets
)
// /pages/Index.ets
import { NodePool, WorkerNode } from '../common/nodePool';
import { Scheduler } from '../common/scheduler';
import { TaskSpec } from '../common/types';
import { bus } from '../common/bus';@Entry
@Component
struct Index {private logs: string[] = [];private pool = new NodePool();private scheduler?: Scheduler;aboutToAppear() {// 注册3个“节点”,每个节点并发能力不同this.pool.register(new WorkerNode(2, 'workers/node.js'));this.pool.register(new WorkerNode(1, 'workers/node.js'));this.pool.register(new WorkerNode(3, 'workers/node.js'));this.scheduler = new Scheduler(this.pool);// 订阅事件做状态监控bus.on('task.submitted', (e) => this.pushLog(`Task submitted: ${e.taskId}, total: ${e.total}`));bus.on('subtask.started', (e) => this.pushLog(`SubTask ${e.subTaskId} -> ${e.nodeId} started`));bus.on('subtask.succeeded', (e) => this.pushLog(`SubTask ${e.subTaskId} done in ${e.durationMs}ms`));bus.on('subtask.retried', (e) => this.pushLog(`SubTask ${e.subTaskId} retry #${e.retries}`));bus.on('subtask.failed', (e) => this.pushLog(`SubTask ${e.subTaskId} failed: ${e.error}`));}private pushLog(s: string) {this.logs = [`[${new Date().toLocaleTimeString()}] ${s}`, ...this.logs].slice(0, 200);}build() {Column({ space: 12 }) {Text('分布式任务管理 Demo(Worker 模拟节点)').fontSize(20).fontWeight(FontWeight.Bold).margin({ top: 12 })Row({ space: 8 }) {Button('启动:素数计数(拆分并行)').onClick(() => this.runPrimeDemo());Button('启动:词频统计(文本分片)').onClick(() => this.runWordCountDemo());Button('清空日志').onClick(() => this.logs = []);}List() {ForEach(this.logs, (line: string) => {ListItem() {Text(line).fontSize(12).maxLines(2)}})}.height('70%')}.padding(16)}private async runPrimeDemo() {if (!this.scheduler) return;// 把一个大 n 拆成多个分片(比如 [1..2e6] 切成10片)const N = 2_000_00; // 2e5,演示更快;需要更重可放大const shards = 10;const step = Math.floor(N / shards);const spec: TaskSpec<number[], number, number> = {id: `prime-${Date.now()}`,name: 'heavyPrimeCount',priority: 'HIGH',input: Array.from({ length: shards }, (_, i) => (i + 1) * step),splitter: (arr) => arr, // 这里 input 已是分片上界数组mapperFuncName: 'heavyPrimeCount',reducer: (partials) => partials.reduce((a, b) => a + b, 0),resource: { cpuCost: 3 }};this.pushLog('启动素数计数任务...');try {const total = await 
this.scheduler.submit(spec);this.pushLog(`素数总数:${total}`);} catch (e: any) {this.pushLog(`任务失败:${e?.message || e}`);}}private async runWordCountDemo() {if (!this.scheduler) return;const text = `Hello world hello HarmonyOS world distributed scheduler demo hello`;const words = text.split(/\s+/);const shardSize = 3;const splits: string[] = [];for (let i = 0; i < words.length; i += shardSize) {splits.push(words.slice(i, i + shardSize).join(' '));}const spec = {id: `wc-${Date.now()}`,name: 'wordCount',priority: 'MEDIUM',input: splits,splitter: (arr: string[]) => arr,mapperFuncName: 'wordCount',reducer: (partials: Record<string, number>[]) => {const acc: Record<string, number> = {};partials.forEach(p => {Object.keys(p).forEach(k => { acc[k] = (acc[k] || 0) + p[k]; });});return acc;},resource: { cpuCost: 1 }};this.pushLog('启动词频统计任务...');try {const wc = await this.scheduler.submit(spec as any);this.pushLog(`词频结果:${JSON.stringify(wc)}`);} catch (e: any) {this.pushLog(`任务失败:${e?.message || e}`);}}
}
Worker 注册(module.json5
片段)
{"module": {"abilities": [ /* ... */ ],"resources": [ /* ... */ ],"js": [{"pages": [ "pages/Index" ],"name": "default","window": { "designWidth": 720 }}],"workers": [{"name": "node","src": "workers/node.ts" // 构建后路径可能为 workers/node.js}]}
}
小贴士:构建后实际加载的路径可能是
workers/node.js
,上面页面里也用的是这个路径;如果工程配置不同,请按实际产物路径调整。
任务分解与合并
分解策略
- 均匀分片:最常用,适合输入切分比较均匀的 CPU 密集任务
- 采样分片:先抽样估算每片耗时,做不均匀切分
- 流式分片:源源不断产出子任务(例如日志流/视频帧),调度器动态派发
合并策略
- 交换律/结合律友好的任务直接 reduce(sum/max/min/count)
- Map 合并(词频):Key-by 合并计数
- 流式归并:子结果一到就合并,降低尾延迟;并可用于“中间态可视化”
负载均衡设计
核心思路
- 实时负载度量:running / capacity、历史平均延迟、失败率
- 节点选择:加权轮询/最少连接/基于延迟的选择
- 背压:全局空闲槽为 0 时暂停派发
代码落地
文章中的 pickNode()
就是“最少连接 + 简单轮询”的混合,用每个节点的空闲槽排序,尽可能把任务丢到最空闲的节点。
任务状态监控
生命周期事件
- task.submitted / subtask.started / subtask.succeeded / subtask.failed / subtask.retried
- 通过 bus 统一发事件,页面订阅后就能做 UI 呈现、日志、埋点
健康检查
- Demo 中以“最近 10 秒有成功返回”为健康;实际可用心跳 + 指标上报
- 节点维度可上报:CPU/内存/温度/电量/网络质量 等
故障恢复机制
失败类型
- 执行失败:代码异常、输入异常
- 超时失败:节点卡死、资源不足
- 节点失联:心跳超时、网络断开
迁移与重试
- 子任务失败 -> 放回 pending,增加重试计数,重新选择另一个节点
- 重试上限 -> 标记失败,最终任务失败
- 可选策略:指数退避、冷热节点隔离、黑名单/熔断
Demo 中:maxRetries=2,失败后自动回到待分配队列,下一轮会被派往新的节点。
任务结果收集与整合
边算边合并
- 调度器收到子结果即可调用 reducer 的“增量版本”,UI 侧可以不断更新“当前汇总”
- 好处:用户能尽早看到部分结果,尾部子任务不再阻塞整体体验
一致性与幂等
- 子任务应具备幂等(可重试),合并器也尽量写成幂等(例如用分片 id 去重)
- 如果涉及跨设备一致性,可用分布式数据对象做“最终一致”存储
应用场景与示例代码
场景一:大图批处理(滤镜/缩放/特征提取)
- 输入是一张大图或多张图片,拆成网格块(tile),各节点并行处理
- 合并时按 tile 坐标把图块拼回去,或把特征向量聚合
示例(伪代码,映射到本文结构即可):
// mapper: tileProcess(imageTile) -> processedTile
// reducer: stitch(tiles) -> finalImage
const spec = {id: `img-${Date.now()}`,name: 'imageTiles',priority: 'HIGH',input: splitToTiles(image, 4, 4), // 16个tilesplitter: (tiles) => tiles,mapperFuncName: 'processTile', // 在节点注册reducer: (tiles) => stitch(tiles)
};
scheduler.submit(spec);
节点侧注册:
// workers/node.ts
globalMapperRegistry['processTile'] = (tile: Uint8Array) => {// 简化:亮度提升const out = new Uint8Array(tile.length);for (let i = 0; i < tile.length; i++) out[i] = Math.min(255, tile[i] + 10);return out;
};
场景二:日志关键词与异常检测(近实时)
- 输入是按分钟切分的日志片段
- mapper 做关键词统计 + 简单规则匹配;reducer 合并计数并输出异常列表
globalMapperRegistry['logScan'] = (chunk: string) => {const anomalies: string[] = [];const counts: Record<string, number> = {};chunk.split('\n').forEach(line => {if (line.includes('ERROR')) anomalies.push(line);const m = line.match(/\b(login|pay|video|fail)\b/gi);if (m) m.forEach(k => counts[k] = (counts[k] || 0) + 1);});return { anomalies, counts };
};const spec = {id: `log-${Date.now()}`,name: 'logScan',priority: 'MEDIUM',input: minuteChunks, // e.g. 最近10分钟splitter: (arr: string[]) => arr,mapperFuncName: 'logScan',reducer: (partials: any[]) => {const merged: any = { anomalies: [], counts: {} };partials.forEach(p => {merged.anomalies.push(...p.anomalies);Object.keys(p.counts).forEach(k => merged.counts[k] = (merged.counts[k] || 0) + p.counts[k]);});return merged;}
};
scheduler.submit(spec as any);
场景三:矩阵乘法(教育/科研小样)
- 把矩阵 A×B 的计算拆成“按行×列”的点积子任务
- mapper 只做一行与一列的点积;reducer 把每个单元填回结果矩阵
// 注册:globalMapperRegistry['matrixRowMul'] 已在上文
// 拆分
function splitMatMul(A: number[][], B: number[][]) {const BT = transpose(B);const jobs: {i: number, j: number, row: number[], col: number[]}[] = [];for (let i = 0; i < A.length; i++) {for (let j = 0; j < BT.length; j++) {jobs.push({ i, j, row: A[i], col: BT[j] });}}return jobs;
}const spec = {id: `mm-${Date.now()}`,name: 'matmul',priority: 'HIGH',input: splitMatMul(A, B),splitter: (arr: any[]) => arr,mapperFuncName: 'matrixRowMul',reducer: (partials: {i:number,j:number,val:number}[]) => {const C = Array.from({length: A.length}, () => Array(B[0].length).fill(0));partials.forEach(p => { C[p.i][p.j] = p.val; });return C;}
};
注:上段 reducer 需要 mapper 返回
{i,j,val}
,可在node.ts
里把返回值改为({ row, col, i, j }) => ({ i, j, val: dot(row,col) })
。
把 Demo 扩展为真分布式
设备发现与连接
- 用 @ohos.distributedHardware.deviceManager 获取在线设备列表
- 用 @ohos.communication.softbus 或对应封装建立通道/会话
远端执行与 RPC
- 为远端设备提供一个“执行能力”(FA/Stage Ability + RemoteObject 接口),暴露 executeSubTask(mapperName: string, input: any)
- 本地 RemoteNode implements NodeLike 的 post():序列化子任务,通过 SoftBus/RPC 发到远端执行
- 远端执行完成后回传结果,落到统一的 SubTaskResult
状态与心跳
- 每个设备周期性上报:并发容量、运行中数量、平均延迟、电量/温度/网络延迟等
- 调度侧据此更新
NodeMetrics
,参与负载均衡和健康判定
故障与数据一致性
- 会话断开/心跳超时 -> 标记节点故障 -> 回收其未完成子任务 -> 迁移
- 中间态存储可用分布式数据对象;输出结果可写分布式文件或云端
QA 环节
Q1:任务优先级如何真正“生效”?
A:两层做法:队列层面先派高优先级;节点选择时也给高优先级更多空闲槽(例如按权重倍数分配)。复杂一点可以按优先级分别维护独立队列,或者实现“时间片”+“抢占”(把低优先级的待派发任务延后)。
Q2:如何避免某些超大子任务卡死单个节点?
A:拆分更细;或者给子任务设置超时,超时后分裂为更小子任务再派发;节点侧可实现可中断执行(定期检查取消标志)。
Q3:结果合并时如何保证幂等?
A:给每个子任务固定 subTaskId 和 shardIndex,合并器按 shardIndex 去重写入;结果重复到达也不影响最终态。
Q4:Worker 池在移动端会不会抢资源影响前台体验?
A:可以读取系统负载/温度/电量阈值,动态降低 capacity
;前后台切换时暂停/降速;同时为高优先级的交互任务保留资源。
Q5:跨设备网络波动大,怎么稳?
A:引入“自适应批量”和“自适应超时”:网络差时缩小分片、降低并发;网络好时加大窗口。对失败设备做临时熔断,间隔重试恢复。
总结
本文给出了一套“能跑”的分布式任务管理 Demo:
- 用 Worker 池模拟多节点,完成任务拆分、调度、负载均衡、状态监控、故障恢复、结果合并
- 通过 NodeLike 接口把节点抽象出来,后续可无感替换为跨设备节点(SoftBus/RPC)
- 给出三类常见场景的实现思路与代码骨架
如果你要落地到生产:
- 完善健康度量(CPU/内存/电量/温度/网络)与调度策略(延迟/容量/优先级的联合优化)
- 加上任务持久化(崩溃恢复)、可观测性(Tracing/指标/日志)
- 结合分布式数据对象或对象存储,保证结果的一致和可追溯
- 对不同设备画像(算力、温度墙、电量策略)做“亲和性调度”,整体效率会更高
把本文 Demo 跑通后,你基本就具备了“把任务丢给最近、最闲、最合适的设备执行,还能抗故障并回收结果”的核心工程骨架。把通信层替换为跨设备实现,就是真·分布式计算的任务管理器了。