文章目录
一、响应式流规范背景
为了解决异步系统间的背压(Backpressure)问题(即生产者与消费者的速度不匹配)。
由 Netflix、Lightbend 等公司推动,统一了响应式编程中关于流处理的规范,规范中发布了一组接口,用于实现响应式流。
响应式流规范网址:http://www.reactive-streams.org
响应式流的规范文档:
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md
maven仓库地址:
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-examples</artifactId>
<version>1.0.3</version>
</dependency>
响应式流规范定义了异步系统中组件间通过背压(Backpressure)进行交互的标准。
其核心目标是为了解决生产者与消费者之间的速率不匹配问题,确保数据流的高效、可控传递。
Java 9 通过 java.util.concurrent.Flow 类提供了对响应式流规范的官方适配。需注意:
Flow API 是规范而非具体实现,其语义与响应式流完全一致。
开发者需依赖第三方库(如 Project Reactor、Akka Streams)获得实际功能支持。
二、响应式流规范接口
规范定义了以下 4 个基础接口:
Publisher
数据流的生产者(数据源):提供一个 subscribe() 方法让消费者注册到生产者并接收数据,
作为生产者与消费者连接的标准化入口点
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber
数据流的消费者,提供了一个 onSubscribe 方法来通知 Subscriber 订阅成功
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
- onSubscribe: 生产者在开始处理之前调用,并向消费者传递一个 Subscription 对象。
- onNext: 用于通知消费者,生产者发布了新的数据项。
- onError: 用于通知消费者,生产者遇到了异常,不再发布数据事件。
- onComplete: 用于通知消费者,所有的数据事件都已发布完成。
Subscription
代表 订阅关系,用于控制流量,管理 Publisher 和 Subscriber 之间的交互
public interface Subscription {
public void request(long n);
public void cancel();
}
- request: 用于让消费者通知生产者随后需要发布的元素数量,用于控制数据流量(背压管理)
- cancel: 用于让消费者取消生产者随后的事件流,用于取消订阅(释放资源)
Processor
同时扮演 数据生产者(Publisher) 和 数据消费者(Subscriber) 的双重角色,用于在数据流中进行中间处理(如转换、过滤、聚合等)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
以上所有接口都存在于 org.reactivestreams 包中:
底层机制
Publisher 保证只有在 Subscriber 要求时才发送元素中新的部分。
Publisher 的整体实现既可以采用纯粹的阻塞式等待,也可以采用仅在 Subscriber 请求时才生成数据
的机制。
该规范为我们提供了混合推拉模型,此模型可以对背压进行合理控制。
另外,在某些情况下,可以优先考虑纯推模型。响应式流非常灵活,除动态推拉模型外,该规范还提供了独立的推模型和拉模型。
三、响应式流规范实战
异步生产者
package com.gwx.webflux.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 异步迭代 Publisher 实现,使用指定的 Executor 异步执行,
* 并为指定的 Iterable 生成元素,以 unicast 形式为 Subscriber 提供数据。
*
* <p>注意:本实现包含大量 try-catch 块,用于展示何时可以抛出异常,何时不能抛出异常。</p>
*
* @param <T> 发布的数据类型
*/
public class AsyncIterablePublisher<T> implements Publisher<T> {
// 默认批次大小
private static final int DEFAULT_BATCHSIZE = 1024;
// 数据源或生成器
private final Iterable<T> elements;
// 用于异步执行的线程池
private final Executor executor;
// 单个线程处理的最大元素数量
private final int batchSize;
/**
* 构造 AsyncIterablePublisher 实例
*
* @param elements 元素生成器
* @param executor 线程池
*/
public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
this(elements, DEFAULT_BATCHSIZE, executor);
}
/**
* 构造 AsyncIterablePublisher 实例
*
* @param elements 元素生成器
* @param batchSize 批次大小
* @param executor 线程池
* @throws IllegalArgumentException 如果 batchSize 小于1
* @throws NullPointerException 如果 elements 或 executor 为 null
*/
public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
if (elements == null) throw new NullPointerException("elements cannot be null");
if (executor == null) throw new NullPointerException("executor cannot be null");
if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero");
this.elements = elements;
this.executor = executor;
this.batchSize = batchSize;
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
new SubscriptionImpl(subscriber).init();
}
// ========== 内部类和接口 ==========
/**
* 信号接口,用于订阅者和发布者之间的通信
*/
private interface Signal {
}
/**
* 取消订阅信号
*/
private enum Cancel implements Signal {INSTANCE}
/**
* 订阅信号
*/
private enum Subscribe implements Signal {INSTANCE}
/**
* 发送数据信号
*/
private enum Send implements Signal {INSTANCE}
/**
* 请求数据信号
*/
private static final class Request implements Signal {
final long n;
Request(final long n) {
this.n = n;
}
}
/**
* Subscription 实现类,处理订阅关系和数据流控制
*/
private final class SubscriptionImpl implements Subscription, Runnable {
// 订阅者引用
final Subscriber<? super T> subscriber;
// 订阅是否已取消
private volatile boolean cancelled = false;
// 未处理的请求数量
private long demand = 0;
// 数据迭代器
private Iterator<T> iterator;
// 入站信号队列
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();
// 防止并发执行的标志
private final AtomicBoolean on = new AtomicBoolean(false);
/**
* 创建 Subscription 实例
*
* @param subscriber 订阅者
* @throws NullPointerException 如果 subscriber 为 null
*/
SubscriptionImpl(final Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException("subscriber cannot be null");
this.subscriber = subscriber;
}
// ========== 核心方法 ==========
/**
* 处理数据请求
*
* @param n 请求的元素数量
* @throws IllegalArgumentException 如果 n < 1
*/
private void doRequest(final long n) {
if (n < 1) {
terminateDueTo(new IllegalArgumentException(
subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
return;
}
// 处理 long 溢出情况
if (demand + n < 1) {
demand = Long.MAX_VALUE;
} else {
demand += n;
}
doSend();
}
/**
* 取消订阅
*/
private void doCancel() {
cancelled = true;
}
/**
* 执行订阅逻辑
*/
private void doSubscribe() {
try {
iterator = elements.iterator();
if (iterator == null) {
iterator = Collections.<T>emptyList().iterator();
}
} catch (final Throwable t) {
// 获取迭代器失败,发送错误信号
subscriber.onSubscribe(new Subscription() {
@Override
public void cancel() {
}
@Override
public void request(long n) {
}
});
terminateDueTo(t);
return;
}
if (!cancelled) {
try {
subscriber.onSubscribe(this);
} catch (final Throwable t) {
terminateDueTo(new IllegalStateException(
subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
return;
}
boolean hasElements;
try {
hasElements = iterator.hasNext();
} catch (final Throwable t) {
terminateDueTo(t);
return;
}
if (!hasElements) {
try {
doCancel();
subscriber.onComplete();
} catch (final Throwable t) {
// 记录 onComplete 抛出的异常
new IllegalStateException(
subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)
.printStackTrace(System.err);
}
}
}
}
/**
* 发送数据给订阅者
*/
private void doSend() {
try {
int leftInBatch = batchSize;
do {
T next;
boolean hasNext;
try {
next = iterator.next();
hasNext = iterator.hasNext();
} catch (final Throwable t) {
terminateDueTo(t);
return;
}
subscriber.onNext(next);
if (!hasNext) {
doCancel();
subscriber.onComplete();
return;
}
} while (!cancelled && --leftInBatch > 0 && --demand > 0);
if (!cancelled && demand > 0) {
signal(Send.INSTANCE);
}
} catch (final Throwable t) {
doCancel();
new IllegalStateException(
subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)
.printStackTrace(System.err);
}
}
/**
* 终止订阅并发送错误信号
*
* @param t 错误原因
*/
private void terminateDueTo(final Throwable t) {
cancelled = true;
try {
subscriber.onError(t);
} catch (final Throwable t2) {
new IllegalStateException(
subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)
.printStackTrace(System.err);
}
}
// ========== 信号处理 ==========
/**
* 发送信号到队列并尝试调度执行
*
* @param signal 要发送的信号
*/
private void signal(final Signal signal) {
if (inboundSignals.offer(signal)) {
tryScheduleToExecute();
}
}
/**
* 尝试调度任务到线程池执行
*/
private void tryScheduleToExecute() {
if (on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch (Throwable t) {
if (!cancelled) {
doCancel();
try {
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
} finally {
inboundSignals.clear();
on.set(false);
}
}
}
}
}
@Override
public void run() {
if (on.get()) {
try {
final Signal s = inboundSignals.poll();
if (!cancelled) {
if (s instanceof Request) {
doRequest(((Request) s).n);
} else if (s == Send.INSTANCE) {
doSend();
} else if (s == Cancel.INSTANCE) {
doCancel();
} else if (s == Subscribe.INSTANCE) {
doSubscribe();
}
}
} finally {
on.set(false);
if (!inboundSignals.isEmpty()) {
tryScheduleToExecute();
}
}
}
}
// ========== Subscription 接口实现 ==========
@Override
public void request(final long n) {
signal(new Request(n));
}
@Override
public void cancel() {
signal(Cancel.INSTANCE);
}
/**
* 初始化订阅,发送订阅信号
*/
void init() {
signal(Subscribe.INSTANCE);
}
}
}
异步消费者
package com.gwx.webflux.demo;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 基于 Executor 异步运行的订阅者实现,一次请求一个元素,
* 然后对每个元素调用用户定义的方法进行处理。
*
* <p>注意:本类包含大量 try-catch 块,用于说明何时可以抛出异常,何时不能抛出异常。</p>
*
* @param <T> 订阅的数据类型
*/
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
// ========== 内部信号定义 ==========
/**
* 发布者和订阅者之间的异步协议信号接口
*/
private interface Signal {
}
/**
* 数据流完成信号
*/
private enum OnComplete implements Signal {INSTANCE}
/**
* 错误信号
*/
private static class OnError implements Signal {
final Throwable error;
OnError(Throwable error) {
this.error = error;
}
}
/**
* 数据项信号
*/
private static class OnNext<T> implements Signal {
final T next;
OnNext(T next) {
this.next = next;
}
}
/**
* 订阅成功信号
*/
private static class OnSubscribe implements Signal {
final Subscription subscription;
OnSubscribe(Subscription subscription) {
this.subscription = subscription;
}
}
// ========== 成员变量 ==========
private Subscription subscription; // 订阅关系
private boolean done; // 是否已完成处理
private final Executor executor; // 异步执行线程池
// 入站信号队列
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();
// 防止并发执行的标志
private final AtomicBoolean on = new AtomicBoolean(false);
// ========== 构造方法 ==========
/**
* 创建 AsyncSubscriber 实例
*
* @param executor 用于异步处理的线程池
* @throws NullPointerException 如果 executor 为 null
*/
protected AsyncSubscriber(Executor executor) {
if (executor == null) throw new NullPointerException("executor cannot be null");
this.executor = executor;
}
// ========== 抽象方法 ==========
/**
* 处理下一个元素
*
* @param element 接收到的元素
* @return 是否还需要更多元素
*/
protected abstract boolean whenNext(T element);
/**
* 处理完成信号
*/
protected void whenComplete() {
}
/**
* 处理错误信号
*
* @param error 错误原因
*/
protected void whenError(Throwable error) {
}
// ========== 核心方法 ==========
/**
* 标记订阅者已完成处理
*/
private void done() {
done = true;
if (subscription != null) {
try {
subscription.cancel();
} catch (Throwable t) {
new IllegalStateException(
subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)
.printStackTrace(System.err);
}
}
}
/**
* 处理订阅信号
*
* @param s 订阅关系
*/
private void handleOnSubscribe(Subscription s) {
if (s == null) {
// 忽略 null Subscription
return;
}
if (subscription != null) {
// 重复订阅,取消新订阅
try {
s.cancel();
} catch (Throwable t) {
new IllegalStateException(
s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)
.printStackTrace(System.err);
}
} else {
subscription = s;
try {
// 每次只请求一个元素
s.request(1);
} catch (Throwable t) {
new IllegalStateException(
s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)
.printStackTrace(System.err);
}
}
}
/**
* 处理数据信号
*
* @param element 接收到的元素
*/
private void handleOnNext(T element) {
if (done) return;
if (subscription == null) {
new IllegalStateException(
"Violated Reactive Streams rule 1.09 and 2.1 by signalling OnNext before Subscription.request")
.printStackTrace(System.err);
return;
}
try {
if (whenNext(element)) {
try {
// 处理成功后请求下一个元素
subscription.request(1);
} catch (Throwable t) {
new IllegalStateException(
subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)
.printStackTrace(System.err);
}
} else {
done();
}
} catch (Throwable t) {
done();
try {
onError(t);
} catch (Throwable t2) {
new IllegalStateException(
this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)
.printStackTrace(System.err);
}
}
}
/**
* 处理完成信号
*/
private void handleOnComplete() {
if (subscription == null) {
new IllegalStateException(
"Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")
.printStackTrace(System.err);
return;
}
done = true;
whenComplete();
}
/**
* 处理错误信号
*
* @param error 错误原因
*/
private void handleOnError(Throwable error) {
if (subscription == null) {
new IllegalStateException(
"Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")
.printStackTrace(System.err);
return;
}
done = true;
whenError(error);
}
// ========== Subscriber 接口实现 ==========
@Override
public final void onSubscribe(Subscription s) {
if (s == null) throw new NullPointerException("Subscription cannot be null");
signal(new OnSubscribe(s));
}
@Override
public final void onNext(T element) {
if (element == null) throw new NullPointerException("Element cannot be null");
signal(new OnNext<>(element));
}
@Override
public final void onError(Throwable t) {
if (t == null) throw new NullPointerException("Throwable cannot be null");
signal(new OnError(t));
}
@Override
public final void onComplete() {
signal(OnComplete.INSTANCE);
}
// ========== Runnable 接口实现 ==========
@Override
public final void run() {
if (on.get()) {
try {
Signal s = inboundSignals.poll();
if (!done) {
if (s instanceof OnNext) {
handleOnNext(((OnNext<T>) s).next);
} else if (s instanceof OnSubscribe) {
handleOnSubscribe(((OnSubscribe) s).subscription);
} else if (s instanceof OnError) {
handleOnError(((OnError) s).error);
} else if (s == OnComplete.INSTANCE) {
handleOnComplete();
}
}
} finally {
on.set(false);
if (!inboundSignals.isEmpty()) {
tryScheduleToExecute();
}
}
}
}
// ========== 信号处理 ==========
/**
* 发送信号到队列
*
* @param signal 要发送的信号
*/
private void signal(Signal signal) {
if (inboundSignals.offer(signal)) {
tryScheduleToExecute();
}
}
/**
* 尝试调度任务到线程池执行
*/
private void tryScheduleToExecute() {
if (on.compareAndSet(false, true)) {
try {
executor.execute(this);
} catch (Throwable t) {
if (!done) {
try {
done();
} finally {
inboundSignals.clear();
on.set(false);
}
}
}
}
}
}
测试类
public class ReactiveTest {
public static void main(String[] args) {
// 测试1:基本功能测试
System.out.println("=== 测试1:基本功能测试 ===");
testBasicFunctionality();
// 测试2:异常处理测试
System.out.println("\n=== 测试2:异常处理测试 ===");
testErrorHandling();
// 测试3:大数据量测试
System.out.println("\n=== 测试3:大数据量测试 ===");
testLargeDataSet();
}
private static void testBasicFunctionality() {
Set<Integer> elements = new HashSet<>();
for (int i = 1; i <= 5; i++) {
elements.add(i);
}
ExecutorService executor = Executors.newFixedThreadPool(3);
AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);
publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {
@Override
protected boolean whenNext(Integer element) {
System.out.println("处理元素: " + element);
return true;
}
@Override
protected void whenComplete() {
System.out.println("所有元素处理完成");
executor.shutdown();
}
});
}
private static void testErrorHandling() {
Set<Integer> elements = new HashSet<>();
elements.add(1);
elements.add(2);
elements.add(3);
ExecutorService executor = Executors.newFixedThreadPool(2);
AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);
publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {
@Override
protected boolean whenNext(Integer element) {
System.out.println("处理元素: " + element);
if (element == 2) {
throw new RuntimeException("模拟处理元素2时出错");
}
return true;
}
@Override
protected void whenError(Throwable error) {
System.err.println("发生错误: " + error.getMessage());
executor.shutdown();
}
});
}
private static void testLargeDataSet() {
Set<Integer> elements = new HashSet<>();
for (int i = 1; i <= 1000; i++) {
elements.add(i);
}
ExecutorService executor = Executors.newFixedThreadPool(10);
AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);
long startTime = System.currentTimeMillis();
publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newFixedThreadPool(4)) {
private int count = 0;
@Override
protected boolean whenNext(Integer element) {
count++;
if (count % 100 == 0) {
System.out.println("已处理 " + count + " 个元素");
}
return true;
}
@Override
protected void whenComplete() {
long duration = System.currentTimeMillis() - startTime;
System.out.println("处理完成,共处理 " + count + " 个元素,耗时 " + duration + " 毫秒");
executor.shutdown();
}
});
}
}