【Spring WebFlux】 三、响应式流规范与实战

发布于:2025-07-29 ⋅ 阅读:(21) ⋅ 点赞:(0)

一、响应式流规范背景

为了解决异步系统间的背压(Backpressure)问题(即生产者与消费者的速度不匹配)。

由 Netflix、Lightbend 等公司推动,统一了响应式编程中关于流处理的规范,规范中发布了一组接口,用于实现响应式流。

响应式流规范网址:http://www.reactive-streams.org

响应式流的规范文档
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md

maven仓库地址:

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck-flow</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-examples</artifactId>
    <version>1.0.3</version>
</dependency>

响应式流规范定义了异步系统中组件间通过背压(Backpressure)进行交互的标准。

其核心目标是为了解决生产者与消费者之间的速率不匹配问题,确保数据流的高效、可控传递。

Java 9 通过 java.util.concurrent.Flow 类提供了对响应式流规范的官方适配。需注意:

Flow API 是规范而非具体实现,其语义与响应式流完全一致。

开发者需依赖第三方库(如 Project Reactor、Akka Streams)获得实际功能支持。

二、响应式流规范接口

规范定义了以下 4 个基础接口:

Publisher

数据流的生产者(数据源):提供一个 subscribe() 方法让消费者注册到生产者并接收数据,
作为生产者与消费者连接的标准化入口点

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber

数据流的消费者,提供了一个 onSubscribe 方法来通知 Subscriber 订阅成功

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}
  • onSubscribe: 生产者在开始处理之前调用,并向消费者传递一个 Subscription 对象。
  • onNext: 用于通知消费者,生产者发布了新的数据项。
  • onError: 用于通知消费者,生产者遇到了异常,不再发布数据事件。
  • onComplete: 用于通知消费者,所有的数据事件都已发布完成。

Subscription

代表 订阅关系,用于控制流量,管理 Publisher 和 Subscriber 之间的交互

public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • request: 用于让消费者通知生产者随后需要发布的元素数量,用于控制数据流量(背压管理)
  • cancel: 用于让消费者取消生产者随后的事件流,用于取消订阅(释放资源)

Processor

同时扮演 数据生产者(Publisher) 和 数据消费者(Subscriber) 的双重角色,用于在数据流中进行中间处理(如转换、过滤、聚合等)

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

以上所有接口都存在于 org.reactivestreams 包中:

底层机制

在这里插入图片描述
Publisher 保证只有在 Subscriber 要求时才发送元素中新的部分。

Publisher 的整体实现既可以采用纯粹的阻塞式等待,也可以采用仅在 Subscriber 请求时才生成数据
的机制。

该规范为我们提供了混合推拉模型,此模型可以对背压进行合理控制。

另外,在某些情况下,可以优先考虑纯推模型。响应式流非常灵活,除动态推拉模型外,该规范还提供了独立的推模型和拉模型。

三、响应式流规范实战

异步生产者

package com.gwx.webflux.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 异步迭代 Publisher 实现,使用指定的 Executor 异步执行,
 * 并为指定的 Iterable 生成元素,以 unicast 形式为 Subscriber 提供数据。
 *
 * <p>注意:本实现包含大量 try-catch 块,用于展示何时可以抛出异常,何时不能抛出异常。</p>
 *
 * @param <T> 发布的数据类型
 */
public class AsyncIterablePublisher<T> implements Publisher<T> {
    // 默认批次大小
    private static final int DEFAULT_BATCHSIZE = 1024;

    // 数据源或生成器
    private final Iterable<T> elements;

    // 用于异步执行的线程池
    private final Executor executor;

    // 单个线程处理的最大元素数量
    private final int batchSize;

