当前位置: 首页 > news >正文

【Dart语言】八、并发

概览

本页概述了 Dart 中并发编程的工作原理,从高层次解释了事件循环、异步语言特性以及 isolates(隔离区)。如需查看 Dart 并发编程的实际代码示例,请阅读《异步支持》和《Isolates》页面。

Dart 中的并发编程既包括 Future 和 Stream 等异步API,也包括允许将进程转移到单独核心的isolates(隔离区)。

所有 Dart 代码都在 isolates 中运行,从默认的主 isolate 开始,并可选择性地扩展到您显式创建的其他 isolates。当您生成一个新的 isolate 时,它拥有自己独立的内存空间和事件循环。正是这个事件循环使得 Dart 中的异步和并发编程成为可能。

事件循环

Dart 的运行时模型基于事件循环(event loop)。事件循环负责执行程序代码、收集并处理事件等任务。

当应用程序运行时,所有事件都会被添加到一个名为事件队列(event queue)的队列中。这些事件可能包括 UI 重绘请求、用户点击或键盘输入、磁盘 I/O 操作等。由于应用程序无法预知事件发生的顺序,事件循环会按照事件进入队列的先后顺序逐个处理它们。

事件循环的运行机制可以用以下代码表示:

while (eventQueue.waitForEvent()) {eventQueue.processNextEvent();
}

这个示例中的事件循环(event loop)是同步运行的,并且仅使用单线程。然而,大多数 Dart 应用需要同时处理多个任务。例如,一个客户端应用可能需要在发起 HTTP 请求的同时,还要监听用户的按钮点击。为了应对这种情况,Dart 提供了多种异步 API,如 Future、Stream 以及 async-await 语法。这些 API 正是基于这个事件循环构建的。

例如,假设需要发起一个网络请求:

