十一、create方法
Reactor 的 create
方法是一个高级的、灵活的 Flux 创建方法,它提供了对数据发射的完全控制能力。与 generate
方法不同,create
方法是异步友好的,并且可以处理多线程场景。
11.1 create方法概述
create
方法的主要特点:
- 异步友好:可以在多个线程中安全地发射数据
- 背压感知:可以响应消费者的背压请求
- 灵活控制:提供完整的 FluxSink API 来控制数据流
- 多线程安全:支持从多个线程并发发射数据
create 与 generate 的区别
11.2 基本用法示例
11.2.1 简单的create示例
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BasicCreateExample {
public static void main(String[] args) throws InterruptedException {
// 基本create用法
Flux<String> flux = Flux.create(sink -> {
// 同步发射一些数据
sink.next("Hello");
sink.next("World");
// 可以发射完成信号
sink.complete();
// 或者发射错误信号
// sink.error(new RuntimeException("Something went wrong"));
});
flux.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
// 异步create示例
Flux<Integer> asyncFlux = Flux.create(sink -> {
// 在另一个线程中发射数据
Executors.newSingleThreadExecutor().submit(() -> {
for (int i = 1; i <= 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(100);
sink.next(i);
} catch (InterruptedException e) {
sink.error(e);
return;
}
}
sink.complete();
});
});
asyncFlux.subscribe(
value -> System.out.println("Async value: " + value),
error -> System.err.println("Async error: " + error),
() -> System.out.println("Async completed")
);
// 等待异步操作完成
TimeUnit.SECONDS.sleep(1);
}
}
11.2.2 背压处理示例
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.atomic.AtomicLong;
public class BackpressureCreateExample {
public static void main(String[] args) throws InterruptedException {
// 创建支持背压的Flux
Flux<Long> flux = Flux.create(sink -> {
AtomicLong counter = new AtomicLong();
// 注册背压请求处理器
sink.onRequest(n -> {
System.out.println("Requested: " + n + " elements");
// 发射请求数量的元素
for (long i = 0; i < n; i++) {
long value = counter.getAndIncrement();
if (value < 20) { // 限制总数
sink.next(value);
} else {
sink.complete();
break;
}
}
});
// 注册取消处理器
sink.onCancel(() -> {
System.out.println("Subscription cancelled");
});
// 注册处理完成处理器
sink.onDispose(() -> {
System.out.println("Flux disposed");
});
});
// 使用不同的背压策略订阅
flux
.limitRate(5) // 每批请求5个元素
.subscribe(
value -> {
System.out.println("Received: " + value);
try {
Thread.sleep(50); // 模拟处理延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
Thread.sleep(2000);
}
}
11.3 高级用法与模式
11.3.1 事件监听器模式
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventListenerExample {
// 事件监听器接口
interface EventListener {
void onEvent(String event);
void onError(Throwable error);
void onComplete();
}
// 事件源类
static class EventSource {
private final List<EventListener> listeners = new CopyOnWriteArrayList<>();
public void addListener(EventListener listener) {
listeners.add(listener);
}
public void removeListener(EventListener listener) {
listeners.remove(listener);
}
public void fireEvent(String event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
public void fireError(Throwable error) {
for (EventListener listener : listeners) {
listener.onError(error);
}
}
public void fireComplete() {
for (EventListener listener : listeners) {
listener.onComplete();
}
}
}
public static void main(String[] args) throws InterruptedException {
EventSource eventSource = new EventSource();
// 使用create桥接传统事件监听器到响应式流
Flux<String> eventFlux = Flux.create(sink -> {
EventListener listener = new EventListener() {
@Override
public void onEvent(String event) {
sink.next(event);
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
@Override
public void onComplete() {
sink.complete();
}
};
// 注册监听器
eventSource.addListener(listener);
// 当Flux被取消或完成时,移除监听器
sink.onDispose(() -> {
eventSource.removeListener(listener);
System.out.println("Listener removed");
});
});
// 订阅事件流
eventFlux.subscribe(
event -> System.out.println("Event: " + event),
error -> System.err.println("Error: " + error),
() -> System.out.println("Event stream completed")
);
// 模拟事件产生
for (int i = 1; i <= 5; i++) {
Thread.sleep(100);
eventSource.fireEvent("Event " + i);
}
// 完成事件流
eventSource.fireComplete();
Thread.sleep(100);
}
}
11.3.2 多线程发射控制
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadCreateExample {
public static void main(String[] args) throws InterruptedException {
// 创建支持多线程发射的Flux
Flux<Integer> flux = Flux.create(sink -> {
AtomicInteger counter = new AtomicInteger(1);
int threadCount = 3;
// 创建多个生产者线程
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
Executors.newSingleThreadExecutor().submit(() -> {
try {
while (!sink.isCancelled() && counter.get() <= 20) {
int value = counter.getAndIncrement();
if (value > 20) break;
sink.next(threadId * 100 + value);
System.out.println("Thread " + threadId + " emitted: " + value);
TimeUnit.MILLISECONDS.sleep(50);
}
// 检查是否所有线程都完成了
if (counter.get() > 20) {
sink.complete();
}
} catch (InterruptedException e) {
sink.error(e);
}
});
}
// 处理背压
sink.onRequest(n -> {
System.out.println("Backpressure request: " + n);
});
// 处理取消
sink.onCancel(() -> {
System.out.println("Flux cancelled");
});
});
// 订阅并限制消费速率
flux
.limitRate(2) // 每批请求2个元素
.subscribe(
value -> {
System.out.println("Consumed: " + value);
try {
TimeUnit.MILLISECONDS.sleep(100); // 慢速消费者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
Thread.sleep(5000);
}
}
11.3.3 资源管理与清理
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ResourceManagementExample {
// 模拟需要管理的资源
static class DatabaseConnection implements Closeable {
private boolean connected = true;
public String query(int id) {
if (!connected) {
throw new IllegalStateException("Connection closed");
}
return "Result for id " + id;
}
@Override
public void close() throws IOException {
connected = false;
System.out.println("Database connection closed");
}
}
public static void main(String[] args) throws InterruptedException {
Flux<String> databaseFlux = Flux.create(sink -> {
DatabaseConnection connection = new DatabaseConnection();
// 安排定期查询
var scheduler = Executors.newSingleThreadScheduledExecutor();
var future = scheduler.scheduleAtFixedRate(() -> {
try {
if (sink.isCancelled()) {
scheduler.shutdown();
return;
}
// 模拟查询
String result = connection.query((int) (Math.random() * 10));
sink.next(result);
} catch (Exception e) {
sink.error(e);
scheduler.shutdown();
}
}, 0, 100, TimeUnit.MILLISECONDS);
// 注册清理回调
sink.onDispose(() -> {
try {
future.cancel(true);
scheduler.shutdown();
connection.close();
System.out.println("Resources cleaned up");
} catch (IOException e) {
System.err.println("Error closing connection: " + e.getMessage());
}
});
});
// 订阅并只取前5个结果
databaseFlux
.take(5)
.subscribe(
result -> System.out.println("Received: " + result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Query completed")
);
Thread.sleep(1000);
}
}
11.4 create工作原理
11.4.1 create 执行流程
11.4.2 背压处理机制
11.5 应用场景与最佳实践
11.5.1 适用场景
- 事件驱动架构:将传统事件监听器转换为响应式流
- 异步API桥接:包装回调式或Future-based的API
- 多数据源聚合:从多个来源聚合数据到一个流中
- 自定义背压策略:实现特定的背压处理逻辑
- 资源密集型操作:需要精细控制资源生命周期的场景
11.5.2 最佳实践
资源管理
Flux.create(sink -> {
Resource resource = new Resource();
// 使用资源
sink.next(resource.getData());
// 注册清理回调
sink.onDispose(() -> {
try {
resource.close();
} catch (Exception e) {
// 处理清理异常
}
});
});
背压处理
Flux.create(sink -> {
sink.onRequest(n -> {
// 按请求数量发射数据
for (int i = 0; i < n; i++) {
if (hasMoreData()) {
sink.next(getNextData());
} else {
sink.complete();
break;
}
}
});
});
错误处理
Flux.create(sink -> {
try {
// 可能抛出异常的操作
processData(sink);
} catch (Exception e) {
// 发射错误而不是抛出异常
sink.error(e);
}
});
多线程安全
Flux.create(sink -> {
// 使用线程安全的数据结构
AtomicInteger counter = new AtomicInteger();
// 在多线程中安全地发射数据
executor.submit(() -> {
while (!sink.isCancelled()) {
int value = counter.incrementAndGet();
sink.next(value);
}
});
});
取消感知
Flux.create(sink -> {
Runnable task = () -> {
while (!sink.isCancelled()) {
// 检查是否已取消
Data data = getData();
if (data != null) {
sink.next(data);
} else {
sink.complete();
break;
}
}
};
Thread thread = new Thread(task);
thread.start();
// 取消时中断线程
sink.onDispose(() -> {
thread.interrupt();
});
});
11.5.3 完整示例:WebSocket 客户端
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
public class WebSocketExample {
@ClientEndpoint
public static class WebSocketClient {
private Session session;
private FluxSink<String> sink;
private CountDownLatch latch = new CountDownLatch(1);
public WebSocketClient(FluxSink<String> sink) {
this.sink = sink;
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
latch.countDown();
sink.next("Connected to WebSocket");
}
@OnMessage
public void onMessage(String message) {
sink.next("Received: " + message);
}
@OnError
public void onError(Throwable error) {
sink.error(error);
}
@OnClose
public void onClose(CloseReason reason) {
sink.next("Connection closed: " + reason.getReasonPhrase());
sink.complete();
}
public void sendMessage(String message) throws IOException {
try {
latch.await(); // 等待连接建立
session.getBasicRemote().sendText(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for connection", e);
}
}
public void close() throws IOException {
if (session != null) {
session.close();
}
}
}
public static Flux<String> createWebSocketFlux(String url) {
return Flux.create(sink -> {
try {
WebSocketClient client = new WebSocketClient(sink);
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(client, URI.create(url));
// 注册清理回调
sink.onDispose(() -> {
try {
client.close();
} catch (IOException e) {
System.err.println("Error closing WebSocket: " + e.getMessage());
}
});
// 允许从外部发送消息(通过context或side channel)
sink.onRequest(n -> {
// 可以在这里发送初始消息等
if (n > 0) {
try {
client.sendMessage("Hello Server!");
} catch (IOException e) {
sink.error(e);
}
}
});
} catch (Exception e) {
sink.error(e);
}
});
}
public static void main(String[] args) throws InterruptedException {
// 注意:这需要实际的WebSocket服务器才能运行
/*
Flux<String> webSocketFlux = createWebSocketFlux("ws://localhost:8080/ws");
webSocketFlux
.take(10) // 只取前10条消息
.subscribe(
message -> System.out.println("WebSocket: " + message),
error -> System.err.println("WebSocket error: " + error),
() -> System.out.println("WebSocket completed")
);
Thread.sleep(10000);
*/
}
}
11.6 总结
Reactor的create
方法是一个强大而灵活的工具,适用于创建复杂的、异步的数据流。通过本文的详细讲解和丰富示例,我们可以看到:
- 核心特性:异步友好、背压感知、多线程安全
- 基本用法:从简单创建到复杂的异步数据流
- 高级模式:事件监听器桥接、多线程发射、资源管理
- 背压处理:通过
onRequest
回调实现消费者驱动的数据流 - 应用场景:WebSocket、事件系统、异步API桥接等
create
方法特别适合需要从多个线程发射数据或需要与现有异步API集成的场景。它提供了比generate
更强大的控制能力,但也需要开发者承担更多的责任,特别是资源管理和背压处理方面。
通过遵循最佳实践,特别是资源清理、错误处理和背压管理,可以创建出高效、健壮的响应式数据流。create
方法是Reactor高级用法的体现,掌握了它就能够解决复杂的响应式编程挑战
十二、generate 方法
Reactor 的 generate
方法是一个强大的工具,用于创建复杂的、有状态的序列生成器。与 create
方法不同,generate
是同步的、逐个元素生成的,并且可以维护状态。
12.1 generate方法概述
generate
方法有三种重载形式,适用于不同的场景:
- 无状态生成:generate(Consumer<SynchronousSink<T>> generator)
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
Objects.requireNonNull(generator, "generator");
return onAssembly(new FluxGenerate<>(generator));
}
- 有状态生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
- 带状态清理的生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
}
12.2 generate与create的区别
12.3 基本使用示例
12.3.1 无状态generate
// 参数说明
// generator – SynchronousSink 使用 Reactor 提供的每个订阅者,在每次传递中生成 单个 信号
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
Objects.requireNonNull(generator, "generator");
return onAssembly(new FluxGenerate<>(generator));
}
使用示例
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
/**
* @author: laoren
* @date: 2025/8/27 14:10
* @description: 无状态 generate
* @version: 1.0.0
*/
public class StatelessGenerateExample {
public static void main(String[] args) {
// 1. 无状态generate - 生成5个随机数
Flux<Double> randomNumber = Flux.generate(sink -> {
double n = Math.random();
sink.next(n);
// 模拟条件完成
if (n > 0.8) {
sink.complete();
}
});
// 2. 订阅
randomNumber.subscribe(
System.out::println,
Throwable::printStackTrace,
() -> System.out.println("Done")
);
// 生成固定的元素
Flux<String> fixedCountStr = Flux.generate(sink -> {
// 这种方式无法控制数量,因为无法维护状态
// 需要使用有状态generate
sink.next("Hello 🍋🍋");
// 这里会无限生成1,因为没有终止条件
});
// 只取三个元素即可
fixedCountStr.take(3).subscribe(System.out::println);
}
}
0.23585869887623168
0.07111979487714148
0.16405884193127096
0.4672955468599558
0.1245039338107009
0.3365117658051757
0.41334625983368756
0.7453378381025055
0.9330594906646099
Done
Hello 🍋🍋
Hello 🍋🍋
Hello 🍋🍋
12.3.2 有状态的generate
// 参数说明:
// 1. stateSupplier – 调用每个传入用户为生成器双函数提供初始状态
// 2. generator– 使用 SynchronousSink Reactor 提供的每个订阅者以及当前状态,以在每次传递时生成 单个 信号并返回(新)状态。
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
/**
* @author: laoren
* @date: 2025/8/27 14:26
* @description: 有状态 generate
* @version: 1.0.0
*/
public class StatefulGenerateExample {
public static void main(String[] args) {
// 示例1: 生成斐波那契数列
Flux<Long> fibonacci = Flux.generate(
() -> new Long[]{0L, 1L}, // 初始状态[prev, current]
(state, sink) -> {
long nextValue = state[0];
sink.next(nextValue);
// 更新状态[current, prev + current]
state[0] = state[1];
state[1] = nextValue + state[1];
return state;
}
);
// 取前10个斐波那契数
fibonacci.take(10).subscribe(
value -> System.out.println("Fibonacci: " + value)
);
}
}
有限序列
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
/**
* @author: laoren
* @description: 生成有限序列
* @version: 1.0.0
*/
public class StatefulGenerateExample2 {
public static void main(String[] args) {
var limitedSequence = Flux.generate(
() -> 1, // 初始状态: 计数器1, 初始值可以是任意类型的值
(state, sink) -> {
if (state <= 5) {
sink.next(state);
return state + 1;
} else {
sink.complete();
return state;
}
}
);
limitedSequence.subscribe(
System.out::println,
error -> System.out.println("Error: " + error),
() -> System.out.println("Sequence completed")
);
}
}
使用对象作为状态
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author: laoren
* @date: 2025/8/27 15:05
* @description: TODO
* @version: 1.0.0
*/
public class StatefulGenerateExample3 {
public static void main(String[] args) {
Flux<Object> objectStateExample = Flux.generate(
() -> new AtomicInteger(0), // 初始状态
(state, sink) -> {
int value = state.getAndIncrement();
if (value < 3) {
sink.next(value);
return state;
} else {
sink.complete();
return state;
}
}
);
// 订阅一下
objectStateExample.subscribe(System.out::println);
}
}
12.3.3 带状态清理的generate
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.CountDownLatch;
/**
* @author: laoren
* @date: 2025/8/27 15:10
* @description: 带状态清理的generate
* @version: 1.0.0
*/
public class GenerateWithCleanupExample {
public static void main(String[] args) {
String content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5";
Flux<String> lines = Flux.generate(
// 模拟文件内容
// 状态供应器 - 创建BufferedReader
() -> new BufferedReader(new StringReader(content)),
// 生成器函数 - 读取每一行
(reader, sink) -> {
try {
String line = reader.readLine();
if (line == null) {
sink.complete(); // 文件读取完成
} else {
sink.next(line);
}
} catch (Exception e) {
sink.error(e);
}
return reader;
},
reader -> {
try {
reader.close();
System.out.println("Reader closed");
} catch (Exception e) {
System.out.println("Error closing reader");
}
}
);
// 状态清理器 - 关闭BufferedReader
lines.subscribe(
line -> System.out.println("Read: " + line),
error -> System.err.println("Error: " + error),
() -> System.out.println("File reading completed")
);
}
}
Read: Line 1
Read: Line 2
Read: Line 3
Read: Line 4
Read: Line 5
File reading completed
Reader closed
数据连接清理
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
/**
* @author: laoren
* @description: 数据库连接清理操作
* @version: 1.0.0
*/
public class GenerateWithCleanupExample2 {
public static void main(String[] args) {
// 另一个示例: 数据库连接清理
Flux<String> dbData = Flux.generate(
() -> {
System.out.println("Creating database connection");
return new DatabaseConnection(3); // 模拟数据库连接
},
(connection, sink) -> {
String data = connection.fetchNext();
if (data != null) {
sink.next(data);
} else {
sink.complete();
}
return connection;
},
connection -> {
System.out.println("Closing database connection");
connection.close();
}
);
dbData.subscribe(
System.out::println,
error -> System.out.println("Error: " + error),
() -> System.out.println("Sequence completed")
);
}
}
// 模拟数据库连接类
class DatabaseConnection {
private int count = 0;
public DatabaseConnection(int count) {
this.count = count;
System.out.println("Database connection created");
}
public String fetchNext() {
if (count < 5) {
return "Record " + (++count);
}
return null;
}
public void close() {
System.out.println("Database connection closed");
}
}
Creating database connection
Database connection created
Record 4
Record 5
Sequence completed
Closing database connection
Database connection closed
12.4 高级用法与模式
12.4.1 错误处理与重试机制
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.concurrent.atomic.AtomicInteger;
public class GenerateErrorHandling {
public static void main(String[] args) {
// 示例: 带错误处理的generate
Flux<String> withErrorHandling = Flux.generate(
() -> new AtomicInteger(0),
(counter, sink) -> {
int attempt = counter.getAndIncrement();
try {
if (attempt == 2) {
throw new RuntimeException("Simulated error on attempt 2");
}
if (attempt < 5) {
sink.next("Value " + attempt);
} else {
sink.complete();
}
} catch (Exception e) {
// 错误处理策略1: 发射错误并终止
// sink.error(e);
// 错误处理策略2: 跳过错误继续处理
System.err.println("Error on attempt " + attempt + ": " + e.getMessage());
// 不调用sink.error,继续返回状态
}
return counter;
}
);
withErrorHandling.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Subscription error: " + error),
() -> System.out.println("Completed successfully")
);
System.out.println("----------------------------------------");
// 示例: 带重试机制的generate
Flux<Integer> withRetry = Flux.generate(
() -> new StateWithRetry(0, 0),
(state, sink) -> {
try {
if (state.retryCount > 2) {
sink.error(new RuntimeException("Max retries exceeded"));
return state;
}
// 模拟可能失败的操作
if (state.value == 2 && state.retryCount < 2) {
throw new RuntimeException("Temporary failure");
}
sink.next(state.value);
state.value++;
if (state.value >= 5) {
sink.complete();
}
} catch (Exception e) {
System.out.println("Operation failed, retrying...");
state.retryCount++;
// 不发射元素,保持状态不变
}
return state;
}
);
withRetry.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Failed: " + error.getMessage()),
() -> System.out.println("With retry completed")
);
}
static class StateWithRetry {
int value;
int retryCount;
StateWithRetry(int value, int retryCount) {
this.value = value;
this.retryCount = retryCount;
}
}
}
12.4.2 背压感知生成
package cn.tcmeta.generate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.concurrent.atomic.AtomicLong;
public class GenerateBackpressureAware {
public static void main(String[] args) {
// 示例: 背压感知的生成器
Flux<Long> backpressureAware = Flux.generate(
() -> new AtomicLong(0L),
(counter, sink) -> {
long value = counter.getAndIncrement();
// 模拟资源密集型操作
try {
Thread.sleep(100); // 模拟处理延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sink.error(e);
return counter;
}
sink.next(value);
// 限制生成速率 - 只生成100个元素
if (value >= 100) {
sink.complete();
}
return counter;
}
);
// 使用不同的背压策略订阅
backpressureAware
.onBackpressureBuffer(10) // 缓冲区大小为10
.subscribe(
value -> {
// 模拟慢速消费者
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Processed: " + value);
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Backpressure example completed")
);
// 另一个示例: 有条件生成
Flux<Integer> conditionalGenerate = Flux.generate(
() -> 0,
(state, sink) -> {
// 只有在有需求时才生成元素
// 注意: generate是同步的,但我们可以模拟背压感知
if (state < 20) {
sink.next(state);
return state + 1;
} else {
sink.complete();
return state;
}
}
);
// 使用limitRate控制消费速率
conditionalGenerate
.limitRate(5) // 每批请求5个元素
.subscribe(
value -> {
System.out.println("Conditional: " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
12.4.3 复杂状态管理
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.*;
public class ComplexStateGenerate {
public static void main(String[] args) {
// 示例: 分页数据获取
Flux<List<String>> pagedData = Flux.generate(
() -> new PagingState(0, 3), // 初始状态: 页码0, 每页3条
(state, sink) -> {
System.out.println("Fetching page " + state.page);
// 模拟分页API调用
List<String> pageData = fetchPageData(state.page, state.pageSize);
if (pageData.isEmpty()) {
sink.complete(); // 没有更多数据
} else {
sink.next(pageData);
state.page++; // 下一页
}
return state;
},
state -> System.out.println("Paging completed at page " + state.page)
);
pagedData.subscribe(
page -> System.out.println("Received page: " + page),
error -> System.err.println("Error: " + error),
() -> System.out.println("All pages fetched")
);
// 示例: 状态机实现
Flux<String> stateMachine = Flux.generate(
() -> new TrafficLightState("RED"),
(state, sink) -> {
sink.next(state.color);
try {
Thread.sleep(state.duration); // 模拟状态持续时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sink.error(e);
return state;
}
// 状态转换
switch (state.color) {
case "RED":
state.color = "GREEN";
state.duration = 5000;
break;
case "GREEN":
state.color = "YELLOW";
state.duration = 2000;
break;
case "YELLOW":
state.color = "RED";
state.duration = 3000;
break;
}
return state;
}
);
// 取10个状态变化
stateMachine.take(10).subscribe(
color -> System.out.println("Traffic light: " + color)
);
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 模拟分页数据获取
private static List<String> fetchPageData(int page, int pageSize) {
// 模拟数据源
List<String> allData = Arrays.asList(
"A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3", "D1"
);
int start = page * pageSize;
if (start >= allData.size()) {
return Collections.emptyList();
}
int end = Math.min(start + pageSize, allData.size());
return allData.subList(start, end);
}
// 分页状态类
static class PagingState {
int page;
int pageSize;
PagingState(int page, int pageSize) {
this.page = page;
this.pageSize = pageSize;
}
}
// 交通灯状态类
static class TrafficLightState {
String color;
long duration;
TrafficLightState(String color) {
this.color = color;
// 设置初始持续时间
this.duration = color.equals("RED") ? 3000 :
color.equals("GREEN") ? 5000 : 2000;
}
}
}
12.5 generate工作原理
12.5.1 generate执行流程
12.5.2 generate 状态管理
12.6 应用场景与最佳实践
12.6.1 适用场景
- 分页数据获取:从数据库或API分页获取数据
- 状态机实现:如工作流、协议处理等
- 资源迭代:文件读取、数据库游标处理等
- 序列生成:数学序列(斐波那契、素数等)
- 定时器/计数器:基于状态的定时事件生成
12.6.2 最佳实践
- 状态设计:
- 使用不可变状态或防御性拷贝避免意外修改
- 确保状态对象是线程安全的(尽管generate是同步的)
- 错误处理:
- 在生成器内部处理可恢复错误
- 使用sink.error()处理不可恢复错误
- 资源管理:
- 总是提供状态清理器来释放资源
- 确保在错误情况下也能清理资源
- 背压考虑:
- generate是同步的,但应考虑消费者的处理能力
- 避免在生成器中执行耗时操作
- 测试策略:
- 使用StepVerifier测试generate生成的流
- 验证状态转换和清理逻辑
12.6.3 文件读取器
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class FileReaderExample {
public static Flux<String> readFileLines(String filePath) {
return Flux.generate(
() -> {
try {
Path path = Paths.get(filePath);
return Files.newBufferedReader(path);
} catch (IOException e) {
throw new RuntimeException("Failed to open file: " + filePath, e);
}
},
(reader, sink) -> {
try {
String line = reader.readLine();
if (line != null) {
sink.next(line);
} else {
sink.complete();
}
} catch (IOException e) {
sink.error(new RuntimeException("Error reading file", e));
}
return reader;
},
reader -> {
try {
reader.close();
System.out.println("File reader closed");
} catch (IOException e) {
System.err.println("Error closing file reader: " + e.getMessage());
}
}
);
}
public static void main(String[] args) {
// 创建一个临时文件用于演示
Path tempFile;
try {
tempFile = Files.createTempFile("reactor-generate", ".txt");
Files.write(tempFile,
Arrays.asList("Line 1", "Line 2", "Line 3", "Line 4", "Line 5"));
// 读取文件
readFileLines(tempFile.toString())
.subscribe(
line -> System.out.println("Read: " + line),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("File reading completed")
);
// 删除临时文件
Files.deleteIfExists(tempFile);
} catch (IOException e) {
e.printStackTrace();
}
}
}
12.7 总结
Reactor的generate
方法是一个强大而灵活的工具,适用于创建有状态的序列生成器。
- 三种形式:无状态、有状态和带清理的有状态generate
- 状态管理:如何设计和管理生成器状态
- 错误处理:在生成器中处理异常的策略
- 资源清理:确保资源正确释放的重要性
- 应用场景:分页数据、状态机、文件读取等实用案例
generate
方法特别适合需要维护状态 between元素生成的场景,它提供了比create
更简单、更可控的同步生成方式。通过合理设计状态对象和生成器逻辑,可以创建出高效、可靠的响应式数据流。
记住最佳实践,特别是在资源管理和错误处理方面,可以确保使用generate
创建的流既高效又健壮