    /**
     * 构造 AsyncIterablePublisher 实例
     *
     * @param elements 元素生成器
     * @param executor 线程池
     */
    public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
        this(elements, DEFAULT_BATCHSIZE, executor);
    }

    /**
     * 构造 AsyncIterablePublisher 实例
     *
     * @param elements  元素生成器
     * @param batchSize 批次大小
     * @param executor  线程池
     * @throws IllegalArgumentException 如果 batchSize 小于1
     * @throws NullPointerException     如果 elements 或 executor 为 null
     */
    public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
        if (elements == null) throw new NullPointerException("elements cannot be null");
        if (executor == null) throw new NullPointerException("executor cannot be null");
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero");

        this.elements = elements;
        this.executor = executor;
        this.batchSize = batchSize;
    }

    @Override
    public void subscribe(final Subscriber<? super T> subscriber) {
        new SubscriptionImpl(subscriber).init();
    }

    // ========== 内部类和接口 ==========

    /**
     * 信号接口,用于订阅者和发布者之间的通信
     */
    private interface Signal {
    }

    /**
     * 取消订阅信号
     */
    private enum Cancel implements Signal {INSTANCE}

    /**
     * 订阅信号
     */
    private enum Subscribe implements Signal {INSTANCE}

    /**
     * 发送数据信号
     */
    private enum Send implements Signal {INSTANCE}

    /**
     * 请求数据信号
     */
    private static final class Request implements Signal {
        final long n;

        Request(final long n) {
            this.n = n;
        }
    }

    /**
     * Subscription 实现类,处理订阅关系和数据流控制
     */
    private final class SubscriptionImpl implements Subscription, Runnable {
        // 订阅者引用
        final Subscriber<? super T> subscriber;

        // 订阅是否已取消
        private volatile boolean cancelled = false;

        // 未处理的请求数量
        private long demand = 0;

        // 数据迭代器
        private Iterator<T> iterator;

        // 入站信号队列
        private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();

        // 防止并发执行的标志
        private final AtomicBoolean on = new AtomicBoolean(false);

        /**
         * 创建 Subscription 实例
         *
         * @param subscriber 订阅者
         * @throws NullPointerException 如果 subscriber 为 null
         */
        SubscriptionImpl(final Subscriber<? super T> subscriber) {
            if (subscriber == null) throw new NullPointerException("subscriber cannot be null");
            this.subscriber = subscriber;
        }

        // ========== 核心方法 ==========

        /**
         * 处理数据请求
         *
         * @param n 请求的元素数量
         * @throws IllegalArgumentException 如果 n < 1
         */
        private void doRequest(final long n) {
            if (n < 1) {
                terminateDueTo(new IllegalArgumentException(
                        subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
                return;
            }

            // 处理 long 溢出情况
            if (demand + n < 1) {
                demand = Long.MAX_VALUE;
            } else {
                demand += n;
            }

            doSend();
        }

        /**
         * 取消订阅
         */
        private void doCancel() {
            cancelled = true;
        }

        /**
         * 执行订阅逻辑
         */
        private void doSubscribe() {
            try {
                iterator = elements.iterator();
                if (iterator == null) {
                    iterator = Collections.<T>emptyList().iterator();
                }
            } catch (final Throwable t) {
                // 获取迭代器失败,发送错误信号
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void cancel() {
                    }

                    @Override
                    public void request(long n) {
                    }
                });
                terminateDueTo(t);
                return;
            }

            if (!cancelled) {
                try {
                    subscriber.onSubscribe(this);
                } catch (final Throwable t) {
                    terminateDueTo(new IllegalStateException(
                            subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
                    return;
                }

                boolean hasElements;
                try {
                    hasElements = iterator.hasNext();
                } catch (final Throwable t) {
                    terminateDueTo(t);
                    return;
                }

                if (!hasElements) {
                    try {
                        doCancel();
                        subscriber.onComplete();
                    } catch (final Throwable t) {
                        // 记录 onComplete 抛出的异常
                        new IllegalStateException(
                                subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)
                                .printStackTrace(System.err);
                    }
                }
            }
        }

        /**
         * 发送数据给订阅者
         */
        private void doSend() {
            try {
                int leftInBatch = batchSize;
                do {
                    T next;
                    boolean hasNext;

                    try {
                        next = iterator.next();
                        hasNext = iterator.hasNext();
                    } catch (final Throwable t) {
                        terminateDueTo(t);
                        return;
                    }

                    subscriber.onNext(next);

                    if (!hasNext) {
                        doCancel();
                        subscriber.onComplete();
                        return;
                    }
                } while (!cancelled && --leftInBatch > 0 && --demand > 0);

                if (!cancelled && demand > 0) {
                    signal(Send.INSTANCE);
                }
            } catch (final Throwable t) {
                doCancel();
                new IllegalStateException(
                        subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)
                        .printStackTrace(System.err);
            }
        }

        /**
         * 终止订阅并发送错误信号
         *
         * @param t 错误原因
         */
        private void terminateDueTo(final Throwable t) {
            cancelled = true;
            try {
                subscriber.onError(t);
            } catch (final Throwable t2) {
                new IllegalStateException(
                        subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)
                        .printStackTrace(System.err);
            }
        }

        // ========== 信号处理 ==========

        /**
         * 发送信号到队列并尝试调度执行
         *
         * @param signal 要发送的信号
         */
        private void signal(final Signal signal) {
            if (inboundSignals.offer(signal)) {
                tryScheduleToExecute();
            }
        }

        /**
         * 尝试调度任务到线程池执行
         */
        private void tryScheduleToExecute() {
            if (on.compareAndSet(false, true)) {
                try {
                    executor.execute(this);
                } catch (Throwable t) {
                    if (!cancelled) {
                        doCancel();
                        try {
                            terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
                        } finally {
                            inboundSignals.clear();
                            on.set(false);
                        }
                    }
                }
            }
        }

        @Override
        public void run() {
            if (on.get()) {
                try {
                    final Signal s = inboundSignals.poll();
                    if (!cancelled) {
                        if (s instanceof Request) {
                            doRequest(((Request) s).n);
                        } else if (s == Send.INSTANCE) {
                            doSend();
                        } else if (s == Cancel.INSTANCE) {
                            doCancel();
                        } else if (s == Subscribe.INSTANCE) {
                            doSubscribe();
                        }
                    }
                } finally {
                    on.set(false);
                    if (!inboundSignals.isEmpty()) {
                        tryScheduleToExecute();
                    }
                }
            }
        }

        // ========== Subscription 接口实现 ==========

        @Override
        public void request(final long n) {
            signal(new Request(n));
        }

        @Override
        public void cancel() {
            signal(Cancel.INSTANCE);
        }

        /**
         * 初始化订阅,发送订阅信号
         */
        void init() {
            signal(Subscribe.INSTANCE);
        }
    }
}

异步消费者

package com.gwx.webflux.demo;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 基于 Executor 异步运行的订阅者实现,一次请求一个元素,
 * 然后对每个元素调用用户定义的方法进行处理。
 *
 * <p>注意:本类包含大量 try-catch 块,用于说明何时可以抛出异常,何时不能抛出异常。</p>
 *
 * @param <T> 订阅的数据类型
 */
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
    // ========== 内部信号定义 ==========

    /**
     * 发布者和订阅者之间的异步协议信号接口
     */
    private interface Signal {
    }

    /**
     * 数据流完成信号
     */
    private enum OnComplete implements Signal {INSTANCE}

    /**
     * 错误信号
     */
    private static class OnError implements Signal {
        final Throwable error;

        OnError(Throwable error) {
            this.error = error;
        }
    }

    /**
     * 数据项信号
     */
    private static class OnNext<T> implements Signal {
        final T next;

        OnNext(T next) {
            this.next = next;
        }
    }

    /**
     * 订阅成功信号
     */
    private static class OnSubscribe implements Signal {
        final Subscription subscription;

        OnSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }
    }

    // ========== 成员变量 ==========

    private Subscription subscription;  // 订阅关系
    private boolean done;             // 是否已完成处理
    private final Executor executor;  // 异步执行线程池

    // 入站信号队列
    private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<>();

    // 防止并发执行的标志
    private final AtomicBoolean on = new AtomicBoolean(false);

    // ========== 构造方法 ==========

    /**
     * 创建 AsyncSubscriber 实例
     *
     * @param executor 用于异步处理的线程池
     * @throws NullPointerException 如果 executor 为 null
     */
    protected AsyncSubscriber(Executor executor) {
        if (executor == null) throw new NullPointerException("executor cannot be null");
        this.executor = executor;
    }

    // ========== 抽象方法 ==========

    /**
     * 处理下一个元素
     *
     * @param element 接收到的元素
     * @return 是否还需要更多元素
     */
    protected abstract boolean whenNext(T element);

    /**
     * 处理完成信号
     */
    protected void whenComplete() {
    }

    /**
     * 处理错误信号
     *
     * @param error 错误原因
     */
    protected void whenError(Throwable error) {
    }

    // ========== 核心方法 ==========

    /**
     * 标记订阅者已完成处理
     */
    private void done() {
        done = true;
        if (subscription != null) {
            try {
                subscription.cancel();
            } catch (Throwable t) {
                new IllegalStateException(
                        subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)
                        .printStackTrace(System.err);
            }
        }
    }

    /**
     * 处理订阅信号
     *
     * @param s 订阅关系
     */
    private void handleOnSubscribe(Subscription s) {
        if (s == null) {
            // 忽略 null Subscription
            return;
        }

        if (subscription != null) {
            // 重复订阅,取消新订阅
            try {
                s.cancel();
            } catch (Throwable t) {
                new IllegalStateException(
                        s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)
                        .printStackTrace(System.err);
            }
        } else {
            subscription = s;
            try {
                // 每次只请求一个元素
                s.request(1);
            } catch (Throwable t) {
                new IllegalStateException(
                        s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)
                        .printStackTrace(System.err);
            }
        }
    }

    /**
     * 处理数据信号
     *
     * @param element 接收到的元素
     */
    private void handleOnNext(T element) {
        if (done) return;

        if (subscription == null) {
            new IllegalStateException(
                    "Violated Reactive Streams rule 1.09 and 2.1 by signalling OnNext before Subscription.request")
                    .printStackTrace(System.err);
            return;
        }

        try {
            if (whenNext(element)) {
                try {
                    // 处理成功后请求下一个元素
                    subscription.request(1);
                } catch (Throwable t) {
                    new IllegalStateException(
                            subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)
                            .printStackTrace(System.err);
                }
            } else {
                done();
            }
        } catch (Throwable t) {
            done();
            try {
                onError(t);
            } catch (Throwable t2) {
                new IllegalStateException(
                        this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)
                        .printStackTrace(System.err);
            }
        }
    }

    /**
     * 处理完成信号
     */
    private void handleOnComplete() {
        if (subscription == null) {
            new IllegalStateException(
                    "Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")
                    .printStackTrace(System.err);
            return;
        }

        done = true;
        whenComplete();
    }

    /**
     * 处理错误信号
     *
     * @param error 错误原因
     */
    private void handleOnError(Throwable error) {
        if (subscription == null) {
            new IllegalStateException(
                    "Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")
                    .printStackTrace(System.err);
            return;
        }

        done = true;
        whenError(error);
    }

    // ========== Subscriber 接口实现 ==========

    @Override
    public final void onSubscribe(Subscription s) {
        if (s == null) throw new NullPointerException("Subscription cannot be null");
        signal(new OnSubscribe(s));
    }

    @Override
    public final void onNext(T element) {
        if (element == null) throw new NullPointerException("Element cannot be null");
        signal(new OnNext<>(element));
    }

    @Override
    public final void onError(Throwable t) {
        if (t == null) throw new NullPointerException("Throwable cannot be null");
        signal(new OnError(t));
    }

    @Override
    public final void onComplete() {
        signal(OnComplete.INSTANCE);
    }

    // ========== Runnable 接口实现 ==========

    @Override
    public final void run() {
        if (on.get()) {
            try {
                Signal s = inboundSignals.poll();
                if (!done) {
                    if (s instanceof OnNext) {
                        handleOnNext(((OnNext<T>) s).next);
                    } else if (s instanceof OnSubscribe) {
                        handleOnSubscribe(((OnSubscribe) s).subscription);
                    } else if (s instanceof OnError) {
                        handleOnError(((OnError) s).error);
                    } else if (s == OnComplete.INSTANCE) {
                        handleOnComplete();
                    }
                }
            } finally {
                on.set(false);
                if (!inboundSignals.isEmpty()) {
                    tryScheduleToExecute();
                }
            }
        }
    }

    // ========== 信号处理 ==========

    /**
     * 发送信号到队列
     *
     * @param signal 要发送的信号
     */
    private void signal(Signal signal) {
        if (inboundSignals.offer(signal)) {
            tryScheduleToExecute();
        }
    }

    /**
     * 尝试调度任务到线程池执行
     */
    private void tryScheduleToExecute() {
        if (on.compareAndSet(false, true)) {
            try {
                executor.execute(this);
            } catch (Throwable t) {
                if (!done) {
                    try {
                        done();
                    } finally {
                        inboundSignals.clear();
                        on.set(false);
                    }
                }
            }
        }
    }
}