// get方法是一个async,它返回Future类型,Future的then方法则是异步的回调
http.get('https://example.com').then((response) {if (response.statusCode == 200) {print('Success!');}  
}

当这段代码进入事件循环(event loop)时,它会立即执行第一个操作——调用 http.get 并返回一个 Future。同时,事件循环会暂存 then() 中的回调函数,直到 HTTP 请求完成。当请求返回结果时,事件循环将执行这个回调函数,并将请求结果作为参数传入。

这一机制同样适用于 Dart 中其他所有异步事件的处理方式,比如 Stream 对象。

异步编程

本节将概述 Dart 中异步编程的不同类型和语法。如果您已经熟悉 Future、Stream 和 async-await 的使用,可以直接跳转至 isolates 部分继续阅读。

Futures

Future 表示一个异步操作的结果,该操作最终会以返回某个值或错误的方式完成。

在这段示例代码中,Future<String> 的返回类型表示一个最终会提供 String 类型值(或错误)的承诺。

Future<String> _readFileAsync(String filename) {final file = File(filename);// .readAsString() 返回一个 Future// .then() 注册一个回调函数,该函数将在 readAsString 完成时执行return file.readAsString().then((contents) {return contents.trim();});
}

async-await语法

async 和 await 关键字提供了一种声明式的方式来定义异步函数并使用它们的结果。

以下是同步代码的示例,该代码会在等待文件 I/O 时阻塞执行:

const String filename = 'with_keys.json';void main() {// 读取一些数据final fileData = _readFileSync();final jsonData = jsonDecode(fileData);// 使用这些数据print('Number of JSON keys: ${jsonData.length}');
}String _readFileSync() {final file = File(filename);final contents = file.readAsStringSync();return contents.trim();
}

以下是功能相同的代码实现,但已通过修改(标亮处)改为异步方式:

main() 函数在 _readFileAsync() 前使用 await 关键字,使得在原生代码(文件 I/O)执行期间,其他 Dart 代码(如事件处理器)能够继续使用 CPU 资源。使用 await 还会将 _readFileAsync() 返回的 Future<String> 转换为实际的 String 值,因此 contents 变量会被隐式推断为 String 类型。

提示

await 关键字仅在函数体前标记了 async 的函数中有效。

如图所示,当 readAsString() 执行非 Dart 代码(无论是在 Dart 运行时还是操作系统中)时,Dart 代码会暂停执行。一旦 readAsString() 返回值,Dart 代码就会恢复执行。

Streams

Dart 还支持以 Stream 形式实现的异步代码。Stream(流)既能在未来提供值,也能随时间推移持续重复地提供值。一个承诺会随时间推移提供一系列 int 值的对象,其类型为 Stream<int>。

在下面的示例中,通过 Stream.periodic 创建的 Stream 会每秒重复生成一个新的 int 值。

Stream<int> stream = Stream.periodic(const Duration(seconds: 1), (i) => i * i);
await-for和yield

await-for 是一种 for 循环,它在新值被提供时执行循环的每次后续迭代。换句话说,它用于"循环遍历"流。在此示例中,当作为参数提供的流发出新值时,sumStream 函数将发出一个新值。在返回流值的函数中,使用 yield 关键字而不是 return。

Stream<int> sumStream(Stream<int> stream) async* {var sum = 0;await for (final value in stream) {yield sum += value;}
}

代码示例:

Stream<int> sumStream(Stream<int> stream) async* {var sum = 0;await for (final value in stream) {yield sum += value;}
}void main() async {Stream<int> stream = Stream.periodic(const Duration(seconds: 1), (i) => i * i);Stream<int> stream2 = sumStream(stream);// 同步遍历 (await for)// await for (var number in stream2) {//   print('计数: $number');// }// 异步遍历(listen 回调)stream2.listen((value) {print('计数2: $value');});
}

若想进一步了解 async、await、Stream 和 Future 的使用方法,请参阅异步编程教程。

Isolates

Dart 通过 isolates(隔离区)和异步 API 支持并发。现代设备普遍配备多核 CPU,开发者通常使用共享内存的线程并发来利用多核性能,但这种共享状态的并发模式容易出错且会导致代码复杂度增加。

所有 Dart 代码都运行在 isolates 中而非线程里。使用 isolates,Dart 代码可以并行执行多个独立任务,自动利用可用的处理器核心。isolates 类似于线程或进程,但每个 isolates 拥有独立的内存空间,并通过单线程运行事件循环。

每个 isolates 都有独立的全局字段,确保其状态不能被其他 isolates 访问。isolates 之间只能通过消息传递进行通信。由于没有共享状态,Dart 中不会出现互斥锁、死锁或数据竞争等并发问题。但需注意,isolates 并不能完全避免所有竞态条件。如需了解更多关于此并发模型的信息,请参考 Actor 模型。

平台说明
仅 Dart Native 平台实现了 isolates 机制。关于 Dart Web 平台的并发实现,请参阅本章 Web 平台的并发处理章节

主isolate

在大多数情况下,开发者无需主动管理 isolates。Dart程序默认运行在主 isolate 中,这是程序启动和执行的主线程,如下图所示:

即使单个 isolate 的程序也能顺畅运行。在继续执行下一行代码之前,这些应用会使用 async-await 等待异步操作完成。一个行为良好的应用会快速启动,并尽快进入事件循环。然后,应用会根据需要使用异步操作来及时响应每个排队的事件。

Isolate生命周期

如下图所示,每个 isolate 启动时都会先执行一些 Dart 代码,比如 main() 函数。这段初始代码可能会注册一些事件监听器,例如用于响应用户输入或文件 I/O 操作。当 isolate 的初始函数执行完毕后,若仍需处理事件,该 isolate 会继续保持运行状态。待所有事件处理完成后,isolate 才会终止。

事件处理

在客户端应用中,主 isolate 的事件队列可能包含重绘请求、点击事件通知和其他UI事件。例如下图所示的事件顺序:一个重绘事件后跟着一个点击事件,然后是两个重绘事件。事件循环按照先进先出 (FIFO) 的顺序从队列中获取事件。

事件处理发生在 main() 函数执行完毕后的主 isolate 中。如下图所示,main() 退出后,主 isolate 首先处理第一个重绘事件,接着处理点击事件,然后处理另一个重绘事件。

若同步操作耗时过长,应用可能会失去响应。如图所示,点击事件处理代码执行时间过长,导致后续事件延迟处理。此时应用可能出现界面冻结,动画也会变得卡顿不流畅。

在客户端应用中,同步操作耗时过长通常会导致UI动画出现卡顿(不流畅)现象。更严重的是,界面可能会完全失去响应。

后台工作线程

当应用 UI 因执行耗时计算(如解析大型 JSON 文件)而失去响应时,建议将该计算任务转移到工作 isolate(后台工作线程)中执行。如图中典型场景所示:系统会创建一个独立的工作 isolate 来执行计算任务,任务完成后该 isolate 将自动终止,并通过消息传递机制返回计算结果。

工作 isolate 能够执行 I/O 操作(如读写文件)、设置定时器以及其他任务。它拥有独立的内存空间,不与主 isolate 共享任何状态,且其阻塞操作不会影响其他 isolate 的运行。

使用isolates

在 Dart 中,根据使用场景有两种使用 isolates 的方式:

  • 使用 Isolate.run() 在独立线程中执行单次计算任务。
  • 使用 Isolate.spawn() 创建长期运行的 isolate,用于处理多个消息或作为后台工作 isolates 。有关长期运行 isolates 的更多信息,请参阅 Isolates 页面。

在大多数情况下,Isolate.run 是推荐用于在后台运行进程的 API。

Isolate.run()

静态方法 Isolate.run() 需要一个参数:一个在新创建的 isolate 中运行的回调函数。

int slowFib(int n) => n <= 1 ? 1 : slowFib(n - 1) + slowFib(n - 2);// 在不阻塞当前 isolate 的情况下执行计算
void fib40() async {var result = await Isolate.run(() => slowFib(40));print('Fib(40) = $result');
}

性能和isolate组

当一个 Isolate 调用 Isolate.spawn() 时,这两个 Isolate 具有相同的可执行代码并属于同一个 Isolate 组。Isolate 组支持性能优化(例如代码共享),新创建的 Isolate 会立即运行该 Isolate 组拥有的代码。此外,Isolate.exit() 仅在 Isolate 同属一个组时才能生效。

某些特殊场景下,可能需要使用 Isolate.spawnUri(),该方法会基于指定 URI 的代码副本创建新 Isolate。但 spawnUri() 的执行效率远低于 spawn(),且新 Isolate 不会加入创建者的 Isolate 组。另一个性能影响是:当 Isolate 处于不同组时,消息传递的速度会更慢。

Isolate的局限性

Isolate并非线程

对于从多线程语言转向 Dart 的开发者,可能会误以为 isolate 的行为类似线程,但实际并非如此。每个 isolate 都拥有独立的状态,确保其他 isolate 无法访问其内部状态。因此,isolate 只能操作自身内存空间的数据。

例如:若应用存在全局可变变量,该变量在派生的 isolate 中会成为独立副本。即使在派生的 isolate 中修改该变量,主 isolate 中的原始变量仍保持不变。这正是 isolate 的设计机制,在使用时需要特别注意这一特性。

消息类型

通过 SendPort 发送的消息可以是几乎任何类型的 Dart 对象,但存在以下例外情况:

  • 带有原生资源的对象,例如 Socket
  • ReceivePort
  • DynamicLibrary
  • Finalizable
  • Finalizer
  • NativeFinalizer
  • Pointer
  • UserTag
  • 被标记为 @pragma('vm:isolate-unsendable') 的类实例

除上述例外情况外,任何对象都可以传递。更多信息请参阅 SendPort.send 的文档说明。

需注意的是,Isolate.spawn() 和 Isolate.exit() 在底层抽象了 SendPort 对象的操作,因此它们遵循相同的限制规则。

Isolates之间的同步阻塞通信

可并行运行的 Isolate 数量存在上限。该限制不会影响 Dart 中通过消息传递的标准异步通信——即使数百个 Isolate 并发运行也能正常执行任务。这些 Isolate 会以轮询方式在 CPU 上调度,并频繁相互让出执行权。

Isolate 只能在纯 Dart 环境之外通过 FFI 调用 C 代码实现同步通信。若 Isolate 数量超限,在 FFI 调用中尝试通过同步阻塞实现通信可能导致死锁(除非采取特殊措施)。该上限并非固定数值,而是根据 Dart 应用可用的 VM 堆内存动态计算得出。

为避免这种情况,执行同步阻塞的 C 代码需在阻塞操作前调用 Dart_ExitIsolate 离开当前 Isolate,并在从 FFI 返回前通过 Dart_EnterIsolate 重新进入。具体实现请参阅 Dart_EnterIsolate 和 Dart_ExitIsolate 的相关文档。

Web中的并发

所有 Dart 应用都可以使用 async-await、Future 和 Stream 来实现非阻塞的交叉运算。然而,Dart web 平台不支持 isolates。Dart web 应用可以使用 web workers 在后台线程中运行脚本,这与 isolates 类似。不过,web workers 的功能和能力与 isolates 有些不同。

例如,当 web workers 在线程之间发送数据时,它们会来回复制数据。不过,数据复制可能非常慢,特别是对于大型消息。Isolate 也做同样的事情,但还提供了可以更高效地转移持有消息的内存的 API。

创建 web workers 和 isolates 也有所不同。你只能通过声明单独的程序入口点并单独编译来创建 web workers。启动一个 web worker 类似于使用 Isolate.spawnUri 来启动一个 isolate。你也可以使用 Isolate.spawn 来启动一个 isolate,这种方式需要更少的资源,因为它复用了部分与发起 isolate 相同的代码和数据。Web workers 没有等效的 API。

附加资源

  • 如果使用多个 isolate,可考虑 Flutter 中的 IsolateNameServer,或用于非 Flutter Dart 应用的 package:isolate_name_server,它提供类似功能。
  • 阅读有关 Dart isolate 所基于的 Actor 模型。
  • 关于 Isolate API 的附加文档:
    • Isolate.exit()
    • Isolate.spawn()
    • ReceivePort
    • SendPort

异步支持

Dart 库中大量函数会返回 Future 或 Stream 对象。这些函数属于异步操作:它们会在建立可能耗时的操作(如 I/O)后立即返回,而无需等待操作完成。

async 和 await 关键字提供异步编程支持,使您能编写近似同步代码风格的异步代码。

Futures处理

当需要获取已完成的 Future 的结果时,您有两种选择:

  • 使用 async 和 await,如本文档和异步编程教程所述。
  • 使用 Future API,如 dart:async 库文档所述。

使用 async 和 await 的代码本质上是异步的,但其写法与同步代码高度相似。例如,以下代码通过 await 等待异步函数的执行结果:

await lookUpVersion();

要使用 await,代码必须在一个 async 函数中,一个被标记为 async 的函数:

Future<void> checkVersion() async {var version = await lookUpVersion();// Do something with version
}

提示

尽管 async 函数可能执行耗时操作,但它并不会等待这些操作完成。实际上,async 函数仅会执行到第一个 await 表达式处,然后立即返回一个 Future 对象,直到 await 表达式完成后才会继续执行后续代码。

在使用 await 的代码中,可通过 try、catch 和 finally 实现错误处理与资源清理:

try {version = await lookUpVersion();
} catch (e) {// React to inability to look up the version
}

在 async 函数中,您可以多次使用 await。例如,以下代码会三次等待函数执行结果:

var entrypoint = await findEntryPoint();
var exitCode = await runExecutable(entrypoint, args);
await flushThenExit(exitCode);

在 await expression 中,expression 的值通常是一个 Future 对象;如果不是,则该值会自动被包装成 Future。这个 Future 对象代表返回某个对象的承诺。await 表达式的值就是这个被返回的对象。await expression 会使代码执行暂停,直到该对象可用。

若使用 await 时出现编译时错误,请确保 await 位于 async 函数内。例如,要在应用的 main() 函数中使用 await,必须将 main() 的函数体标记为 async:

void main() async {checkVersion();print('In main: version is ${await lookUpVersion()}');
}

提示

前例中的 checkVersion() 作为异步函数被调用时未等待结果返回——若代码逻辑依赖该函数执行完成,此做法可能导致问题。为避免此类情况,建议启用 unawaited_futures 静态检查规则。

关于如何运用 Future、async 和 await 的交互式入门指南,请参阅异步编程教程。

声明async函数

一个 async 函数是指函数体被标记有 async 修饰符的函数。

给函数添加 async 关键字会使其返回一个 Future。例如,看下面这个返回 String 的同步函数:

String lookUpVersion() => '1.0.0';

如果你把它改成 async 函数(例如,因为后续实现可能比较耗时),返回值就会变成 Future:

Future<String> lookUpVersion() async => '1.0.0';

需要注意的是,函数体内部并不需要手动使用 Future API。Dart 会在必要时自动创建 Future 对象。如果你的函数没有实际返回值,请将返回类型设为 Future<void>。

如需交互式学习 futures、async 和 await 的用法,请参阅异步编程教程。

Streams处理

从 Stream 获取数据有两种方式:

  • 使用 async 和异步循环 (await for)
  • 使用 Stream API(详见 dart:async 文档)

提示

使用 await for 前,请确认它能提高代码清晰度且您确实需要等待流的所有结果。例如,通常不应在UI事件监听器中使用 await for,因为UI框架会持续发送无限的事件流。

异步 for 循环的形式如下:

await for (varOrType identifier in expression) {// 每次流(stream)发出值时都会执行
}

expression 的值必须是 Stream 类型。执行流程如下:

  1. 等待流(stream)发出一个值。
  2. 执行for循环体,变量设置为该发出的值。
  3. 重复步骤1和2,直到流(stream)关闭。

如需停止监听流(stream),可使用 break 或 return 语句,这会跳出 for 循环并取消对该流的订阅。

如果在实现异步 for 循环时遇到编译时错误,请确保 await for 位于 async 函数中。例如,要在应用的 main() 函数中使用异步 for 循环,必须将 main() 的函数体标记为 async:

void main() async {// ...await for (final request in requestServer) {handleRequest(request);}// ...
}

示例:

Stream<int> sumStream(Stream<int> stream) async* {var sum = 0;await for (final value in stream) {yield sum += value;}
}void main() async {Stream<int> stream = Stream.periodic(const Duration(seconds: 1), (i) => i * i);Stream<int> stream2 = sumStream(stream);// 同步遍历 (await for)// await for (var number in stream2) {//   print('计数: $number');// }// 异步遍历(listen 回调)stream2.listen((value) {print('计数2: $value');});
}

如需了解更多关于 Dart 异步编程的支持,请查阅 dart:async 库文档。

Isolates

本文介绍了使用 Isolate API 实现 isolates 的一些示例。

当应用程序需要处理可能暂时阻塞其他计算的大型计算任务时,就应该使用 isolates。最常见的例子是在 Flutter 应用程序中,当您需要执行可能导致 UI 无响应的大型计算时。

虽然没有强制规定必须使用 isolates 的场景,但以下情况特别适合使用 isolates:

  • 解析和编解码超大型 JSON 数据。
  • 处理压缩照片、音频和视频。
  • 转换音视频文件格式。
  • 对大型数据集或文件系统执行复杂搜索/过滤。
  • 数据库通信等 I/O 密集型操作。
  • 处理大批量网络请求。

实现一个简单的工作isolate

这些示例实现了一个主 isolate,该 isolate 会生成一个简单的工作 isolate。Isolate.run() 简化了设置和管理工作 isolate 的步骤:

  1. 生成(启动并创建)一个 isolate。
  2. 在生成的 isolate 上运行函数。
  3. 捕获结果。
  4. 将结果返回给主 isolate。
  5. 工作完成后终止 isolate。
  6. 检查、捕获异常和错误并抛回主 isolate。

Flutter 提示

若您使用 Flutter,可以用 Flutter 的 compute 函数替代 Isolate.run()。

在新isolate中运行现有方法

1、调用 run() 方法在主 isolate 中直接创建一个新的 isolate(后台工作线程),同时 main() 方法会等待执行结果返回:

const String filename = 'with_keys.json';void main() async {// 读取数据final jsonData = await Isolate.run(_readAndParseJson);// 使用该数据print('Number of JSON keys: ${jsonData.length}');
}

2、将需要执行的函数作为首个参数传递给 worker isolate。本示例中使用的是现有的 _readAndParseJson() 方法。

Future<Map<String, dynamic>> _readAndParseJson() async {final fileData = await File(filename).readAsString();final jsonData = jsonDecode(fileData) as Map<String, dynamic>;return jsonData;
}

3、Isolate.run() 会获取 _readAndParseJson() 的返回结果,将该值传回主 isolate 并关闭 worker isolate。

4、工作 isolate 会将存储结果的内存直接转移给主 isolate,而非复制数据。转移前工作 isolate 会执行验证检查,确保对象允许被转移。

_readAndParseJson() 是一个现有的异步函数,本可直接在主 isolate 中运行。改用 Isolate.run() 执行可实现并发,工作 isolate 将完全隔离 _readAndParseJson() 的计算过程,其执行不会阻塞主 isolate。

Isolate.run() 的返回值始终是 Future,因为主 isolate 的代码会继续执行。无论工作 isolate 执行的是同步还是异步计算,都不会影响主 isolate,因为它们始终是并发运行的。

完整程序请参阅 send_and_receive.dart 示例文件。

使用isolate传递闭包

可以直接在主 isolate 中使用函数字面量或闭包通过 run() 创建简单的工作 isolate。

const String filename = 'with_keys.json';void main() async {// 读取数据final jsonData = await Isolate.run(() async {final fileData = await File(filename).readAsString();final jsonData = jsonDecode(fileData) as Map<String, dynamic>;return jsonData;});// 使用数据print('Number of JSON keys: ${jsonData.length}');
}

此示例实现的效果与前例相同:新建一个 isolate 执行计算后返回结果。

不过,这次 isolate 传递的是闭包。闭包相比普通命名函数限制更少,无论是功能还是代码编写方式。在本例中,Isolate.run() 并发执行的代码看起来像是本地代码。从这个角度看,可以将 run() 视为"并行运行"的控制流操作符。

使用端口在isolate之间传递多条消息

短期存活的 isolate 便于使用,但创建新 isolate 及跨 isolate 复制对象会产生性能开销。若需重复执行相同计算,相比多次使用 Isolate.run(),创建长期存活的 isolate 可提升性能。

为此,可以使用 Isolate.run() 所基于的底层 isolate API:

  • Isolate.spawn() 和 Isolate.exit()
  • ReceivePort 和 SendPort
  • SendPort.send() 方法

本节将详细介绍如何在新创建的 isolate 与主 isolate 之间建立双向通信。第一个示例 "基础端口(Basic ports)" 从高层级介绍流程,第二个示例 "稳健端口(Robust ports)" 则逐步为前者添加更多实际生产环境所需的功能。

ReceivePort和SendPort

在 isolate 之间建立长期通信需要两个类(除 Isolate 外):ReceivePort 和 SendPort。这些端口是 isolate 间通信的唯一方式。

ReceivePort 是处理其他 isolate 所发送消息的对象,这些消息通过 SendPort 发送。

提示

每个 SendPort 对象仅关联一个 ReceivePort,但单个 ReceivePort 可拥有多个 SendPort。创建 ReceivePort 时会自动生成其对应的 SendPort,也可额外创建指向同一 ReceivePort 的 SendPort 用于消息发送。

端口的行为与 Stream 对象类似(实际上 ReceivePort 实现了 Stream 接口)。可以将 SendPort 和 ReceivePort 分别类比为 Stream 的 StreamController 和监听器:SendPort 类似 StreamController,通过 SendPort.send() 方法 "添加" 消息,由 ReceivePort 作为监听器处理。ReceivePort 接收到消息后,会将其作为参数传递给开发者提供的回调函数处理。

配置端口

新创建的 isolate 仅拥有通过 Isolate.spawn() 调用接收到的信息。如果您需要主 isolate 在初始创建后继续与 Isolate.spawn() 新创建的 isolate 通信,必须建立一个通信通道,使 Isolate.spawn() 新创建的 isolate 能够向主 isolate 发送消息。isolate 只能通过消息传递进行通信。它们无法 "看到" 彼此的内存内部,这正是 "isolate"(隔离)名称的由来。

要建立这种双向通信,需先在主 isolate 中创建 ReceivePort,随后在通过 Isolate.spawn() 创建新 isolate 时将其 SendPort 作为参数传入。新 isolate 会创建自己的 ReceivePort,并通过主 isolate 传递的 SendPort 将自己的 SendPort 回传。主 isolate 接收该 SendPort 后,双方即建立起可收发消息的开放通道。

提示!

本节的示意图较为概略,主要用于展示 isolate 之间通过端口通信的概念。实际实现需要更多代码,您可在本页后续内容中找到具体实现。

1、在主 isolate 中创建一个 ReceivePort,SendPort 会作为 ReceivePort 的属性自动生成。
2、使用 Isolate.spawn() 生成工作 isolate
3、将 ReceivePort.sendPort 的引用作为第一条消息传递给工作 isolate
4、在工作 isolate 中再创建一个新的 ReceivePort
5、将工作 isolate 的 ReceivePort.sendPort 引用作为第一条消息传回主 isolate

除了创建端口和建立通信连接外,您还需要为每个端口配置消息接收时的处理逻辑。这可以通过在各个 ReceivePort 上调用 listen 方法来实现。

1、通过主 isolate 持有的工作 isolate 的 SendPort 发送消息。
2、在工作 isolate 的 ReceivePort 监听器上接收并处理消息,此处执行需要从主 isolate 移出的计算任务。
3、通过工作 isolate 持有的主 isolate 的 SendPort 发送返回消息。
4、在主 isolate 的 ReceivePort 监听器上接收返回消息。

基础端口(Basic ports)示例

这个示例演示了如何建立一个长期存活的工作 isolate,并实现其与主 isolate 之间的双向通信。代码示例展示了将 JSON 文本发送到新 isolate 进行解析和解码,然后再将结果返回主 isolate 的完整流程。

请注意

这个示例仅演示创建新 isolate 所需的核心功能,实现 isolate 间持续收发消息的能力。

该示例未包含生产环境所需的关键功能,如错误处理、端口关闭和消息序列化管理等。

下一节的 "稳健端口示例" 将涵盖这些功能,并讨论缺少这些功能可能引发的问题。

步骤 1:定义 worker 类

首先,为你的后台工作 isolate 创建一个类。该类需要包含以下功能:

  • 生成一个 isolate。

  • 向该 isolate 发送消息。

  • 使 isolate 能够解码JSON数据

  • 将解码后的 JSON 数据传回主 isolate

该类提供两个公共方法:一个用于生成工作 isolate,另一个用于向工作 isolate发送消息。

本示例的后续部分将逐步演示如何逐个实现这些类方法。

class Worker {Future<void> spawn() async {// TODO: 添加生成工作isolate的功能}void _handleResponsesFromIsolate(dynamic message) {// TODO: 处理来自工作isolate的返回消息}static void _startRemoteIsolate(SendPort port) {// TODO: 定义需在工作isolate上执行的代码}Future<void> parseJson(String message) async {// TODO: 定义一个可向工作isolate发送消息的公共方法}
}

步骤2:生成一个工作 isolate

Worker.spawn() 方法是用来封装创建工作 isolate 并确保其能收发消息的代码逻辑的:

  • 首先,创建一个 ReceivePort。这能让主 isolate 接收来自新创建的工作 isolate 的消息。
  • 接着,为接收端口(receive port)添加监听器,以处理工作 isolate 返回的消息。传给监听器的回调函数 _handleResponsesFromIsolate 将在步骤 4 讲解。
  • 最后,用 Isolate.spawn() 创建工作 isolate。它需要两个参数:一个在工作 isolate 上执行的函数(步骤 3 会讲到),以及接收端口(receive port)的 sendPort 属性。
Future<void> spawn() async {final receivePort = ReceivePort();receivePort.listen(_handleResponsesFromIsolate);await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

receivePort.sendPort 参数会在工作 isolate 上调用回调函数(_startRemoteIsolate)时作为参数传入。这是确保工作 isolate 能够向主 isolate 回传消息的第一步。

步骤3:在工作 isolate 上执行代码

在此步骤中,您需要定义 _startRemoteIsolate 方法,该方法会在工作 isolate 启动时被执行。这个方法相当于工作 isolate 的"主"方法。

  • 首先创建另一个新的 ReceivePort,用于接收来自主 isolate 的后续消息。
  • 接着将该端口的 SendPort 发送回主 isolate。
  • 最后为新的 ReceivePort 添加监听器,用于处理主 isolate 发送给工作 isolate 的消息。
static void _startRemoteIsolate(SendPort port) {final receivePort = ReceivePort();port.send(receivePort.sendPort);receivePort.listen((dynamic message) async {if (message is String) {final transformed = jsonDecode(message);port.send(transformed);}});
}

工作 isolate 中的 ReceivePort 监听器会解码从主 isolate 传递的 JSON 数据,然后将解码后的数据回传至主 isolate。

该监听器是主 isolate 向工作 isolate 发送消息的入口点,这也是您指定工作 isolate 后续执行代码的唯一机会

步骤4:在主 isolate 处理消息

最后,您需要让主 isolate 知道如何处理工作 isolate 回传的消息。为此,您需要完善 _handleResponsesFromIsolate 方法。回顾步骤 2 可知,该方法已作为参数传给了 receivePort.listen 方法:

Future<void> spawn() async {final receivePort = ReceivePort();receivePort.listen(_handleResponsesFromIsolate);await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

同时请回顾步骤 3 中您已将 SendPort 传回主 isolate。该方法既处理该 SendPort 的接收,也处理后续消息(即解码后的 JSON 数据)。

  • 首先检查消息是否为 SendPort。若是,则将该端口赋值给类的 _sendPort 属性,供后续发送消息使用。
  • 接着检查消息是否为 Map<String, dynamic> 类型(即预期的解码 JSON 类型)。若是,则按应用特定逻辑处理该消息(本示例中直接打印消息)。
void _handleResponsesFromIsolate(dynamic message) {if (message is SendPort) {_sendPort = message;_isolateReady.complete();} else if (message is Map<String, dynamic>) {print(message);}
}

步骤5:添加 completer 确保 isolate 初始化完成

为完善该类,需要定义一个名为 parseJson 的公共方法,该方法负责向工作 isolate 发送消息。同时需要确保在 isolate 完全初始化前不会发送消息。为此需使用 Completer。

  • 首先添加一个名为 _isolateReady 的 Completer 类属性。
  • 接着在 _handleResponsesFromIsolate 方法(步骤 4 创建)中,当收到 SendPort 时调用 completer 的 complete() 方法。
  • 最后在 parseJson 方法中,在调用 _sendPort.send 前添加 await _isolateReady.future,确保在工作 isolate 完成初始化并传回 SendPort 前不会发送任何消息。
Future<void> parseJson(String message) async {await _isolateReady.future;_sendPort.send(message);
}

完整示例

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';void main() async {final worker = Worker();await worker.spawn();await worker.parseJson('{"key":"value"}');
}class Worker {late SendPort _sendPort;final Completer<void> _isolateReady = Completer.sync();Future<void> spawn() async {final receivePort = ReceivePort();receivePort.listen(_handleResponsesFromIsolate);await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);}void _handleResponsesFromIsolate(dynamic message) {if (message is SendPort) {_sendPort = message;_isolateReady.complete();} else if (message is Map<String, dynamic>) {print(message);}}static void _startRemoteIsolate(SendPort port) {final receivePort = ReceivePort();port.send(receivePort.sendPort);receivePort.listen((dynamic message) async {if (message is String) {final transformed = jsonDecode(message);port.send(transformed);}});}Future<void> parseJson(String message) async {await _isolateReady.future;_sendPort.send(message);}}

稳健端口(Robust ports)示例

前例演示了建立长期存活的 isolate 并实现双向通信所需的基础构件。但如前所述,该示例缺少一些重要功能:错误处理机制、闲置端口关闭能力,以及在特定情况下可能出现的消息顺序不一致问题。

本示例基于首个示例进行扩展,创建了一个具备更多功能且遵循更佳设计模式的长期存活工作 isolate。虽然代码与首个示例有相似之处,但并非其直接扩展。

提示

本示例假设您已熟悉如何使用 Isolate.spawn() 和端口建立 isolate 间的通信,此内容已在前面的示例中介绍过。

步骤1:定义工作 isolate 类

首先创建一个后台工作 isolate 类,该类包含以下所有功能:

  • 生成 isolate。
  • 向该 isolate 发送消息。
  • 使 isolate 解码 JSON 数据。
  • 将解码后的 JSON 数据回传至主 isolate。

该类公开三个公共方法:用于创建工作 isolate 的 createWorker()、处理消息发送的 sendMessage(),以及关闭闲置端口的 dispose() 方法。

class Worker {final SendPort _commands;final ReceivePort _responses;Future<Object?> parseJson(String message) async {// TODO: 确保端口仍处于开启状态_commands.send(message);}static Future<Worker> spawn() async {// TODO: 添加功能以创建一个新的Worker对象,该对象与生成的isolate建立连接throw UnimplementedError();}Worker._(this._responses, this._commands) {// TODO: 初始化主isolate接收端口监听器}void _handleResponsesFromIsolate(dynamic message) {// TODO: 处理工作 isolate 返回的消息}static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {// TODO: 处理从工作 isolate 发回的消息}static void _startRemoteIsolate(SendPort sp) {// TODO: 初始化工作 isolate 的通信端口}
}

提示

本示例中,SendPort 和 ReceivePort 实例遵循最佳实践命名规范:根据主 isolate 的关系命名。从主 isolate 通过 SendPort 发送到工作 isolate 的消息称为命令,而返回主 isolate 的消息称为响应。

步骤2:在 Worker.spawn() 方法中创建 RawReceivePort

在生成 isolate 前,需先创建 RawReceivePort,这是 ReceivePort 的底层实现。采用 RawReceivePort 是推荐做法,因其能将 isolate 的启动逻辑与消息传递处理逻辑分离。

在 Worker.spawn 方法中:

  • 首先,创建 RawReceivePort。此 ReceivePort 仅负责接收来自工作 isolate 的初始消息(将是一个SendPort)。
  • 接着,创建 Completer,用于指示 isolate 何时准备好接收消息。完成后将返回包含 ReceivePort 和 SendPort 的记录。
  • 然后,定义 RawReceivePort.handler 属性。该属性是一个 Function?,其行为类似于 ReceivePort.listener。当此端口接收到消息时会调用该函数。
  • 在 handler 函数内部,调用 connection.complete()。此方法需要传入包含 ReceivePort 和 SendPort 的记录作为参数。SendPort 是工作 isolate 发送的初始消息,下一步将把它赋值给名为 _commands 的类级别 SendPort。
  • 最后,使用 ReceivePort.fromRawReceivePort 构造函数创建新的 ReceivePort,并传入 initPort。
class Worker {final SendPort _commands;final ReceivePort _responses;static Future<Worker> spawn() async {// 创建接收端口并设置其初始消息处理器(handler)final initPort = RawReceivePort();final connection = Completer<(ReceivePort, SendPort)>.sync();initPort.handler = (initialMessage) {final commandPort = initialMessage as SendPort;connection.complete((ReceivePort.fromRawReceivePort(initPort),commandPort,));};}
}

通过先创建 RawReceivePort,再创建 ReceivePort,之后就能向 ReceivePort.listen 添加新的回调。相反,如果直接创建 ReceivePort,由于 ReceivePort 实现的是 Stream 而非 BroadcastStream,就只能添加一个 listener。

这种做法有效地将 isolate 启动逻辑与通信建立后的消息接收处理逻辑分离开来。随着其他方法中的逻辑增加,这一优势将更加明显。

步骤3:使用 Isolate.spawn 创建工作 isolate

本步骤继续完善 Worker.spawn 方法。您将添加创建 isolate 所需的代码,并返回该类的 Worker 实例。在此示例中,Isolate.spawn 调用被包裹在 try/catch 块中,确保当 isolate 启动失败时,initPort 会被关闭且 Worker 对象不会被创建。

  • 首先,在 try/catch 块中尝试创建工作 isolate。若失败则关闭上一步创建的接收端口。传递给 Isolate.spawn 的方法将在后续步骤说明。
  • 接着,await connection.future,并从返回的记录中解构出发送端口和接收端口。
  • 最后,通过调用 Worker 的私有构造函数并传入这些端口,返回 Worker 实例。
class Worker {final SendPort _commands;final ReceivePort _responses;static Future<Worker> spawn() async {// 创建接收端口并设置其初始消息处理器(handler)final initPort = RawReceivePort();final connection = Completer<(ReceivePort, SendPort)>.sync();initPort.handler = (initialMessage) {final commandPort = initialMessage as SendPort;connection.complete((ReceivePort.fromRawReceivePort(initPort),commandPort,));};// 生成isolate.try {await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));} on Object {initPort.close();rethrow;}final (ReceivePort receivePort, SendPort sendPort) =await connection.future;return Worker._(receivePort, sendPort);}
}

注意在此示例中(与前例相比),Worker.spawn 作为该类的异步静态构造函数,是创建 Worker 实例的唯一方式。这种设计简化了 API,使创建 Worker 实例的代码更加清晰。

步骤4:完成 isolate 设置流程

在此步骤中,您将完成基础的 isolate 设置流程。这与之前的示例几乎完全对应,没有新增概念。唯一的细微差别在于代码被拆分成了更多方法,这种设计实践为后续示例中逐步添加更多功能奠定了基础。如需了解设置 isolate 的基础流程详解,请参阅 “基础端口” 示例。

首先,创建一个由 Worker.spawn 方法返回的私有构造函数。在构造函数体内,为主 isolate 使用的接收端口添加监听器,并将一个尚未定义的方法 _handleResponsesFromIsolate 传递给该监听器。

class Worker {final SendPort _commands;final ReceivePort _responses;Worker._(this._responses, this._commands) {_responses.listen(_handleResponsesFromIsolate);}
}

接下来,在 _startRemoteIsolate 方法中添加负责初始化工作 isolate 端口的代码。请注意,该方法此前已通过 Worker.spawn 方法传递给 Isolate.spawn,并将接收主 isolate 的 SendPort 作为参数。

  • 新建一个 ReceivePort。
  • 将该端口的 SendPort 发回主 isolate。
  • 调用新方法 _handleCommandsToIsolate,并将新创建的 ReceivePort 和主 isolate 的 SendPort 作为参数传入。
static void _startRemoteIsolate(SendPort sendPort) {final receivePort = ReceivePort();sendPort.send(receivePort.sendPort);_handleCommandsToIsolate(receivePort, sendPort);
}

接下来添加 _handleCommandsToIsolate 方法,该方法负责接收来自主 isolate 的消息,在工作 isolate 上解码 json,并将解码后的 json 作为响应发送回去。

  • 首先,在工作 isolate 的 ReceivePort 上声明一个监听器。
  • 在添加到监听器的回调中,尝试在 try/catch 块中解码从主 isolate 传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主 isolate。
  • 如果出现错误,则返回 RemoteError。
static void _handleCommandsToIsolate(ReceivePort receivePort,SendPort sendPort,
) {receivePort.listen((message) {try {final jsonData = jsonDecode(message as String);sendPort.send(jsonData);} catch (e) {sendPort.send(RemoteError(e.toString(), ''));}});
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否是 RemoteError,如果是则抛出该错误。
  • 否则,打印该消息。在后续步骤中,将更新此代码以返回消息而非打印。
void _handleResponsesFromIsolate(dynamic message) {if (message is RemoteError) {throw message;} else {print(message);}
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码发送 JSON 到工作 isolate 进行解码。

Future<Object?> parseJson(String message) async {_commands.send(message);
}

你将在下一步更新此方法。

步骤5:同时处理多条消息

目前,如果快速向工作 isolate 发送多条消息,isolate 会按照完成顺序而非发送顺序返回解码后的 JSON 响应,导致无法确定响应与原始消息的对应关系。

本步骤将解决该问题,具体方式是为每条消息添加唯一 ID,并使用 Completer 对象来确保外部代码调用 parseJson 时能正确获取对应的响应结果。

首先,在Worker类中添加两个类级别的属性:

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
class Worker {final SendPort _commands;final ReceivePort _responses;final Map<int, Completer<Object?>> _activeRequests = {};int _idCounter = 0;// ···
}

_activeRequests 映射表用于关联发送到工作 isolate 的消息与对应的 Completer。其键值来自 _idCounter,该计数器会随消息发送递增。

接下来,更新 parseJson 方法,在向工作 isolate 发送消息前创建 completer。

  • 首先,创建 Completer 对象
  • 接下来,递增 _idCounter 确保每个 Completer 有唯一编号。
  • 向 _activeRequests 映射表添加条目,键为当前 _idCounter 值,值为 completer。
  • 将消息和 ID 一起发送到工作 isolate。由于 SendPort 每次只能传递一个值,需要将 ID 和消息包装成记录形式发送。
  • 最后,返回 completer 的 future,该 future 最终将包含来自工作 isolate 的响应结果。
Future<Object?> parseJson(String message) async {final completer = Completer<Object?>.sync();final id = _idCounter++;_activeRequests[id] = completer;_commands.send((id, message));return await completer.future;
}

同时需要更新 _handleResponsesFromIsolate 和 _handleCommandsToIsolate 方法以适配该机制。

在 _handleCommandsToIsolate 中,需要处理 message 作为包含两个值的记录(而非纯JSON文本)的情况,通过解构方式从 message 提取这些值。

然后,在解码 JSON 后,更新 sendPort.send 的调用,使用记录结构将 id 和解码后的 JSON 一起传回主 isolate。

static void _handleCommandsToIsolate(ReceivePort receivePort,SendPort sendPort,
) {receivePort.listen((message) {final (int id, String jsonText) = message as (int, String); // Newtry {final jsonData = jsonDecode(jsonText);sendPort.send((id, jsonData)); // Updated} catch (e) {sendPort.send((id, RemoteError(e.toString(), '')));}});
}

最后,更新 _handleResponsesFromIsolate 方法。

  • 首先,从 message 参数中解构出 id 和 response。
  • 然后,从 _activeRequests 映射中移除该请求对应的 completer。
  • 最后,不再抛出错误或打印解码结果,而是通过 completer.complete(response) 返回结果,最终该响应会返回到主 isolate 调用 parseJson 的代码处。
void _handleResponsesFromIsolate(dynamic message) {final (int id, Object? response) = message as (int, Object?); // Newfinal completer = _activeRequests.remove(id)!; // Newif (response is RemoteError) {completer.completeError(response); // Updated} else {completer.complete(response); // Updated}
}

步骤6:添加关闭端口的功能

当isolate不再使用时,应关闭主isolate和工作isolate的端口。

  • 首先,添加一个类级别的布尔变量来跟踪端口是否已关闭。
  • 然后,添加Worker.close方法,在该方法中:
    • 将 _closed 更新为 true。
    • 向工作 isolate 发送最后一条消息。该消息是一个内容为 "shutdown" 的 String,但也可以是您想要的任何对象。您将在下一个代码片段中使用它。
  • 最后,检查 _activeRequests 是否为空。如果是,则关闭主 isolate 中名为 _responses 的 ReceivePort。
class Worker {bool _closed = false;// ···void close() {if (!_closed) {_closed = true;_commands.send('shutdown');if (_activeRequests.isEmpty) _responses.close();print('--- port closed --- ');}}
}
  • 接下来,需要在工作 isolate 中处理 "shutdown" 消息。将以下代码添加到 _handleCommandsToIsolate 方法中。检查消息是否为 "shutdown" 的 String,如果是,则关闭工作 isolate 的 ReceivePort 并返回。
static void _handleCommandsToIsolate(ReceivePort receivePort,SendPort sendPort,
) {receivePort.listen((message) {// New if-block.if (message == 'shutdown') {receivePort.close();return;}final (int id, String jsonText) = message as (int, String);try {final jsonData = jsonDecode(jsonText);sendPort.send((id, jsonData));} catch (e) {sendPort.send((id, RemoteError(e.toString(), '')));}});
}
  • 最后,应在发送消息前添加检查端口是否关闭的代码。在 Worker.parseJson 方法中添加一行检查。
Future<Object?> parseJson(String message) async {if (_closed) throw StateError('Closed'); // Newfinal completer = Completer<Object?>.sync();final id = _idCounter++;_activeRequests[id] = completer;_commands.send((id, message));return await completer.future;
}

完整示例

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';void main() async {final worker = await Worker.spawn();print(await worker.parseJson('{"key":"value"}'));print(await worker.parseJson('"banana"'));print(await worker.parseJson('[true, false, null, 1, "string"]'));print(await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]),);worker.close();
}class Worker {final SendPort _commands;final ReceivePort _responses;final Map<int, Completer<Object?>> _activeRequests = {};int _idCounter = 0;bool _closed = false;Future<Object?> parseJson(String message) async {if (_closed) throw StateError('Closed');final completer = Completer<Object?>.sync();final id = _idCounter++;_activeRequests[id] = completer;_commands.send((id, message));return await completer.future;}static Future<Worker> spawn() async {// Create a receive port and add its initial message handlerfinal initPort = RawReceivePort();final connection = Completer<(ReceivePort, SendPort)>.sync();initPort.handler = (initialMessage) {final commandPort = initialMessage as SendPort;connection.complete((ReceivePort.fromRawReceivePort(initPort),commandPort,));};// Spawn the isolate.try {await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));} on Object {initPort.close();rethrow;}final (ReceivePort receivePort, SendPort sendPort) =await connection.future;return Worker._(receivePort, sendPort);}Worker._(this._responses, this._commands) {_responses.listen(_handleResponsesFromIsolate);}void _handleResponsesFromIsolate(dynamic message) {final (int id, Object? response) = message as (int, Object?);final completer = _activeRequests.remove(id)!;if (response is RemoteError) {completer.completeError(response);} else {completer.complete(response);}if (_closed && _activeRequests.isEmpty) _responses.close();}static void _handleCommandsToIsolate(ReceivePort receivePort,SendPort sendPort,) {receivePort.listen((message) {if (message == 'shutdown') {receivePort.close();return;}final (int id, String jsonText) = message as (int, String);try {final jsonData = jsonDecode(jsonText);sendPort.send((id, jsonData));} catch (e) {sendPort.send((id, RemoteError(e.toString(), '')));}});}static void _startRemoteIsolate(SendPort sendPort) {final receivePort = ReceivePort();sendPort.send(receivePort.sendPort);_handleCommandsToIsolate(receivePort, sendPort);}void close() {if (!_closed) {_closed = true;_commands.send('shutdown');if (_activeRequests.isEmpty) _responses.close();print('--- port closed --- ');}}
}

相关文章:

  • 《C++ 模板:泛型编程的核心》
  • 【无人机】使用扩展卡尔曼滤波 (EKF) 算法来处理传感器测量,各传感器的参数设置,高度数据融合、不同传感器融合模式
  • 第十五届蓝桥杯 2024 C/C++组 下一次相遇
  • 逻辑回归:损失和正则化技术的深入研究
  • Git分支管理方案
  • 【Git】Git Revert 命令详解
  • 【springsecurity oauth2授权中心】jwt令牌更换成自省令牌 OpaqueToken P4
  • 前端频繁调用后端接口问题思考
  • 转型探讨:未来投资与布局
  • qt.tlsbackend.ossl: Failed to load libssl/libcrypto.
  • 【springsecurity oauth2授权中心】将硬编码的参数提出来放到 application.yml 里 P3
  • OpenCV --- 图像预处理(六)
  • 25、简述.NET程序集(Assembly)
  • JavaFX实战:从零打造一个功能丰富的“猜数字”游戏
  • ASP.Net Web Api如何更改URL
  • 解码思维链:AI思维链如何重塑人类与机器的对话逻辑
  • “思考更长时间”而非“模型更大”是提升模型在复杂软件工程任务中表现的有效途径 | 学术研究系列
  • 简化K8S部署流程:通过Apisix实现蓝绿发布策略详解(上)
  • 15.第二阶段x64游戏实战-分析怪物血量(遍历周围)
  • 多表查询之嵌套查询
  • 两部门发布“五一”假期全国森林草原火险形势预测
  • 经济日报社论:书写新征程上奋斗华章
  • 中央网信办部署开展“清朗·整治AI技术滥用”专项行动
  • 解放日报:上海深化改革开放,系统集成创新局
  • 量子传感新技术“攻克”退相干难题
  • 王毅:携手做世界和平与发展事业的中流砥柱