响应式编程Spring Reactor探索

发布于:2024-05-09 ⋅ 阅读:(24) ⋅ 点赞:(0)

一,介绍

响应式编程(Reactive Programming),简单来说是一种生产者只负责生成并发出数据/事件,消费者来监听并负责定义如何处理数据/事件的变化传递方式的编程思想。

响应式编程借鉴了Reactor设计模式,我们通常会在高性能NIO网络通信框架中见到Reactor设计模式的身影,用来实现I/O多路复用。

基本思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程阻塞在多路复用器上,通过轮询或者边缘触发的方式来处理网络I/O事件。当有新的I/O事件到来或准备就绪时,多路复用器返回并将事件分发到对应的处理器中。Reactor设计模式和响应式编程类似,它们都不主动调用某个请求的API,而是通过注册对应接口,实现事件触发执行。

Reactor 诞生在响应式流规范制定之后,从一开始就是严格按照响应式流规范设计并实现了它的 API,因此Spring 选择它作为默认响应式编程框架。

背压处理
背压是所有响应式编程框架所必须要考虑的核心机制,Reactor 框架支持所有常见的背压传播模式,包括以下几种。

纯推模式:订阅者通过 subscription.request(Long.MAX_VALUE) 请求有效无限数量的元素。

纯拉模式:订阅者通过 subscription.request(1) 方法在收到前一个元素后只请求下一个元素。

推-拉混合模式:当订阅者有实时控制需求时,发布者可以适应所提出的数据消费速度。

二,动手实现

1,引入pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2,Controller层创建Mono来接收请求

创建Mono,设置超时时间为60秒,将任务交给service层去处理,请求会在这儿挂着,然后可以继续接收新的请求。service层处理后续业务,处理完成之后,调用monoSink.success方法返回。

如果60秒没有处理完成,就直接完成这个请求,发送超时的返回值。

如果中途执行中出现校验异常之类的,也会直接返回,结束这个请求。

/**
 * 接收请求
 *
 * @param cmd 请求指令
 * @return 结果
 */
@PostMapping(value = "order")
public Mono<OrderResponse> order(@RequestBody OrderCmd cmd, ServerHttpRequest request) {
    return createMono(cmd, request);
}
/**
 * 创建 Mono
 *
 * @param cmd     请求指令
 * @param request 请求
 * @return Mono
 */
public Mono<OrderResponse> createMono(OrderCmd cmd, ServerHttpRequest request) {
    // 生成关联 ID
    String correlationId = genCorrelationId(request.getId());
    // 创建MonoSink, 执行业务逻辑
    return Mono.create((Consumer<MonoSink<OrderResponse>>) monoSink -> orderService.dealRequest(correlationId, cmd, monoSink))
            // 超时时间
            .timeout(Duration.ofSeconds(60))
            //只处理 OrderException,给正常响应包
            .onErrorResume(OrderException.class, e -> Mono.just(orderService.dealException(correlationId, cmd, e)))
            .doOnSuccess(obj -> {
                if (Objects.equals(obj.get("超时返回"), "超时返回")) {
                    exceptionNoticesService.notice(new OrderException("超时返回"));
                }
            });
}

3,service层执行逻辑,并注册dispose事件

可以将MonoSink缓存,后面业务逻辑执行完成之后从缓存中获取MonoSink对象。

// 将对象存储
cacheStore.put(correlationId, monoSink);

// 当完成其操作并关闭时,onDispose方法会被调用,以便释放资源或执行其他必要的清理工作。
monoSink.onDispose(() -> cacheStore.remove(correlationId));

//完成业务逻辑之后,调success方法,返回请求结果

MonoSink<OrderResponse> monoSink = cacheStore.get(correlationId);
monoSink.success(transResponse);

三,小结

Spring Reactor框架是响应式编程的一个很好的实践,能帮助开发者快速完成相关的需求,能很好的实现支持背压处理。