测试类

public class ReactiveTest {
    public static void main(String[] args) {
        // 测试1:基本功能测试
        System.out.println("=== 测试1:基本功能测试 ===");
        testBasicFunctionality();

        // 测试2:异常处理测试
        System.out.println("\n=== 测试2:异常处理测试 ===");
        testErrorHandling();

        // 测试3:大数据量测试
        System.out.println("\n=== 测试3:大数据量测试 ===");
        testLargeDataSet();
    }

    private static void testBasicFunctionality() {
        Set<Integer> elements = new HashSet<>();
        for (int i = 1; i <= 5; i++) {
            elements.add(i);
        }

        ExecutorService executor = Executors.newFixedThreadPool(3);
        AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);

        publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {
            @Override
            protected boolean whenNext(Integer element) {
                System.out.println("处理元素: " + element);
                return true;
            }

            @Override
            protected void whenComplete() {
                System.out.println("所有元素处理完成");
                executor.shutdown();
            }
        });
    }

    private static void testErrorHandling() {
        Set<Integer> elements = new HashSet<>();
        elements.add(1);
        elements.add(2);
        elements.add(3);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);

        publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newSingleThreadExecutor()) {
            @Override
            protected boolean whenNext(Integer element) {
                System.out.println("处理元素: " + element);
                if (element == 2) {
                    throw new RuntimeException("模拟处理元素2时出错");
                }
                return true;
            }

            @Override
            protected void whenError(Throwable error) {
                System.err.println("发生错误: " + error.getMessage());
                executor.shutdown();
            }
        });
    }

    private static void testLargeDataSet() {
        Set<Integer> elements = new HashSet<>();
        for (int i = 1; i <= 1000; i++) {
            elements.add(i);
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);
        AsyncIterablePublisher<Integer> publisher = new AsyncIterablePublisher<>(elements, executor);

        long startTime = System.currentTimeMillis();

        publisher.subscribe(new AsyncSubscriber<Integer>(Executors.newFixedThreadPool(4)) {
            private int count = 0;

            @Override
            protected boolean whenNext(Integer element) {
                count++;
                if (count % 100 == 0) {
                    System.out.println("已处理 " + count + " 个元素");
                }
                return true;
            }

            @Override
            protected void whenComplete() {
                long duration = System.currentTimeMillis() - startTime;
                System.out.println("处理完成,共处理 " + count + " 个元素,耗时 " + duration + " 毫秒");
                executor.shutdown();
            }
        });
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到