响应式编程框架Reactor【7】
文章目录
- 十一、create方法
- 11.1 create方法概述
- 11.2 基本用法示例
- 11.2.1 简单的create示例
- 11.2.2 背压处理示例
- 11.3 高级用法与模式
- 11.3.1 事件监听器模式
- 11.3.2 多线程发射控制
- 11.3.3 资源管理与清理
- 11.4 create工作原理
- 11.4.1 create 执行流程
- 11.4.2 背压处理机制
- 11.5 应用场景与最佳实践
- 11.5.1 适用场景
- 11.5.2 最佳实践
- 11.5.3 完整示例:WebSocket 客户端
- 11.6 总结
- 十二、generate 方法
- 12.1 generate方法概述
- 12.2 generate与create的区别
- 12.3 基本使用示例
- 12.3.1 无状态generate
- 12.3.2 有状态的generate
- 12.3.3 带状态清理的generate
- 12.4 高级用法与模式
- 12.4.1 错误处理与重试机制
- 12.4.2 背压感知生成
- 12.4.3 复杂状态管理
- 12.5 generate工作原理
- 12.5.1 generate执行流程
- 12.5.2 generate 状态管理
- 12.6 应用场景与最佳实践
- 12.6.1 适用场景
- 12.6.2 最佳实践
- 12.6.3 文件读取器
- 12.7 总结
十一、create方法
Reactor 的 create
方法是一个高级的、灵活的 Flux 创建方法,它提供了对数据发射的完全控制能力。与 generate
方法不同,create
方法是异步友好的,并且可以处理多线程场景。
11.1 create方法概述
create
方法的主要特点:
- 异步友好:可以在多个线程中安全地发射数据
- 背压感知:可以响应消费者的背压请求
- 灵活控制:提供完整的 FluxSink API 来控制数据流
- 多线程安全:支持从多个线程并发发射数据
create 与 generate 的区别
11.2 基本用法示例
11.2.1 简单的create示例
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class BasicCreateExample {public static void main(String[] args) throws InterruptedException {// 基本create用法Flux<String> flux = Flux.create(sink -> {// 同步发射一些数据sink.next("Hello");sink.next("World");// 可以发射完成信号sink.complete();// 或者发射错误信号// sink.error(new RuntimeException("Something went wrong"));});flux.subscribe(value -> System.out.println("Received: " + value),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("Completed"));// 异步create示例Flux<Integer> asyncFlux = Flux.create(sink -> {// 在另一个线程中发射数据Executors.newSingleThreadExecutor().submit(() -> {for (int i = 1; i <= 5; i++) {try {TimeUnit.MILLISECONDS.sleep(100);sink.next(i);} catch (InterruptedException e) {sink.error(e);return;}}sink.complete();});});asyncFlux.subscribe(value -> System.out.println("Async value: " + value),error -> System.err.println("Async error: " + error),() -> System.out.println("Async completed"));// 等待异步操作完成TimeUnit.SECONDS.sleep(1);}
}
11.2.2 背压处理示例
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.atomic.AtomicLong;public class BackpressureCreateExample {public static void main(String[] args) throws InterruptedException {// 创建支持背压的FluxFlux<Long> flux = Flux.create(sink -> {AtomicLong counter = new AtomicLong();// 注册背压请求处理器sink.onRequest(n -> {System.out.println("Requested: " + n + " elements");// 发射请求数量的元素for (long i = 0; i < n; i++) {long value = counter.getAndIncrement();if (value < 20) { // 限制总数sink.next(value);} else {sink.complete();break;}}});// 注册取消处理器sink.onCancel(() -> {System.out.println("Subscription cancelled");});// 注册处理完成处理器sink.onDispose(() -> {System.out.println("Flux disposed");});});// 使用不同的背压策略订阅flux.limitRate(5) // 每批请求5个元素.subscribe(value -> {System.out.println("Received: " + value);try {Thread.sleep(50); // 模拟处理延迟} catch (InterruptedException e) {Thread.currentThread().interrupt();}},error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));Thread.sleep(2000);}
}
11.3 高级用法与模式
11.3.1 事件监听器模式
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;public class EventListenerExample {// 事件监听器接口interface EventListener {void onEvent(String event);void onError(Throwable error);void onComplete();}// 事件源类static class EventSource {private final List<EventListener> listeners = new CopyOnWriteArrayList<>();public void addListener(EventListener listener) {listeners.add(listener);}public void removeListener(EventListener listener) {listeners.remove(listener);}public void fireEvent(String event) {for (EventListener listener : listeners) {listener.onEvent(event);}}public void fireError(Throwable error) {for (EventListener listener : listeners) {listener.onError(error);}}public void fireComplete() {for (EventListener listener : listeners) {listener.onComplete();}}}public static void main(String[] args) throws InterruptedException {EventSource eventSource = new EventSource();// 使用create桥接传统事件监听器到响应式流Flux<String> eventFlux = Flux.create(sink -> {EventListener listener = new EventListener() {@Overridepublic void onEvent(String event) {sink.next(event);}@Overridepublic void onError(Throwable error) {sink.error(error);}@Overridepublic void onComplete() {sink.complete();}};// 注册监听器eventSource.addListener(listener);// 当Flux被取消或完成时,移除监听器sink.onDispose(() -> {eventSource.removeListener(listener);System.out.println("Listener removed");});});// 订阅事件流eventFlux.subscribe(event -> System.out.println("Event: " + event),error -> System.err.println("Error: " + error),() -> System.out.println("Event stream completed"));// 模拟事件产生for (int i = 1; i <= 5; i++) {Thread.sleep(100);eventSource.fireEvent("Event " + i);}// 完成事件流eventSource.fireComplete();Thread.sleep(100);}
}
11.3.2 多线程发射控制
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class MultiThreadCreateExample {public static void main(String[] args) throws InterruptedException {// 创建支持多线程发射的FluxFlux<Integer> flux = Flux.create(sink -> {AtomicInteger counter = new AtomicInteger(1);int threadCount = 3;// 创建多个生产者线程for (int i = 0; i < threadCount; i++) {final int threadId = i;Executors.newSingleThreadExecutor().submit(() -> {try {while (!sink.isCancelled() && counter.get() <= 20) {int value = counter.getAndIncrement();if (value > 20) break;sink.next(threadId * 100 + value);System.out.println("Thread " + threadId + " emitted: " + value);TimeUnit.MILLISECONDS.sleep(50);}// 检查是否所有线程都完成了if (counter.get() > 20) {sink.complete();}} catch (InterruptedException e) {sink.error(e);}});}// 处理背压sink.onRequest(n -> {System.out.println("Backpressure request: " + n);});// 处理取消sink.onCancel(() -> {System.out.println("Flux cancelled");});});// 订阅并限制消费速率flux.limitRate(2) // 每批请求2个元素.subscribe(value -> {System.out.println("Consumed: " + value);try {TimeUnit.MILLISECONDS.sleep(100); // 慢速消费者} catch (InterruptedException e) {Thread.currentThread().interrupt();}},error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));Thread.sleep(5000);}
}
11.3.3 资源管理与清理
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class ResourceManagementExample {// 模拟需要管理的资源static class DatabaseConnection implements Closeable {private boolean connected = true;public String query(int id) {if (!connected) {throw new IllegalStateException("Connection closed");}return "Result for id " + id;}@Overridepublic void close() throws IOException {connected = false;System.out.println("Database connection closed");}}public static void main(String[] args) throws InterruptedException {Flux<String> databaseFlux = Flux.create(sink -> {DatabaseConnection connection = new DatabaseConnection();// 安排定期查询var scheduler = Executors.newSingleThreadScheduledExecutor();var future = scheduler.scheduleAtFixedRate(() -> {try {if (sink.isCancelled()) {scheduler.shutdown();return;}// 模拟查询String result = connection.query((int) (Math.random() * 10));sink.next(result);} catch (Exception e) {sink.error(e);scheduler.shutdown();}}, 0, 100, TimeUnit.MILLISECONDS);// 注册清理回调sink.onDispose(() -> {try {future.cancel(true);scheduler.shutdown();connection.close();System.out.println("Resources cleaned up");} catch (IOException e) {System.err.println("Error closing connection: " + e.getMessage());}});});// 订阅并只取前5个结果databaseFlux.take(5).subscribe(result -> System.out.println("Received: " + result),error -> System.err.println("Error: " + error),() -> System.out.println("Query completed"));Thread.sleep(1000);}
}
11.4 create工作原理
11.4.1 create 执行流程
11.4.2 背压处理机制
11.5 应用场景与最佳实践
11.5.1 适用场景
- 事件驱动架构:将传统事件监听器转换为响应式流
- 异步API桥接:包装回调式或Future-based的API
- 多数据源聚合:从多个来源聚合数据到一个流中
- 自定义背压策略:实现特定的背压处理逻辑
- 资源密集型操作:需要精细控制资源生命周期的场景
11.5.2 最佳实践
资源管理
Flux.create(sink -> {Resource resource = new Resource();// 使用资源sink.next(resource.getData());// 注册清理回调sink.onDispose(() -> {try {resource.close();} catch (Exception e) {// 处理清理异常}});
});
背压处理
Flux.create(sink -> {sink.onRequest(n -> {// 按请求数量发射数据for (int i = 0; i < n; i++) {if (hasMoreData()) {sink.next(getNextData());} else {sink.complete();break;}}});
});
错误处理
Flux.create(sink -> {try {// 可能抛出异常的操作processData(sink);} catch (Exception e) {// 发射错误而不是抛出异常sink.error(e);}
});
多线程安全
Flux.create(sink -> {// 使用线程安全的数据结构AtomicInteger counter = new AtomicInteger();// 在多线程中安全地发射数据executor.submit(() -> {while (!sink.isCancelled()) {int value = counter.incrementAndGet();sink.next(value);}});
});
取消感知
Flux.create(sink -> {Runnable task = () -> {while (!sink.isCancelled()) {// 检查是否已取消Data data = getData();if (data != null) {sink.next(data);} else {sink.complete();break;}}};Thread thread = new Thread(task);thread.start();// 取消时中断线程sink.onDispose(() -> {thread.interrupt();});
});
11.5.3 完整示例:WebSocket 客户端
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;public class WebSocketExample {@ClientEndpointpublic static class WebSocketClient {private Session session;private FluxSink<String> sink;private CountDownLatch latch = new CountDownLatch(1);public WebSocketClient(FluxSink<String> sink) {this.sink = sink;}@OnOpenpublic void onOpen(Session session) {this.session = session;latch.countDown();sink.next("Connected to WebSocket");}@OnMessagepublic void onMessage(String message) {sink.next("Received: " + message);}@OnErrorpublic void onError(Throwable error) {sink.error(error);}@OnClosepublic void onClose(CloseReason reason) {sink.next("Connection closed: " + reason.getReasonPhrase());sink.complete();}public void sendMessage(String message) throws IOException {try {latch.await(); // 等待连接建立session.getBasicRemote().sendText(message);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IOException("Interrupted while waiting for connection", e);}}public void close() throws IOException {if (session != null) {session.close();}}}public static Flux<String> createWebSocketFlux(String url) {return Flux.create(sink -> {try {WebSocketClient client = new WebSocketClient(sink);WebSocketContainer container = ContainerProvider.getWebSocketContainer();container.connectToServer(client, URI.create(url));// 注册清理回调sink.onDispose(() -> {try {client.close();} catch (IOException e) {System.err.println("Error closing WebSocket: " + e.getMessage());}});// 允许从外部发送消息(通过context或side channel)sink.onRequest(n -> {// 可以在这里发送初始消息等if (n > 0) {try {client.sendMessage("Hello Server!");} catch (IOException e) {sink.error(e);}}});} catch (Exception e) {sink.error(e);}});}public static void main(String[] args) throws InterruptedException {// 注意:这需要实际的WebSocket服务器才能运行/*Flux<String> webSocketFlux = createWebSocketFlux("ws://localhost:8080/ws");webSocketFlux.take(10) // 只取前10条消息.subscribe(message -> System.out.println("WebSocket: " + message),error -> System.err.println("WebSocket error: " + error),() -> System.out.println("WebSocket completed"));Thread.sleep(10000);*/}
}
11.6 总结
Reactor的create
方法是一个强大而灵活的工具,适用于创建复杂的、异步的数据流。通过本文的详细讲解和丰富示例,我们可以看到:
- 核心特性:异步友好、背压感知、多线程安全
- 基本用法:从简单创建到复杂的异步数据流
- 高级模式:事件监听器桥接、多线程发射、资源管理
- 背压处理:通过
onRequest
回调实现消费者驱动的数据流 - 应用场景:WebSocket、事件系统、异步API桥接等
create
方法特别适合需要从多个线程发射数据或需要与现有异步API集成的场景。它提供了比generate
更强大的控制能力,但也需要开发者承担更多的责任,特别是资源管理和背压处理方面。
通过遵循最佳实践,特别是资源清理、错误处理和背压管理,可以创建出高效、健壮的响应式数据流。create
方法是Reactor高级用法的体现,掌握了它就能够解决复杂的响应式编程挑战
十二、generate 方法
Reactor 的 generate
方法是一个强大的工具,用于创建复杂的、有状态的序列生成器。与 create
方法不同,generate
是同步的、逐个元素生成的,并且可以维护状态。
12.1 generate方法概述
generate
方法有三种重载形式,适用于不同的场景:
- 无状态生成:generate(Consumer<SynchronousSink<T>> generator)
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {Objects.requireNonNull(generator, "generator");return onAssembly(new FluxGenerate<>(generator));
}
- 有状态生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {return onAssembly(new FluxGenerate<>(stateSupplier, generator));}
- 带状态清理的生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
}
12.2 generate与create的区别
12.3 基本使用示例
12.3.1 无状态generate
// 参数说明
// generator – SynchronousSink 使用 Reactor 提供的每个订阅者,在每次传递中生成 单个 信号
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {Objects.requireNonNull(generator, "generator");return onAssembly(new FluxGenerate<>(generator));
}
使用示例
package cn.tcmeta.generate;import reactor.core.publisher.Flux;/*** @author: laoren* @date: 2025/8/27 14:10* @description: 无状态 generate* @version: 1.0.0*/
public class StatelessGenerateExample {public static void main(String[] args) {// 1. 无状态generate - 生成5个随机数Flux<Double> randomNumber = Flux.generate(sink -> {double n = Math.random();sink.next(n);// 模拟条件完成if (n > 0.8) {sink.complete();}});// 2. 订阅randomNumber.subscribe(System.out::println,Throwable::printStackTrace,() -> System.out.println("Done"));// 生成固定的元素Flux<String> fixedCountStr = Flux.generate(sink -> {// 这种方式无法控制数量,因为无法维护状态// 需要使用有状态generatesink.next("Hello 🍋🍋");// 这里会无限生成1,因为没有终止条件});// 只取三个元素即可fixedCountStr.take(3).subscribe(System.out::println);}
}
0.23585869887623168
0.07111979487714148
0.16405884193127096
0.4672955468599558
0.1245039338107009
0.3365117658051757
0.41334625983368756
0.7453378381025055
0.9330594906646099
Done
Hello 🍋🍋
Hello 🍋🍋
Hello 🍋🍋
12.3.2 有状态的generate
// 参数说明:
// 1. stateSupplier – 调用每个传入用户为生成器双函数提供初始状态
// 2. generator– 使用 SynchronousSink Reactor 提供的每个订阅者以及当前状态,以在每次传递时生成 单个 信号并返回(新)状态。
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
package cn.tcmeta.generate;import reactor.core.publisher.Flux;/*** @author: laoren* @date: 2025/8/27 14:26* @description: 有状态 generate* @version: 1.0.0*/
public class StatefulGenerateExample {public static void main(String[] args) {// 示例1: 生成斐波那契数列Flux<Long> fibonacci = Flux.generate(() -> new Long[]{0L, 1L}, // 初始状态[prev, current](state, sink) -> {long nextValue = state[0];sink.next(nextValue);// 更新状态[current, prev + current]state[0] = state[1];state[1] = nextValue + state[1];return state;});// 取前10个斐波那契数fibonacci.take(10).subscribe(value -> System.out.println("Fibonacci: " + value));}
}
有限序列
package cn.tcmeta.generate;import reactor.core.publisher.Flux;/*** @author: laoren* @description: 生成有限序列* @version: 1.0.0*/
public class StatefulGenerateExample2 {public static void main(String[] args) {var limitedSequence = Flux.generate(() -> 1, // 初始状态: 计数器1, 初始值可以是任意类型的值(state, sink) -> {if (state <= 5) {sink.next(state);return state + 1;} else {sink.complete();return state;}});limitedSequence.subscribe(System.out::println,error -> System.out.println("Error: " + error),() -> System.out.println("Sequence completed"));}
}
使用对象作为状态
package cn.tcmeta.generate;import reactor.core.publisher.Flux;import java.util.concurrent.atomic.AtomicInteger;/*** @author: laoren* @date: 2025/8/27 15:05* @description: TODO* @version: 1.0.0*/
public class StatefulGenerateExample3 {public static void main(String[] args) {Flux<Object> objectStateExample = Flux.generate(() -> new AtomicInteger(0), // 初始状态(state, sink) -> {int value = state.getAndIncrement();if (value < 3) {sink.next(value);return state;} else {sink.complete();return state;}});// 订阅一下objectStateExample.subscribe(System.out::println);}
}
12.3.3 带状态清理的generate
package cn.tcmeta.generate;import reactor.core.publisher.Flux;import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.CountDownLatch;/*** @author: laoren* @date: 2025/8/27 15:10* @description: 带状态清理的generate* @version: 1.0.0*/
public class GenerateWithCleanupExample {public static void main(String[] args) {String content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5";Flux<String> lines = Flux.generate(// 模拟文件内容// 状态供应器 - 创建BufferedReader() -> new BufferedReader(new StringReader(content)),// 生成器函数 - 读取每一行(reader, sink) -> {try {String line = reader.readLine();if (line == null) {sink.complete(); // 文件读取完成} else {sink.next(line);}} catch (Exception e) {sink.error(e);}return reader;},reader -> {try {reader.close();System.out.println("Reader closed");} catch (Exception e) {System.out.println("Error closing reader");}});// 状态清理器 - 关闭BufferedReaderlines.subscribe(line -> System.out.println("Read: " + line),error -> System.err.println("Error: " + error),() -> System.out.println("File reading completed"));}
}
Read: Line 1
Read: Line 2
Read: Line 3
Read: Line 4
Read: Line 5
File reading completed
Reader closed
数据连接清理
package cn.tcmeta.generate;import reactor.core.publisher.Flux;/*** @author: laoren* @description: 数据库连接清理操作* @version: 1.0.0*/
public class GenerateWithCleanupExample2 {public static void main(String[] args) {// 另一个示例: 数据库连接清理Flux<String> dbData = Flux.generate(() -> {System.out.println("Creating database connection");return new DatabaseConnection(3); // 模拟数据库连接},(connection, sink) -> {String data = connection.fetchNext();if (data != null) {sink.next(data);} else {sink.complete();}return connection;},connection -> {System.out.println("Closing database connection");connection.close();});dbData.subscribe(System.out::println,error -> System.out.println("Error: " + error),() -> System.out.println("Sequence completed"));}}// 模拟数据库连接类
class DatabaseConnection {private int count = 0;public DatabaseConnection(int count) {this.count = count;System.out.println("Database connection created");}public String fetchNext() {if (count < 5) {return "Record " + (++count);}return null;}public void close() {System.out.println("Database connection closed");}
}
Creating database connection
Database connection created
Record 4
Record 5
Sequence completed
Closing database connection
Database connection closed
12.4 高级用法与模式
12.4.1 错误处理与重试机制
package cn.tcmeta.generate;import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;import java.util.concurrent.atomic.AtomicInteger;public class GenerateErrorHandling {public static void main(String[] args) {// 示例: 带错误处理的generateFlux<String> withErrorHandling = Flux.generate(() -> new AtomicInteger(0),(counter, sink) -> {int attempt = counter.getAndIncrement();try {if (attempt == 2) {throw new RuntimeException("Simulated error on attempt 2");}if (attempt < 5) {sink.next("Value " + attempt);} else {sink.complete();}} catch (Exception e) {// 错误处理策略1: 发射错误并终止// sink.error(e);// 错误处理策略2: 跳过错误继续处理System.err.println("Error on attempt " + attempt + ": " + e.getMessage());// 不调用sink.error,继续返回状态}return counter;});withErrorHandling.subscribe(value -> System.out.println("Received: " + value),error -> System.err.println("Subscription error: " + error),() -> System.out.println("Completed successfully"));System.out.println("----------------------------------------");// 示例: 带重试机制的generateFlux<Integer> withRetry = Flux.generate(() -> new StateWithRetry(0, 0),(state, sink) -> {try {if (state.retryCount > 2) {sink.error(new RuntimeException("Max retries exceeded"));return state;}// 模拟可能失败的操作if (state.value == 2 && state.retryCount < 2) {throw new RuntimeException("Temporary failure");}sink.next(state.value);state.value++;if (state.value >= 5) {sink.complete();}} catch (Exception e) {System.out.println("Operation failed, retrying...");state.retryCount++;// 不发射元素,保持状态不变}return state;});withRetry.subscribe(value -> System.out.println("Value: " + value),error -> System.err.println("Failed: " + error.getMessage()),() -> System.out.println("With retry completed"));}static class StateWithRetry {int value;int retryCount;StateWithRetry(int value, int retryCount) {this.value = value;this.retryCount = retryCount;}}
}
12.4.2 背压感知生成
package cn.tcmeta.generate;import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.concurrent.atomic.AtomicLong;public class GenerateBackpressureAware {public static void main(String[] args) {// 示例: 背压感知的生成器Flux<Long> backpressureAware = Flux.generate(() -> new AtomicLong(0L),(counter, sink) -> {long value = counter.getAndIncrement();// 模拟资源密集型操作try {Thread.sleep(100); // 模拟处理延迟} catch (InterruptedException e) {Thread.currentThread().interrupt();sink.error(e);return counter;}sink.next(value);// 限制生成速率 - 只生成100个元素if (value >= 100) {sink.complete();}return counter;});// 使用不同的背压策略订阅backpressureAware.onBackpressureBuffer(10) // 缓冲区大小为10.subscribe(value -> {// 模拟慢速消费者try {Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Processed: " + value);},error -> System.err.println("Error: " + error),() -> System.out.println("Backpressure example completed"));// 另一个示例: 有条件生成Flux<Integer> conditionalGenerate = Flux.generate(() -> 0,(state, sink) -> {// 只有在有需求时才生成元素// 注意: generate是同步的,但我们可以模拟背压感知if (state < 20) {sink.next(state);return state + 1;} else {sink.complete();return state;}});// 使用limitRate控制消费速率conditionalGenerate.limitRate(5) // 每批请求5个元素.subscribe(value -> {System.out.println("Conditional: " + value);try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});try {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
12.4.3 复杂状态管理
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.*;public class ComplexStateGenerate {public static void main(String[] args) {// 示例: 分页数据获取Flux<List<String>> pagedData = Flux.generate(() -> new PagingState(0, 3), // 初始状态: 页码0, 每页3条(state, sink) -> {System.out.println("Fetching page " + state.page);// 模拟分页API调用List<String> pageData = fetchPageData(state.page, state.pageSize);if (pageData.isEmpty()) {sink.complete(); // 没有更多数据} else {sink.next(pageData);state.page++; // 下一页}return state;},state -> System.out.println("Paging completed at page " + state.page));pagedData.subscribe(page -> System.out.println("Received page: " + page),error -> System.err.println("Error: " + error),() -> System.out.println("All pages fetched"));// 示例: 状态机实现Flux<String> stateMachine = Flux.generate(() -> new TrafficLightState("RED"),(state, sink) -> {sink.next(state.color);try {Thread.sleep(state.duration); // 模拟状态持续时间} catch (InterruptedException e) {Thread.currentThread().interrupt();sink.error(e);return state;}// 状态转换switch (state.color) {case "RED":state.color = "GREEN";state.duration = 5000;break;case "GREEN":state.color = "YELLOW";state.duration = 2000;break;case "YELLOW":state.color = "RED";state.duration = 3000;break;}return state;});// 取10个状态变化stateMachine.take(10).subscribe(color -> System.out.println("Traffic light: " + color));try {Thread.sleep(30000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// 模拟分页数据获取private static List<String> fetchPageData(int page, int pageSize) {// 模拟数据源List<String> allData = Arrays.asList("A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3", "D1");int start = page * pageSize;if (start >= allData.size()) {return Collections.emptyList();}int end = Math.min(start + pageSize, allData.size());return allData.subList(start, end);}// 分页状态类static class PagingState {int page;int pageSize;PagingState(int page, int pageSize) {this.page = page;this.pageSize = pageSize;}}// 交通灯状态类static class TrafficLightState {String color;long duration;TrafficLightState(String color) {this.color = color;// 设置初始持续时间this.duration = color.equals("RED") ? 3000 : color.equals("GREEN") ? 5000 : 2000;}}
}
12.5 generate工作原理
12.5.1 generate执行流程
12.5.2 generate 状态管理
12.6 应用场景与最佳实践
12.6.1 适用场景
- 分页数据获取:从数据库或API分页获取数据
- 状态机实现:如工作流、协议处理等
- 资源迭代:文件读取、数据库游标处理等
- 序列生成:数学序列(斐波那契、素数等)
- 定时器/计数器:基于状态的定时事件生成
12.6.2 最佳实践
- 状态设计:
- 使用不可变状态或防御性拷贝避免意外修改
- 确保状态对象是线程安全的(尽管generate是同步的)
- 错误处理:
- 在生成器内部处理可恢复错误
- 使用sink.error()处理不可恢复错误
- 资源管理:
- 总是提供状态清理器来释放资源
- 确保在错误情况下也能清理资源
- 背压考虑:
- generate是同步的,但应考虑消费者的处理能力
- 避免在生成器中执行耗时操作
- 测试策略:
- 使用StepVerifier测试generate生成的流
- 验证状态转换和清理逻辑
12.6.3 文件读取器
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;public class FileReaderExample {public static Flux<String> readFileLines(String filePath) {return Flux.generate(() -> {try {Path path = Paths.get(filePath);return Files.newBufferedReader(path);} catch (IOException e) {throw new RuntimeException("Failed to open file: " + filePath, e);}},(reader, sink) -> {try {String line = reader.readLine();if (line != null) {sink.next(line);} else {sink.complete();}} catch (IOException e) {sink.error(new RuntimeException("Error reading file", e));}return reader;},reader -> {try {reader.close();System.out.println("File reader closed");} catch (IOException e) {System.err.println("Error closing file reader: " + e.getMessage());}});}public static void main(String[] args) {// 创建一个临时文件用于演示Path tempFile;try {tempFile = Files.createTempFile("reactor-generate", ".txt");Files.write(tempFile, Arrays.asList("Line 1", "Line 2", "Line 3", "Line 4", "Line 5"));// 读取文件readFileLines(tempFile.toString()).subscribe(line -> System.out.println("Read: " + line),error -> System.err.println("Error: " + error.getMessage()),() -> System.out.println("File reading completed"));// 删除临时文件Files.deleteIfExists(tempFile);} catch (IOException e) {e.printStackTrace();}}
}
12.7 总结
Reactor的generate
方法是一个强大而灵活的工具,适用于创建有状态的序列生成器。
- 三种形式:无状态、有状态和带清理的有状态generate
- 状态管理:如何设计和管理生成器状态
- 错误处理:在生成器中处理异常的策略
- 资源清理:确保资源正确释放的重要性
- 应用场景:分页数据、状态机、文件读取等实用案例
generate
方法特别适合需要维护状态 between元素生成的场景,它提供了比create
更简单、更可控的同步生成方式。通过合理设计状态对象和生成器逻辑,可以创建出高效、可靠的响应式数据流。
记住最佳实践,特别是在资源管理和错误处理方面,可以确保使用generate
创建的流既高效又健壮