响应式编程框架Reactor【7】

发布于:2025-09-01 ⋅ 阅读:(14) ⋅ 点赞:(0)

十一、create方法

Reactor 的 create 方法是一个高级的、灵活的 Flux 创建方法,它提供了对数据发射的完全控制能力。与 generate 方法不同,create 方法是异步友好的,并且可以处理多线程场景。

11.1 create方法概述

create 方法的主要特点:

  • 异步友好:可以在多个线程中安全地发射数据
  • 背压感知:可以响应消费者的背压请求
  • 灵活控制:提供完整的 FluxSink API 来控制数据流
  • 多线程安全:支持从多个线程并发发射数据

create 与 generate 的区别

选择创建方法
需要多线程/异步发射?
使用 create
需要维护状态?
使用 generate
考虑 just/fromIterable 等
异步事件驱动
同步状态生成

11.2 基本用法示例

11.2.1 简单的create示例

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BasicCreateExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 基本create用法
        Flux<String> flux = Flux.create(sink -> {
            // 同步发射一些数据
            sink.next("Hello");
            sink.next("World");
            
            // 可以发射完成信号
            sink.complete();
            
            // 或者发射错误信号
            // sink.error(new RuntimeException("Something went wrong"));
        });
        
        flux.subscribe(
            value -> System.out.println("Received: " + value),
            error -> System.err.println("Error: " + error.getMessage()),
            () -> System.out.println("Completed")
        );
        
        // 异步create示例
        Flux<Integer> asyncFlux = Flux.create(sink -> {
            // 在另一个线程中发射数据
            Executors.newSingleThreadExecutor().submit(() -> {
                for (int i = 1; i <= 5; i++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                        sink.next(i);
                    } catch (InterruptedException e) {
                        sink.error(e);
                        return;
                    }
                }
                sink.complete();
            });
        });
        
        asyncFlux.subscribe(
            value -> System.out.println("Async value: " + value),
            error -> System.err.println("Async error: " + error),
            () -> System.out.println("Async completed")
        );
        
        // 等待异步操作完成
        TimeUnit.SECONDS.sleep(1);
    }
}

11.2.2 背压处理示例

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.atomic.AtomicLong;

public class BackpressureCreateExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 创建支持背压的Flux
        Flux<Long> flux = Flux.create(sink -> {
            AtomicLong counter = new AtomicLong();
            
            // 注册背压请求处理器
            sink.onRequest(n -> {
                System.out.println("Requested: " + n + " elements");
                
                // 发射请求数量的元素
                for (long i = 0; i < n; i++) {
                    long value = counter.getAndIncrement();
                    if (value < 20) { // 限制总数
                        sink.next(value);
                    } else {
                        sink.complete();
                        break;
                    }
                }
            });
            
            // 注册取消处理器
            sink.onCancel(() -> {
                System.out.println("Subscription cancelled");
            });
            
            // 注册处理完成处理器
            sink.onDispose(() -> {
                System.out.println("Flux disposed");
            });
        });
        
        // 使用不同的背压策略订阅
        flux
            .limitRate(5) // 每批请求5个元素
            .subscribe(
                value -> {
                    System.out.println("Received: " + value);
                    try {
                        Thread.sleep(50); // 模拟处理延迟
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
            );
        
        Thread.sleep(2000);
    }
}

11.3 高级用法与模式

