Reactor boundedElastic
boundedElastic
调度器专门用于处理无法避免的阻塞代码,而single
和parallel
调度器则不支持阻塞操作。因此,如果在single
或parallel
调度器上使用 Reactor 的阻塞 API(如block()
、blockFirst()
、blockLast()
)或通过toIterable()
、toStream()
进行迭代,会抛出IllegalStateException
异常。
自定义调度器也可以通过实现NonBlocking
标记接口来标记为“仅适用于非阻塞操作”。
1. boundedElastic
与 single
/parallel
的区别
-
boundedElastic
:- 是一个有界弹性线程池,适合处理阻塞 I/O 操作。
- 它允许任务排队并在线程可用时重新调度,避免过多线程消耗资源。
- 适用于遗留的阻塞代码,但不是必须的。
-
single
和parallel
:single
:提供一个可重用的线程,适合低延迟任务。parallel
:提供一个固定大小的线程池(通常等于 CPU 核心数),适合 CPU 密集型任务。- 但它们不支持阻塞操作,因为阻塞操作会阻塞线程,影响其他任务的执行。
结论:
boundedElastic
是唯一支持阻塞操作的调度器,而single
和parallel
不支持。
2. 为什么在 single
/parallel
上使用阻塞 API 会抛出异常?
- 阻塞操作会阻塞线程,而
single
和parallel
调度器是为非阻塞操作设计的。 - 如果在这些调度器上执行阻塞操作,会导致线程被阻塞,其他任务无法执行。
- 例如,使用
block()
、blockFirst()
、blockLast()
或toIterable()
、toStream()
进行迭代,会强制阻塞线程,违反了响应式流的非阻塞原则。 - Reactor 会检测到这种行为,并抛出
IllegalStateException
。
示例:
Mono.just("Hello").subscribeOn(Schedulers.single()) // 报错.block(); // 抛出 IllegalStateException
3. 如何标记自定义调度器为“仅适用于非阻塞操作”?
NonBlocking
标记接口:Reactor 提供了一个NonBlocking
标记接口,用于标识哪些线程可以安全地执行非阻塞操作。- 自定义线程:你可以创建一个线程,并实现
NonBlocking
接口,这样该线程就被标记为“仅适用于非阻塞操作”。 - 作用:确保在这些线程上不会执行阻塞操作,从而避免
IllegalStateException
。
示例:
Thread thread = new Thread(() -> {// 非阻塞操作
});
thread.setDaemon(true);
thread.start();
4. 总结
调度器 | 是否支持阻塞操作 | 用途 | 是否抛出异常(在阻塞操作下) |
---|---|---|---|
boundedElastic | ✅ | 阻塞 I/O 操作 | ❌(不抛出异常) |
single | ❌ | 低延迟任务 | ✅(抛出异常) |
parallel | ❌ | CPU 密集型任务 | ✅(抛出异常) |
关键点:
boundedElastic
是唯一支持阻塞操作的调度器。single
和parallel
不支持阻塞操作,否则会抛出异常。- 自定义调度器可以通过实现
NonBlocking
接口来标记为“仅适用于非阻塞操作”。
5. 相关引用
Schedulers.boundedElastic()
是为阻塞 I/O 操作设计的,而single
和parallel
不是 。- 在
single
和parallel
调度器上使用block()
、blockFirst()
、blockLast()
会抛出IllegalStateException
。 - 自定义调度器可以通过实现
NonBlocking
标记接口来标记为“仅适用于非阻塞操作” 。