11.3.1 事件监听器模式

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class EventListenerExample {
    
    // 事件监听器接口
    interface EventListener {
        void onEvent(String event);
        void onError(Throwable error);
        void onComplete();
    }
    
    // 事件源类
    static class EventSource {
        private final List<EventListener> listeners = new CopyOnWriteArrayList<>();
        
        public void addListener(EventListener listener) {
            listeners.add(listener);
        }
        
        public void removeListener(EventListener listener) {
            listeners.remove(listener);
        }
        
        public void fireEvent(String event) {
            for (EventListener listener : listeners) {
                listener.onEvent(event);
            }
        }
        
        public void fireError(Throwable error) {
            for (EventListener listener : listeners) {
                listener.onError(error);
            }
        }
        
        public void fireComplete() {
            for (EventListener listener : listeners) {
                listener.onComplete();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        EventSource eventSource = new EventSource();
        
        // 使用create桥接传统事件监听器到响应式流
        Flux<String> eventFlux = Flux.create(sink -> {
            EventListener listener = new EventListener() {
                @Override
                public void onEvent(String event) {
                    sink.next(event);
                }
                
                @Override
                public void onError(Throwable error) {
                    sink.error(error);
                }
                
                @Override
                public void onComplete() {
                    sink.complete();
                }
            };
            
            // 注册监听器
            eventSource.addListener(listener);
            
            // 当Flux被取消或完成时,移除监听器
            sink.onDispose(() -> {
                eventSource.removeListener(listener);
                System.out.println("Listener removed");
            });
        });
        
        // 订阅事件流
        eventFlux.subscribe(
            event -> System.out.println("Event: " + event),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Event stream completed")
        );
        
        // 模拟事件产生
        for (int i = 1; i <= 5; i++) {
            Thread.sleep(100);
            eventSource.fireEvent("Event " + i);
        }
        
        // 完成事件流
        eventSource.fireComplete();
        
        Thread.sleep(100);
    }
}

11.3.2 多线程发射控制

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadCreateExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 创建支持多线程发射的Flux
        Flux<Integer> flux = Flux.create(sink -> {
            AtomicInteger counter = new AtomicInteger(1);
            int threadCount = 3;
            
            // 创建多个生产者线程
            for (int i = 0; i < threadCount; i++) {
                final int threadId = i;
                Executors.newSingleThreadExecutor().submit(() -> {
                    try {
                        while (!sink.isCancelled() && counter.get() <= 20) {
                            int value = counter.getAndIncrement();
                            if (value > 20) break;
                            
                            sink.next(threadId * 100 + value);
                            System.out.println("Thread " + threadId + " emitted: " + value);
                            
                            TimeUnit.MILLISECONDS.sleep(50);
                        }
                        
                        // 检查是否所有线程都完成了
                        if (counter.get() > 20) {
                            sink.complete();
                        }
                    } catch (InterruptedException e) {
                        sink.error(e);
                    }
                });
            }
            
            // 处理背压
            sink.onRequest(n -> {
                System.out.println("Backpressure request: " + n);
            });
            
            // 处理取消
            sink.onCancel(() -> {
                System.out.println("Flux cancelled");
            });
        });
        
        // 订阅并限制消费速率
        flux
            .limitRate(2) // 每批请求2个元素
            .subscribe(
                value -> {
                    System.out.println("Consumed: " + value);
                    try {
                        TimeUnit.MILLISECONDS.sleep(100); // 慢速消费者
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
            );
        
        Thread.sleep(5000);
    }
}

11.3.3 资源管理与清理

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ResourceManagementExample {
    
    // 模拟需要管理的资源
    static class DatabaseConnection implements Closeable {
        private boolean connected = true;
        
        public String query(int id) {
            if (!connected) {
                throw new IllegalStateException("Connection closed");
            }
            return "Result for id " + id;
        }
        
        @Override
        public void close() throws IOException {
            connected = false;
            System.out.println("Database connection closed");
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Flux<String> databaseFlux = Flux.create(sink -> {
            DatabaseConnection connection = new DatabaseConnection();
            
            // 安排定期查询
            var scheduler = Executors.newSingleThreadScheduledExecutor();
            var future = scheduler.scheduleAtFixedRate(() -> {
                try {
                    if (sink.isCancelled()) {
                        scheduler.shutdown();
                        return;
                    }
                    
                    // 模拟查询
                    String result = connection.query((int) (Math.random() * 10));
                    sink.next(result);
                    
                } catch (Exception e) {
                    sink.error(e);
                    scheduler.shutdown();
                }
            }, 0, 100, TimeUnit.MILLISECONDS);
            
            // 注册清理回调
            sink.onDispose(() -> {
                try {
                    future.cancel(true);
                    scheduler.shutdown();
                    connection.close();
                    System.out.println("Resources cleaned up");
                } catch (IOException e) {
                    System.err.println("Error closing connection: " + e.getMessage());
                }
            });
        });
        
        // 订阅并只取前5个结果
        databaseFlux
            .take(5)
            .subscribe(
                result -> System.out.println("Received: " + result),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Query completed")
            );
        
        Thread.sleep(1000);
    }
}

11.4 create工作原理

11.4.1 create 执行流程

Subscriber Flux.create Creator Lambda FluxSink subscribe() accept(FluxSink) 设置回调(onRequest, onCancel等) 可能立即发射数据 异步操作可能在其他线程进行 线程1: next(data1) 线程2: next(data2) par [多线程发射- ] onNext(data) request(n) [背压] 触发onRequest回调 cancel() [取消] 触发onCancel/onDispose回调 Subscriber Flux.create Creator Lambda FluxSink

11.4.2 背压处理机制

在这里插入图片描述

11.5 应用场景与最佳实践

11.5.1 适用场景

  1. 事件驱动架构:将传统事件监听器转换为响应式流
  2. 异步API桥接:包装回调式或Future-based的API
  3. 多数据源聚合:从多个来源聚合数据到一个流中
  4. 自定义背压策略:实现特定的背压处理逻辑
  5. 资源密集型操作:需要精细控制资源生命周期的场景

11.5.2 最佳实践

资源管理

Flux.create(sink -> {
    Resource resource = new Resource();
    
    // 使用资源
    sink.next(resource.getData());
    
    // 注册清理回调
    sink.onDispose(() -> {
        try {
            resource.close();
        } catch (Exception e) {
            // 处理清理异常
        }
    });
});

背压处理

Flux.create(sink -> {
    sink.onRequest(n -> {
        // 按请求数量发射数据
        for (int i = 0; i < n; i++) {
            if (hasMoreData()) {
                sink.next(getNextData());
            } else {
                sink.complete();
                break;
            }
        }
    });
});

错误处理

Flux.create(sink -> {
    try {
        // 可能抛出异常的操作
        processData(sink);
    } catch (Exception e) {
        // 发射错误而不是抛出异常
        sink.error(e);
    }
});

多线程安全

Flux.create(sink -> {
    // 使用线程安全的数据结构
    AtomicInteger counter = new AtomicInteger();
    
    // 在多线程中安全地发射数据
    executor.submit(() -> {
        while (!sink.isCancelled()) {
            int value = counter.incrementAndGet();
            sink.next(value);
        }
    });
});

取消感知

Flux.create(sink -> {
    Runnable task = () -> {
        while (!sink.isCancelled()) {
            // 检查是否已取消
            Data data = getData();
            if (data != null) {
                sink.next(data);
            } else {
                sink.complete();
                break;
            }
        }
    };
    
    Thread thread = new Thread(task);
    thread.start();
    
    // 取消时中断线程
    sink.onDispose(() -> {
        thread.interrupt();
    });
});

11.5.3 完整示例:WebSocket 客户端

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;

public class WebSocketExample {
    
    @ClientEndpoint
    public static class WebSocketClient {
        private Session session;
        private FluxSink<String> sink;
        private CountDownLatch latch = new CountDownLatch(1);
        
        public WebSocketClient(FluxSink<String> sink) {
            this.sink = sink;
        }
        
        @OnOpen
        public void onOpen(Session session) {
            this.session = session;
            latch.countDown();
            sink.next("Connected to WebSocket");
        }
        
        @OnMessage
        public void onMessage(String message) {
            sink.next("Received: " + message);
        }
        
        @OnError
        public void onError(Throwable error) {
            sink.error(error);
        }
        
        @OnClose
        public void onClose(CloseReason reason) {
            sink.next("Connection closed: " + reason.getReasonPhrase());
            sink.complete();
        }
        
        public void sendMessage(String message) throws IOException {
            try {
                latch.await(); // 等待连接建立
                session.getBasicRemote().sendText(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for connection", e);
            }
        }
        
        public void close() throws IOException {
            if (session != null) {
                session.close();
            }
        }
    }
    
    public static Flux<String> createWebSocketFlux(String url) {
        return Flux.create(sink -> {
            try {
                WebSocketClient client = new WebSocketClient(sink);
                WebSocketContainer container = ContainerProvider.getWebSocketContainer();
                container.connectToServer(client, URI.create(url));
                
                // 注册清理回调
                sink.onDispose(() -> {
                    try {
                        client.close();
                    } catch (IOException e) {
                        System.err.println("Error closing WebSocket: " + e.getMessage());
                    }
                });
                
                // 允许从外部发送消息(通过context或side channel)
                sink.onRequest(n -> {
                    // 可以在这里发送初始消息等
                    if (n > 0) {
                        try {
                            client.sendMessage("Hello Server!");
                        } catch (IOException e) {
                            sink.error(e);
                        }
                    }
                });
                
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 注意:这需要实际的WebSocket服务器才能运行
        /*
        Flux<String> webSocketFlux = createWebSocketFlux("ws://localhost:8080/ws");
        
        webSocketFlux
            .take(10) // 只取前10条消息
            .subscribe(
                message -> System.out.println("WebSocket: " + message),
                error -> System.err.println("WebSocket error: " + error),
                () -> System.out.println("WebSocket completed")
            );
        
        Thread.sleep(10000);
        */
    }
}

11.6 总结

Reactor的create方法是一个强大而灵活的工具,适用于创建复杂的、异步的数据流。通过本文的详细讲解和丰富示例,我们可以看到:

  1. 核心特性:异步友好、背压感知、多线程安全
  2. 基本用法:从简单创建到复杂的异步数据流
  3. 高级模式:事件监听器桥接、多线程发射、资源管理
  4. 背压处理:通过onRequest回调实现消费者驱动的数据流
  5. 应用场景:WebSocket、事件系统、异步API桥接等

create方法特别适合需要从多个线程发射数据或需要与现有异步API集成的场景。它提供了比generate更强大的控制能力,但也需要开发者承担更多的责任,特别是资源管理和背压处理方面。

通过遵循最佳实践,特别是资源清理、错误处理和背压管理,可以创建出高效、健壮的响应式数据流。create方法是Reactor高级用法的体现,掌握了它就能够解决复杂的响应式编程挑战

十二、generate 方法

Reactor 的 generate 方法是一个强大的工具,用于创建复杂的、有状态的序列生成器。与 create 方法不同,generate 是同步的、逐个元素生成的,并且可以维护状态。

12.1 generate方法概述

generate 方法有三种重载形式,适用于不同的场景:

  1. 无状态生成:generate(Consumer<SynchronousSink<T>> generator)
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
    Objects.requireNonNull(generator, "generator");
    return onAssembly(new FluxGenerate<>(generator));
}
  1. 有状态生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
		return onAssembly(new FluxGenerate<>(stateSupplier, generator));
	}
  1. 带状态清理的生成:generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
    return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
}

在这里插入图片描述

12.2 generate与create的区别

选择生成器方法
需要维护状态?
使用 generate
需要异步或多线程?
使用 create
考虑其他方法
状态同步生成
异步事件驱动

12.3 基本使用示例

12.3.1 无状态generate

在这里插入图片描述

// 参数说明
// generator – SynchronousSink 使用 Reactor 提供的每个订阅者,在每次传递中生成 单个 信号
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator) {
    Objects.requireNonNull(generator, "generator");
    return onAssembly(new FluxGenerate<>(generator));
}

使用示例

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;

/**
 * @author: laoren
 * @date: 2025/8/27 14:10
 * @description: 无状态 generate
 * @version: 1.0.0
 */
public class StatelessGenerateExample {
    public static void main(String[] args) {
        // 1. 无状态generate - 生成5个随机数
        Flux<Double> randomNumber = Flux.generate(sink -> {
            double n = Math.random();
            sink.next(n);

            // 模拟条件完成
            if (n > 0.8) {
                sink.complete();
            }
        });

        // 2. 订阅
        randomNumber.subscribe(
                System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("Done")
        );

        // 生成固定的元素
        Flux<String> fixedCountStr = Flux.generate(sink -> {
            // 这种方式无法控制数量,因为无法维护状态
            // 需要使用有状态generate
            sink.next("Hello 🍋🍋");
            // 这里会无限生成1,因为没有终止条件
        });

        // 只取三个元素即可
        fixedCountStr.take(3).subscribe(System.out::println);
    }
}

0.23585869887623168
0.07111979487714148
0.16405884193127096
0.4672955468599558
0.1245039338107009
0.3365117658051757
0.41334625983368756
0.7453378381025055
0.9330594906646099
Done
Hello 🍋🍋
Hello 🍋🍋
Hello 🍋🍋

12.3.2 有状态的generate

在这里插入图片描述

// 参数说明:
// 1. stateSupplier – 调用每个传入用户为生成器双函数提供初始状态
// 2. generator– 使用 SynchronousSink Reactor 提供的每个订阅者以及当前状态,以在每次传递时生成 单个 信号并返回(新)状态。
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
    return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
package cn.tcmeta.generate;

import reactor.core.publisher.Flux;

/**
 * @author: laoren
 * @date: 2025/8/27 14:26
 * @description: 有状态 generate
 * @version: 1.0.0
 */
public class StatefulGenerateExample {
    public static void main(String[] args) {
        // 示例1: 生成斐波那契数列
        Flux<Long> fibonacci = Flux.generate(
                () -> new Long[]{0L, 1L}, // 初始状态[prev, current]
                (state, sink) -> {
                    long nextValue = state[0];
                    sink.next(nextValue);

                    // 更新状态[current, prev + current]
                    state[0] = state[1];
                    state[1] = nextValue + state[1];
                    return state;
                }
        );

        // 取前10个斐波那契数
        fibonacci.take(10).subscribe(
                value -> System.out.println("Fibonacci: " + value)
        );
    }
}

在这里插入图片描述

有限序列

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;

/**
 * @author: laoren
 * @description: 生成有限序列
 * @version: 1.0.0
 */
public class StatefulGenerateExample2 {
    public static void main(String[] args) {
        var limitedSequence = Flux.generate(
                () -> 1, // 初始状态: 计数器1, 初始值可以是任意类型的值
                (state, sink) -> {
                    if (state <= 5) {
                        sink.next(state);
                        return state + 1;
                    } else {
                        sink.complete();
                        return state;
                    }
                }
        );

        limitedSequence.subscribe(
                System.out::println,
                error -> System.out.println("Error: " + error),
                () -> System.out.println("Sequence completed")
        );
    }
}

在这里插入图片描述

使用对象作为状态

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author: laoren
 * @date: 2025/8/27 15:05
 * @description: TODO
 * @version: 1.0.0
 */
public class StatefulGenerateExample3 {
    public static void main(String[] args) {
        Flux<Object> objectStateExample  = Flux.generate(
                () -> new AtomicInteger(0), // 初始状态
                (state, sink) -> {
                    int value = state.getAndIncrement();
                    if (value < 3) {
                        sink.next(value);
                        return state;
                    } else {
                        sink.complete();
                        return state;
                    }
                }
        );

        // 订阅一下
        objectStateExample.subscribe(System.out::println);
    }
}

在这里插入图片描述

12.3.3 带状态清理的generate

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.CountDownLatch;

/**
 * @author: laoren
 * @date: 2025/8/27 15:10
 * @description: 带状态清理的generate
 * @version: 1.0.0
 */
public class GenerateWithCleanupExample {
    public static void main(String[] args) {
        String content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5";

        Flux<String> lines = Flux.generate(
                // 模拟文件内容
                // 状态供应器 - 创建BufferedReader
                () -> new BufferedReader(new StringReader(content)),
                // 生成器函数 - 读取每一行
                (reader, sink) -> {
                    try {
                        String line = reader.readLine();
                        if (line == null) {
                            sink.complete();  // 文件读取完成
                        } else {
                            sink.next(line);
                        }
                    } catch (Exception e) {
                        sink.error(e);
                    }
                    return reader;
                },

                reader -> {
                    try {
                        reader.close();
                        System.out.println("Reader closed");
                    } catch (Exception e) {
                        System.out.println("Error closing reader");
                    }
                }
        );
        // 状态清理器 - 关闭BufferedReader
        lines.subscribe(
                line -> System.out.println("Read: " + line),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("File reading completed")
        );
    }
}

Read: Line 1
Read: Line 2
Read: Line 3
Read: Line 4
Read: Line 5
File reading completed
Reader closed

数据连接清理

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;


/**
 * @author: laoren
 * @description: 数据库连接清理操作
 * @version: 1.0.0
 */
public class GenerateWithCleanupExample2 {
    public static void main(String[] args) {
        // 另一个示例: 数据库连接清理
        Flux<String> dbData = Flux.generate(
                () -> {
                    System.out.println("Creating database connection");
                    return new DatabaseConnection(3); // 模拟数据库连接
                },
                (connection, sink) -> {
                    String data = connection.fetchNext();
                    if (data != null) {
                        sink.next(data);
                    } else {
                        sink.complete();
                    }
                    return connection;
                },
                connection -> {
                    System.out.println("Closing database connection");
                    connection.close();
                }
        );

        dbData.subscribe(
                System.out::println,
                error -> System.out.println("Error: " + error),
                () -> System.out.println("Sequence completed")
        );
    }

}

// 模拟数据库连接类
class DatabaseConnection {
    private int count = 0;

    public DatabaseConnection(int count) {
        this.count = count;
        System.out.println("Database connection created");
    }

    public String fetchNext() {
        if (count < 5) {
            return "Record " + (++count);
        }
        return null;
    }

    public void close() {
        System.out.println("Database connection closed");
    }
}

Creating database connection
Database connection created
Record 4
Record 5
Sequence completed
Closing database connection
Database connection closed

12.4 高级用法与模式

12.4.1 错误处理与重试机制

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

import java.util.concurrent.atomic.AtomicInteger;

public class GenerateErrorHandling {

    public static void main(String[] args) {
        // 示例: 带错误处理的generate
        Flux<String> withErrorHandling = Flux.generate(
                () -> new AtomicInteger(0),
                (counter, sink) -> {
                    int attempt = counter.getAndIncrement();

                    try {
                        if (attempt == 2) {
                            throw new RuntimeException("Simulated error on attempt 2");
                        }

                        if (attempt < 5) {
                            sink.next("Value " + attempt);
                        } else {
                            sink.complete();
                        }
                    } catch (Exception e) {
                        // 错误处理策略1: 发射错误并终止
                        // sink.error(e);

                        // 错误处理策略2: 跳过错误继续处理
                        System.err.println("Error on attempt " + attempt + ": " + e.getMessage());
                        // 不调用sink.error,继续返回状态
                    }

                    return counter;
                }
        );

        withErrorHandling.subscribe(
                value -> System.out.println("Received: " + value),
                error -> System.err.println("Subscription error: " + error),
                () -> System.out.println("Completed successfully")
        );

        System.out.println("----------------------------------------");
        // 示例: 带重试机制的generate
        Flux<Integer> withRetry = Flux.generate(
                () -> new StateWithRetry(0, 0),
                (state, sink) -> {
                    try {
                        if (state.retryCount > 2) {
                            sink.error(new RuntimeException("Max retries exceeded"));
                            return state;
                        }

                        // 模拟可能失败的操作
                        if (state.value == 2 && state.retryCount < 2) {
                            throw new RuntimeException("Temporary failure");
                        }

                        sink.next(state.value);
                        state.value++;

                        if (state.value >= 5) {
                            sink.complete();
                        }
                    } catch (Exception e) {
                        System.out.println("Operation failed, retrying...");
                        state.retryCount++;
                        // 不发射元素,保持状态不变
                    }

                    return state;
                }
        );

        withRetry.subscribe(
                value -> System.out.println("Value: " + value),
                error -> System.err.println("Failed: " + error.getMessage()),
                () -> System.out.println("With retry completed")
        );
    }

    static class StateWithRetry {
        int value;
        int retryCount;

        StateWithRetry(int value, int retryCount) {
            this.value = value;
            this.retryCount = retryCount;
        }
    }
}

在这里插入图片描述

12.4.2 背压感知生成

package cn.tcmeta.generate;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.concurrent.atomic.AtomicLong;

public class GenerateBackpressureAware {
    
    public static void main(String[] args) {
        // 示例: 背压感知的生成器
        Flux<Long> backpressureAware = Flux.generate(
            () -> new AtomicLong(0L),
            (counter, sink) -> {
                long value = counter.getAndIncrement();
                
                // 模拟资源密集型操作
                try {
                    Thread.sleep(100); // 模拟处理延迟
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    sink.error(e);
                    return counter;
                }
                
                sink.next(value);
                
                // 限制生成速率 - 只生成100个元素
                if (value >= 100) {
                    sink.complete();
                }
                
                return counter;
            }
        );
        
        // 使用不同的背压策略订阅
        backpressureAware
            .onBackpressureBuffer(10) // 缓冲区大小为10
            .subscribe(
                value -> {
                    // 模拟慢速消费者
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("Processed: " + value);
                },
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Backpressure example completed")
            );
        
        // 另一个示例: 有条件生成
        Flux<Integer> conditionalGenerate = Flux.generate(
            () -> 0,
            (state, sink) -> {
                // 只有在有需求时才生成元素
                // 注意: generate是同步的,但我们可以模拟背压感知
                if (state < 20) {
                    sink.next(state);
                    return state + 1;
                } else {
                    sink.complete();
                    return state;
                }
            }
        );
        
        // 使用limitRate控制消费速率
        conditionalGenerate
            .limitRate(5) // 每批请求5个元素
            .subscribe(
                value -> {
                    System.out.println("Conditional: " + value);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            );
        
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

12.4.3 复杂状态管理

import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.util.*;

public class ComplexStateGenerate {
    
    public static void main(String[] args) {
        // 示例: 分页数据获取
        Flux<List<String>> pagedData = Flux.generate(
            () -> new PagingState(0, 3), // 初始状态: 页码0, 每页3条
            (state, sink) -> {
                System.out.println("Fetching page " + state.page);
                
                // 模拟分页API调用
                List<String> pageData = fetchPageData(state.page, state.pageSize);
                
                if (pageData.isEmpty()) {
                    sink.complete(); // 没有更多数据
                } else {
                    sink.next(pageData);
                    state.page++; // 下一页
                }
                
                return state;
            },
            state -> System.out.println("Paging completed at page " + state.page)
        );
        
        pagedData.subscribe(
            page -> System.out.println("Received page: " + page),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("All pages fetched")
        );
        
        // 示例: 状态机实现
        Flux<String> stateMachine = Flux.generate(
            () -> new TrafficLightState("RED"),
            (state, sink) -> {
                sink.next(state.color);
                
                try {
                    Thread.sleep(state.duration); // 模拟状态持续时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    sink.error(e);
                    return state;
                }
                
                // 状态转换
                switch (state.color) {
                    case "RED":
                        state.color = "GREEN";
                        state.duration = 5000;
                        break;
                    case "GREEN":
                        state.color = "YELLOW";
                        state.duration = 2000;
                        break;
                    case "YELLOW":
                        state.color = "RED";
                        state.duration = 3000;
                        break;
                }
                
                return state;
            }
        );
        
        // 取10个状态变化
        stateMachine.take(10).subscribe(
            color -> System.out.println("Traffic light: " + color)
        );
        
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 模拟分页数据获取
    private static List<String> fetchPageData(int page, int pageSize) {
        // 模拟数据源
        List<String> allData = Arrays.asList(
            "A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3", "D1"
        );
        
        int start = page * pageSize;
        if (start >= allData.size()) {
            return Collections.emptyList();
        }
        
        int end = Math.min(start + pageSize, allData.size());
        return allData.subList(start, end);
    }
    
    // 分页状态类
    static class PagingState {
        int page;
        int pageSize;
        
        PagingState(int page, int pageSize) {
            this.page = page;
            this.pageSize = pageSize;
        }
    }
    
    // 交通灯状态类
    static class TrafficLightState {
        String color;
        long duration;
        
        TrafficLightState(String color) {
            this.color = color;
            // 设置初始持续时间
            this.duration = color.equals("RED") ? 3000 : 
                           color.equals("GREEN") ? 5000 : 2000;
        }
    }
}

12.5 generate工作原理

12.5.1 generate执行流程

Subscriber Flux.generate State Supplier Generator State Consumer subscribe() call() 获取初始状态 返回初始状态 apply(state, sink) next(item) 或 complete() 或 error(e) 返回新状态 更新状态 loop [生成元素] 当完成或错误时 accept(state) 清理状态 Subscriber Flux.generate State Supplier Generator State Consumer

12.5.2 generate 状态管理

开始generate
调用状态供应器
获取初始状态
调用生成器函数
生成器操作
发射元素 next
完成 complete
错误 error
返回新状态
调用状态清理器
结束generate

12.6 应用场景与最佳实践

12.6.1 适用场景

  1. 分页数据获取:从数据库或API分页获取数据
  2. 状态机实现:如工作流、协议处理等
  3. 资源迭代:文件读取、数据库游标处理等
  4. 序列生成:数学序列(斐波那契、素数等)
  5. 定时器/计数器:基于状态的定时事件生成

12.6.2 最佳实践

  1. 状态设计
    • 使用不可变状态或防御性拷贝避免意外修改
    • 确保状态对象是线程安全的(尽管generate是同步的)
  2. 错误处理
    • 在生成器内部处理可恢复错误
    • 使用sink.error()处理不可恢复错误
  3. 资源管理
    • 总是提供状态清理器来释放资源
    • 确保在错误情况下也能清理资源
  4. 背压考虑
    • generate是同步的,但应考虑消费者的处理能力
    • 避免在生成器中执行耗时操作
  5. 测试策略
    • 使用StepVerifier测试generate生成的流
    • 验证状态转换和清理逻辑

12.6.3 文件读取器

import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileReaderExample {
    
    public static Flux<String> readFileLines(String filePath) {
        return Flux.generate(
            () -> {
                try {
                    Path path = Paths.get(filePath);
                    return Files.newBufferedReader(path);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to open file: " + filePath, e);
                }
            },
            (reader, sink) -> {
                try {
                    String line = reader.readLine();
                    if (line != null) {
                        sink.next(line);
                    } else {
                        sink.complete();
                    }
                } catch (IOException e) {
                    sink.error(new RuntimeException("Error reading file", e));
                }
                return reader;
            },
            reader -> {
                try {
                    reader.close();
                    System.out.println("File reader closed");
                } catch (IOException e) {
                    System.err.println("Error closing file reader: " + e.getMessage());
                }
            }
        );
    }
    
    public static void main(String[] args) {
        // 创建一个临时文件用于演示
        Path tempFile;
        try {
            tempFile = Files.createTempFile("reactor-generate", ".txt");
            Files.write(tempFile, 
                Arrays.asList("Line 1", "Line 2", "Line 3", "Line 4", "Line 5"));
            
            // 读取文件
            readFileLines(tempFile.toString())
                .subscribe(
                    line -> System.out.println("Read: " + line),
                    error -> System.err.println("Error: " + error.getMessage()),
                    () -> System.out.println("File reading completed")
                );
                
            // 删除临时文件
            Files.deleteIfExists(tempFile);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

12.7 总结

Reactor的generate方法是一个强大而灵活的工具,适用于创建有状态的序列生成器。

  1. 三种形式:无状态、有状态和带清理的有状态generate
  2. 状态管理:如何设计和管理生成器状态
  3. 错误处理:在生成器中处理异常的策略
  4. 资源清理:确保资源正确释放的重要性
  5. 应用场景:分页数据、状态机、文件读取等实用案例

generate方法特别适合需要维护状态 between元素生成的场景,它提供了比create更简单、更可控的同步生成方式。通过合理设计状态对象和生成器逻辑,可以创建出高效、可靠的响应式数据流。

记住最佳实践,特别是在资源管理和错误处理方面,可以确保使用generate创建的流既高效又健壮


网站公告

今日签到

点亮在社区的每一天
去签到