【SpringBoot】10 分钟上手:告别阻塞!从 0 到 1 搭建Spring WebFlux 生产级响应式架构标准模板

发布于:2025-09-12 ⋅ 阅读:(17) ⋅ 点赞:(0)

Spring WebFlux 核心知识

Spring WebFlux 是 Spring 5 推出的异步非阻塞响应式 Web 框架,Spring Boot 2.X 自动集成,专为解决高并发、IO 密集场景性能瓶颈设计,核心知识简化如下:

1.基础核心:2个关键概念

1.1. 响应式编程(底层思想)

  • 核心区别:和传统“命令式编程”不同,数据变化会自动传播:
    • 命令式:a = b + c → a 值固定,后续改 b/c 不影响 a;
    • 响应式:a = b + c → a 随 b/c 实时变,自动同步结果。
  • 核心优势:不用加集群,少量线程就能扛高并发,节省资源。
  • 通俗理解
    • 传统阻塞:请求来了→线程等任务做完→才处理下一个;
    • 响应式:请求来了→丢给异步线程→线程继续处理其他请求→任务完了再通知返回。

1.2. Reactor 框架(底层依赖)

WebFlux 的响应式能力全靠它,核心是2个“数据载体”:

载体 数据范围 典型场景
Flux 0~N个元素(异步序列) 查列表、实时流推送
Mono 0~1个元素(异步结果) 查详情、单条新增/修改返回

2.WebFlux vs Spring MVC:核心差异

2.1. 编程模型(开发方式)

编程模型 WebFlux 特点 核心工具
注解式 复用 Spring MVC 注解(如@Controller),迁移方便 @GetMapping/@PostMapping
函数式 用 Java 8 Lambda,更贴合响应式 RouterFunction(路由)、HandlerFunction(处理器)

2.2. 关键维度对比

对比项 Spring WebFlux Spring MVC
并发模型 异步非阻塞(EventLoop 事件循环) 同步阻塞(线程池)
适用场景 IO 密集(如实时推送、高并发网关) CPU 密集、简单 CRUD
开发难度 稍高(要学 Flux/Mono) 低(传统同步代码)
生态兼容 只支持响应式库(如 MongoDB Reactive) 兼容所有 Servlet 生态(JDBC、MyBatis 等)

3.选型与注意事项

3.1. 选谁?3个原则

  • 现有 MVC 够用→不迁移(迁移成本高,对 CPU 密集场景没用);
  • 优先选 WebFlux→需扛 10000+ 并发、实时交互(如聊天、监控);
  • 别选 WebFlux→做复杂计算(CPU 密集)、用传统 JDBC/MyBatis。

3.三个关键注意事项

  1. 容器:用 Netty 或 Tomcat 9+(需支持非阻塞);
  2. 数据库:只能用响应式的(如 MongoDB Reactive、R2DBC),传统 JDBC 用不了;
  3. 调试:异步流难追踪,可加 Hooks.onOperatorDebug() 看详细日志。

在这里插入图片描述

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v2.6.13)

.......
.......
.......
2025-09-08 17:23:41.542  INFO 24900 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
.......
.......
.......

在这里插入图片描述

.......
.......
.......
2025-09-08 17:23:34.279  INFO 16580 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
.......
.......
.......

springwebflux和springweb对应的内置服务器是不一样的 前者是以netty实现的非阻塞服务器后者是tomcat


四、 Spring Boot 2.6 + WebFlux 生产级响应式编程标准模板

下面是一个完整的标准化代码结构模板,适用于使用 Spring Boot 2.6 和 WebFlux 进行响应式编程的开发。这个模板包含了从控制器到测试的全套结构,并遵循最佳实践。

1. 项目结构

src/main/java/com/oyjp/v2/
├── config/            # 配置类
├── controller/        # 控制器层
├── exception/         # 异常处理
├── handler/           # 函数式端点处理器(可选)
├── model/             # 数据模型
│   ├── dto/           # 数据传输对象
│   ├── entity/        # 实体类
│   └── request/       # 请求对象
├── repository/        # 数据访问层
├── service/           # 服务层
├── util/              # 工具类
└── SpringbootWebfluxApplication.java # 启动类

src/test/java/com/oyjp/v2/
├── controller/        # 控制器测试
├── service/           # 服务层测试
└── repository/        # 仓库层测试

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.oyjp</groupId>
    <artifactId>boot_webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot_webflux</name>
    <description>springboot_webflux</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>
    <dependencies>
        <!--同时引入了 Spring Web MVC(通过 WebMvcAutoConfiguration)
        和 Spring WebFlux(通过 DelegatingWebFluxConfiguration)相关的自动配置类,
        而这两个框架在处理请求映射时都定义了名为 requestMappingHandlerAdapter 的 Bean,
        并且默认情况下 Spring Boot 禁止 Bean 定义覆盖,所以导致了冲突。-->

        <!--如果你的项目是基于响应式编程的,建议只保留 spring-boot-starter-webflux 依赖,
        移除可能引入 Spring Web MVC 相关配置的其他依赖;
        如果项目是基于传统的 Servlet 模型,则移除 spring-boot-starter-webflux 依赖。-->

        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>-->
        <!-- Spring Boot WebFlux + R2DBC 核心依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 响应式数据库(如 R2DBC) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>

        <!-- R2DBC MySQL 驱动(推荐使用最新稳定版) -->
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <version>0.8.2.RELEASE</version> <!-- 或更高版本 -->
        </dependency>

        <!-- 可选:R2DBC 连接池(提升性能) -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-pool</artifactId>
        </dependency>

        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>


    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.oyjp.v2.SpringbootWebfluxApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

编写配置文件appliction.yml

# 应用服务 WEB 访问端口
server:
  port: 8080

spring:
  r2dbc:
    url: r2dbc:mysql://localhost:3306/test?serverZoneId=GMT%2B8&characterEncoding=UTF-8
    username: root
    password: root
    # 可选:启用连接池
    pool:
      enabled: true
      initial-size: 5
      max-size: 20

# 日志配置
logging:
  level:
    root: info
    reactor:
      core:
        publisher: debug
      netty:
        http:
          server: debug

2. 核心代码模板

2.1 启动类

package com.oyjp.v2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootWebfluxApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootWebfluxApplication.class, args);
    }
}

2.2 配置类

熟悉Spring Data JPA的同学应该很轻车熟路了。

package com.oyjp.v2.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * WebFlux 全局配置类,用于定制 JSON 序列化/反序列化行为及 WebClient 客户端。

 * @see WebFluxConfigurer WebFlux 配置接口
 * @see ObjectMapper Jackson JSON 处理器
 */
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {

    /**
     * 配置 JSON 编码器(序列化),使用自定义的 {@link ObjectMapper}。
     * @param objectMapper 自定义的 Jackson {@link ObjectMapper} 实例
     * @return {@link Jackson2JsonEncoder} 编码器实例
     */
    @Bean
    public Jackson2JsonEncoder jackson2JsonEncoder(ObjectMapper objectMapper) {
        return new Jackson2JsonEncoder(objectMapper);
    }

    /**
     * 配置 JSON 解码器(反序列化),使用自定义的 {@link ObjectMapper}。
     *
     * @param objectMapper 自定义的 Jackson {@link ObjectMapper} 实例
     * @return {@link Jackson2JsonDecoder} 解码器实例
     */
    @Bean
    public Jackson2JsonDecoder jackson2JsonDecoder(ObjectMapper objectMapper) {
        return new Jackson2JsonDecoder(objectMapper);
    }

    /**
     * 配置 HTTP 消息编解码器,覆盖默认的 JSON 编解码逻辑。
     * <p>通过 {@link ServerCodecConfigurer} 注册自定义的编解码器,确保全局生效。</p>
     *
     * @param configurer Spring WebFlux 提供的编解码配置器
     */
    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs().jackson2JsonEncoder(jackson2JsonEncoder(objectMapper()));
        configurer.defaultCodecs().jackson2JsonDecoder(jackson2JsonDecoder(objectMapper()));
    }

    /**
     * 自定义 {@link ObjectMapper},用于 JSON 序列化/反序列化。
     * <p>配置项:</p>
     * <ul>
     *     <li>注册 {@link JavaTimeModule} 支持 Java 8 日期时间 API(如 {@code LocalDateTime})</li>
     *     <li>禁用 {@link SerializationFeature#WRITE_DATES_AS_TIMESTAMPS},强制使用 ISO-8601 格式</li>
     * </ul>
     *
     * @return 配置完成的 {@link ObjectMapper} 实例
     */
    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule()); // 支持 Java 8 日期时间
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // 禁用时间戳输出
        return mapper;
    }

    /**
     * 初始化响应式 {@link WebClient} 客户端,默认指向 "http://api.example.com"。
     * <p>可通过 {@code WebClient.Builder} 进一步扩展配置(如超时、拦截器等)。</p>
     *
     * @return 预配置的 {@link WebClient} 实例
     */
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("http://api.example.com") // 默认基础 URL
                .build();
    }
}

2.3 模型类

实体类 (示例)
package com.oyjp.v2.model.entity;

import com.oyjp.v2.model.dto.UserDto;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

/**
 * 实体类 (示例)
 CREATE TABLE `users` (
 `id` bigint NOT NULL AUTO_INCREMENT,
 `username` varchar(255) DEFAULT NULL,
 `email` varchar(255) DEFAULT NULL,
 PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 INSERT INTO `users` (`username`, `email`) VALUES
 ('john_doe', 'john@example.com'),
 ('jane_smith', 'jane@example.com'),
 ('alice_wonder', 'alice@example.org'),
 ('bob_builder', 'bob@construction.com'),
 ('charlie_brown', 'charlie@peanuts.com'),
 ('dave_dev', 'dave@code.dev'),
 ('emma_editor', 'emma@content.io'),
 ('frank_finance', 'frank@money.com'),
 ('grace_designer', 'grace@art.design'),
 ('henry_hr', 'henry@company.com');

 SELECT * FROM `users`;

 */
@Table("users")
@Data
public class User {
    @Id
    private Long id;
    private String username;
    private String email;
}
DTO 类 (示例)
package com.oyjp.v2.model.dto;

import com.oyjp.v2.model.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author JianPeng OuYang
 * @version v1.0
 * @date 2025/9/8 15:14
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDto {
    private Long id;
    private String username;
    private String email;


    // 构造器、getter和setter省略
    // 实际项目中建议使用Lombok
    public  static   UserDto convertToDto(User user) {
        return new UserDto(user.getId(),user.getUsername(), user.getEmail());
    }

    public static User convertToEntity(UserDto dto) {
        User user = new User();
        user.setUsername(dto.getUsername());
        user.setEmail(dto.getEmail());
        return user;
    }
}

2.4 仓库层(响应式)

声明 CRUD 接口
上面实体类中的@Table注解是有说法的,当我们的操作接口继承的是ReactiveCrudRepository<T, ID> 或者ReactiveSortingRepository<T, ID>时,需要在实体类上使用@Table注解,这也是推荐的用法。

/**
 * 用户数据访问接口,提供基于 Reactive Streams 的数据库操作。
 * 继承 Spring Data R2DBC 的基础仓库功能,并扩展自定义查询方法。
 *
 * @version 1.0
 * @see User 关联的用户实体类
 * @see R2dbcRepository 基础响应式仓库接口
 */
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {

    /**
     * 根据用户名查询单个用户(精确匹配)。
     *
     * @param username 用户名,不可为 null
     * @return {@link Mono<User>} 包含查询结果的响应式流,若无结果则返回空 Mono
     * @throws IllegalArgumentException 如果 username 为 null
     */
    Mono<User> findByUsername(String username);

    /**
     * 根据邮箱关键字模糊查询用户列表(包含匹配)。
     *
     * @param email 邮箱关键字,不可为 null
     * @return {@link Flux<User>} 包含匹配结果的响应式流
     * @throws IllegalArgumentException 如果 email 为 null
     */
    Flux<User> findByEmailContaining(String email);
}

  • 继承R2dbcRepository<T, ID>接口。然后ReactiveClientUserSortingRepository将提供一些操作数据库的方法。
    在这里插入图片描述

2.5 服务层(响应式)

package com.oyjp.v2.service;


import com.oyjp.v2.exception.CustomTimeoutException;
import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.exception.TimeoutException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.model.entity.User;
import com.oyjp.v2.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

/**
 * 用户业务逻辑服务层,处理用户数据的 CRUD 及响应式流操作。
 * 使用 {@link UserRepository} 进行数据访问,并集成事务管理。
 * 包含生产级优化:背压控制、超时处理、重试机制等。
 *
 * @version 1.0
 * @see UserRepository 数据访问层
 * @see UserDto 数据传输对象
 */
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class UserService {
    private final UserRepository userRepository;


    /**
     * 获取所有用户列表(响应式流)。
     * 生产级优化:
     * <ul>
     *     <li>背压控制:{@link Flux#onBackpressureBuffer} 限制队列大小</li>
     *     <li>超时处理:3秒超时自动终止</li>
     *     <li>重试机制:指数退避重试3次</li>
     * </ul>
     *
     * @return {@link Flux<UserDto>} 用户数据流,异常时返回错误信号
     * @throws CustomTimeoutException 如果操作超时
     */
    public Flux<UserDto> getAllUsers() {
        return userRepository.findAll()
                //生产级优化建议
                .onBackpressureBuffer(100) // 限制队列大小: 使用 Flux#onBackpressureBuffer 或 Flux#onBackpressureDrop 防止消费者过载。
                .timeout(Duration.ofSeconds(3)) // 3秒超时处理
                .onErrorResume(TimeoutException.class, e -> Flux.error(new CustomTimeoutException("超时")))
                .retryWhen(Retry.backoff(3, Duration.ofMillis(100))) // 指数退避重试
                .map(UserDto::convertToDto);
    }


    /**
     * 模拟大数据流测试(用于客户端实时接收能力验证)。返回一个延迟流(如 Flux.interval 生成的模拟数据),观察客户端是否能实时接收数据:
     * 每100ms生成一个模拟用户数据,共生成50条。
     *
     * @return {@link Flux<UserDto>} 模拟用户数据流
     */
    public Flux<UserDto> intervalGetAllUsers() {
        return Flux.interval(Duration.ofMillis(100)) // 每100ms生成一个数据
                .map(i -> new UserDto(i, "user" + i, "user" + i + "@example.com"))
                .take(50); // 生成10条数据
    }

    /**
     * 根据ID获取单个用户。
     *
     * @param id 用户ID,不可为null
     * @return {@link Mono<UserDto>} 用户数据,若不存在则返回错误信号
     * @throws ResourceNotFoundException 如果用户不存在
     */
    public Mono<UserDto> getUserById(Long id) {
        return userRepository.findById(id)
                .map(UserDto::convertToDto)
                .switchIfEmpty(Mono.error(new ResourceNotFoundException("User not found")));
    }

    /**
     * 创建新用户(写操作)。
     * 使用 {@link Transactional} 注解确保事务性。
     *
     * @param userDtoMono 包含用户数据的 {@link Mono<UserDto>},不可为null
     * @return {@link Mono<UserDto>} 创建成功的用户数据
     */
    @Transactional
    public Mono<UserDto> createUser(Mono<UserDto> userDtoMono) {
        return userDtoMono
                .map(UserDto::convertToEntity)
                .flatMap(userRepository::save)
                .map(UserDto::convertToDto);
    }

    /**
     * 更新用户信息(写操作)。
     * 若用户不存在则返回错误信号。
     *
     * @param id 用户ID,不可为null
     * @param userDtoMono 包含更新数据的 {@link Mono<UserDto>},不可为null
     * @return {@link Mono<UserDto>} 更新后的用户数据
     * @throws ResourceNotFoundException 如果用户不存在
     */
    @Transactional
    public Mono<UserDto> updateUser(Long id, Mono<UserDto> userDtoMono) {
        return userRepository.findById(id)
                .zipWith(userDtoMono)
                .map(tuple -> {
                    User existing = tuple.getT1();
                    UserDto updated = tuple.getT2();
                    existing.setUsername(updated.getUsername());
                    existing.setEmail(updated.getEmail());
                    return existing;
                })
                .flatMap(userRepository::save)
                .map(UserDto::convertToDto)
                .switchIfEmpty(Mono.error(new ResourceNotFoundException("User not found")));
    }

    /**
     * 删除用户(写操作)。
     * 若用户不存在则静默处理。
     *
     * @param id 用户ID,不可为null
     * @return {@link Mono<Void>} 删除完成信号
     */
    @Transactional
    public Mono<Object> deleteUser(Long id) {
        return userRepository.deleteById(id)
                .then(Mono.defer(() ->
                        Mono.error(new ResourceNotFoundException("User not found"))
                ))
                .onErrorResume(ResourceNotFoundException.class, e -> Mono.empty());
    }
}

2.6 控制器层(响应式:注解式类似 Spring MVC)

package com.oyjp.v2.controller;

import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * 用户管理 RESTful API 控制器,提供用户数据的增删改查(CRUD)操作。
 * 基于响应式编程模型,返回 {@link Flux} 或 {@link Mono} 类型的数据流。
 *
 * @see UserService 业务逻辑层
 * @see UserDto 数据传输对象
 */
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
    private final UserService userService;

    /**
     * 获取所有用户列表(响应式流)。
     *
     * @return {@link Flux<UserDto>} 用户数据流
     */
    @GetMapping
    public Flux<UserDto> getAllUsers() {
        return userService.getAllUsers();
    }

    /**
     * 模拟大数据流测试(用于客户端实时接收能力验证)。
     * 每100ms生成一个模拟用户数据,共生成50条。
     *
     * @return {@link Flux<UserDto>} 模拟用户数据流
     */
    @GetMapping(value = "/interval", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserDto> intervalGetAllUsers() {
        return userService.intervalGetAllUsers();
    }

    /**
     * 根据ID获取单个用户。
     *
     * @param id 用户ID,从路径变量中获取
     * @return {@link Mono<UserDto>} 用户数据,若不存在则返回错误信号
     * @throws ResourceNotFoundException 如果用户不存在(由服务层抛出)
     */
    @GetMapping("/{id}")
    public Mono<UserDto> getUserById(@PathVariable Long id) {
        return userService.getUserById(id);
    }

    /**
     * 创建新用户。
     *
     * @param userDto 包含用户数据的 {@link Mono<UserDto>},从请求体中获取
     * @return {@link Mono<UserDto>} 创建成功的用户数据
     * @responseStatus 201 Created - 创建成功
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@RequestBody Mono<UserDto> userDto) {
        return userService.createUser(userDto);
    }

    /**
     * 更新用户信息。
     * 若用户不存在则返回错误信号。
     *
     * @param id 用户ID,从路径变量中获取
     * @param userDto 包含更新数据的 {@link Mono<UserDto>},从请求体中获取
     * @return {@link Mono<UserDto>} 更新后的用户数据
     * @throws ResourceNotFoundException 如果用户不存在(由服务层抛出)
     */
    @PutMapping("/{id}")
    public Mono<UserDto> updateUser(@PathVariable Long id, @RequestBody Mono<UserDto> userDto) {
        return userService.updateUser(id, userDto);
    }

    /**
     * 删除用户。
     * 若用户不存在则静默处理。
     *
     * @param id 用户ID,从路径变量中获取
     * @return {@link Mono<Void>} 删除完成信号
     * @responseStatus 204 No Content - 删除成功
     */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Object> deleteUser(@PathVariable Long id) {
        return userService.deleteUser(id);
    }
}

2.7.全局异常处理(响应式)

自定义异常

package com.oyjp.v2.exception;

/**
 * @author JianPeng OuYang
 * @version v1.0
 * @date 2025/9/8 15:55
 */

public class CustomTimeoutException extends RuntimeException {
    public CustomTimeoutException(String message) {
        super(message);
    }
}
package com.oyjp.v2.exception;

public class ResourceNotFoundException extends RuntimeException {
    public ResourceNotFoundException(String message) {
        super(message);
    }
}

package com.oyjp.v2.exception;

/**
 * @author JianPeng OuYang
 * @version v1.0
 * @date 2025/9/8 15:55
 */

public class TimeoutException extends RuntimeException {
    public TimeoutException(String message) {
        super(message);
    }
}

编写全局异常处理aop切面

package com.oyjp.v2.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import reactor.core.publisher.Mono;

/**
 * 全局异常处理器,用于统一处理应用程序中的异常
 */
@ControllerAdvice  // 启用后会对所有Controller的异常进行统一处理
public class GlobalExceptionHandler {

    /**
     * 处理资源未找到异常(自定义业务异常)
     *
     * @param ex 资源未找到异常对象
     * @return 包含异常消息的响应体(404 Not Found)
     */
    @ExceptionHandler(ResourceNotFoundException.class)
    public Mono<ResponseEntity<String>> handleResourceNotFound(ResourceNotFoundException ex) {
        // 返回404响应,包含异常消息
        return Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND).body(ex.getMessage()));
    }

    /**
     * 处理未捕获的服务器内部异常
     *
     * @param ex 通用异常对象
     * @return 通用错误消息的响应体(500 Internal Server Error)
     */
    @ExceptionHandler(Exception.class)
    public Mono<ResponseEntity<String>> handleInternalServerError(Exception ex) {
        // 返回500响应,隐藏具体异常细节(生产环境建议)
        return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Internal Server Error"));
    }
}
  • 所有方法产生异常返回Mono<ResponseEntity>,适配Spring WebFlux的响应式特性
  • 使用Mono.just()将同步结果包装为响应式流

2.8 函数式端点(Router Function) (另外一种控制器实现风格-可选)

package com.oyjp.v2.handler;


import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

/**
 * 用户业务处理器,处理与用户相关的HTTP请求
 * 使用函数式端点(Functional Endpoints)方式实现
 * @see com.oyjp.v2.config.RouterConfig
 */
@Component  // 声明为Spring组件,由Spring容器管理生命周期
public class UserHandler {
    private final UserService userService;  // 用户业务服务接口

    /**
     * 构造函数注入UserService依赖
     * @param userService 用户业务服务实现
     */
    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    /**
     * 处理获取所有用户的请求
     * @param request HTTP请求对象
     * @return 包含用户列表的响应体(200 OK)
     */
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        // 1. 调用服务层获取所有用户(返回Flux<UserDto>)
        // 2. 将结果包装为成功的响应(200 OK)
        // 3. 指定响应体类型为UserDto(用于内容协商)
        return ServerResponse.ok()
                .body(userService.getAllUsers(), UserDto.class);
    }

    /**
     * 处理根据ID获取单个用户的请求
     * @param request HTTP请求对象(包含路径变量id)
     * @return 成功时返回用户数据(200 OK),未找到时返回404
     */
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        // 1. 从路径变量中解析用户ID
        Long id = Long.parseLong(request.pathVariable("id"));

        // 2. 调用服务层获取指定ID的用户(返回Mono<UserDto>)
        return userService.getUserById(id)
                // 3. 如果成功获取用户,包装为成功响应
                .flatMap(user -> ServerResponse.ok()
                        .bodyValue(user))
                // 4. 如果发生资源未找到异常,返回404响应
                .onErrorResume(ResourceNotFoundException.class,
                        ex -> ServerResponse.notFound().build());
    }
}

2.9 函数式端点路由配置

package com.oyjp.v2.config;

import com.oyjp.v2.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;

/**
 * 路由配置类,定义函数式端点的路由规则
 */
@Configuration
public class RouterConfig {

    /**
     * 定义用户{@link com.oyjp.v2.handler.UserHandler}相关的路由规则 
     * @param userHandler 自动注入的用户业务处理器
     * @return 组合的路由函数
     */
    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
        // 使用函数式路由构建器
        return RouterFunctions
                // 定义第一个路由:GET /api/func/users -> 调用userHandler的getAllUsers方法
                .route(GET("/api/func/users"), userHandler::getAllUsers)

                // 追加第二个路由:GET /api/func/users/{id} -> 调用userHandler的getUserById方法
                .andRoute(GET("/api/func/users/{id}"), userHandler::getUserById);

        // 可以继续追加更多路由:.andRoute(...)
    }
}

路由构建:

  • 使用RouterFunctions.route()创建单个路由
  • 使用.andRoute()链式调用追加更多路由

每个路由包含:

  • HTTP方法+路径匹配(如GET(“/api/func/users”))
  • 对应的处理器方法引用(如userHandler::getAllUsers)

这种函数式路由定义方式是Spring WebFlux提供的声明式API,相比传统的@Controller注解方式,它提供了更灵活的路由组合能力。

3. 测试类

3.1 控制器测试

package com.oyjp.v2.controller;

import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class UserControllerTest {

    @Mock
    private UserService userService;

    @InjectMocks
    private UserController userController;

    @Test
    void getAllUsers_shouldReturnFluxOfUsers() {
        // Arrange
        UserDto user1 = new UserDto(1L, "user1", "user1@example.com");
        UserDto user2 = new UserDto(2L, "user2", "user2@example.com");
        when(userService.getAllUsers()).thenReturn(Flux.just(user1, user2));

        // Act & Assert
        StepVerifier.create(userController.getAllUsers())
                .expectNext(user1)
                .expectNext(user2)
                .verifyComplete();
    }

    @Test
    void getUserById_whenUserExists_shouldReturnUser() {
        // Arrange
        Long userId = 1L;
        UserDto expectedUser = new UserDto(1L, "user1", "user1@example.com");
        when(userService.getUserById(userId)).thenReturn(Mono.just(expectedUser));

        // Act & Assert
        StepVerifier.create(userController.getUserById(userId))
                .expectNext(expectedUser)
                .verifyComplete();
    }

    @Test
    void createUser_shouldReturnCreatedUser() {
        // Arrange
        UserDto inputDto = new UserDto(1L, "newuser", "new@example.com");
        UserDto savedDto = new UserDto(2L, "newuser", "new@example.com");
        when(userService.createUser(any(Mono.class))).thenReturn(Mono.just(savedDto));

        // Act & Assert
        StepVerifier.create(userController.createUser(Mono.just(inputDto)))
                .expectNext(savedDto)
                .verifyComplete();
    }


    @Test
    void testStreamWithBackpressure() {
        Flux<UserDto> flux = userController.getAllUsers();

        StepVerifier.create(flux)
                .expectSubscription()
                .thenRequest(1) // 每次只请求1个元素
                .expectNextCount(1)
                .thenRequest(2)
                .expectNextCount(2)
                .thenCancel()
                .verify();
    }
}

3.2 服务层测试

package com.oyjp.v2.service;

import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.model.entity.User;
import com.oyjp.v2.repository.UserRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.mockito.Mockito.*;

/**
 * 测试类:验证 {@link UserService} 的业务逻辑。
 * <p>使用 Mockito 模拟 {@link UserRepository},通过 {@link StepVerifier} 测试响应式流。</p>
 */
@ExtendWith(MockitoExtension.class)
class UserServiceTest {

    @Mock
    private UserRepository userRepository; // 模拟数据库访问层

    @InjectMocks
    private UserService userService; // 待测试的服务层

    /**
     * 测试获取所有用户的场景。
     * <p>验证:返回的 {@link Flux} 包含预期的用户数据,并正确转换为 {@link UserDto}。</p>
     */
    @Test
    void getAllUsers_shouldReturnFluxOfUsers() {
        // Arrange: 模拟数据库返回的用户实体
        User user1 = new User();
        user1.setUsername("user1");
        user1.setEmail("user1@example.com");

        User user2 = new User();
        user2.setUsername("user2");
        user2.setEmail("user2@example.com");

        // 模拟 userRepository.findAll() 返回两个用户
        when(userRepository.findAll()).thenReturn(Flux.just(user1, user2));

        // Act: 调用服务层方法
        Flux<UserDto> result = userService.getAllUsers();

        // Assert: 验证返回的流是否符合预期
        StepVerifier.create(result).expectNextMatches(dto -> "user1".equals(dto.getUsername())) // 验证第一个用户的用户名
                .expectNextMatches(dto -> "user2".equals(dto.getUsername())) // 验证第二个用户的用户名
                .verifyComplete(); // 验证流正常结束
    }

    /**
     * 测试根据 ID 获取用户的场景(用户存在)。
     * <p>验证:返回的 {@link Mono} 包含正确的 {@link UserDto} 数据。</p>
     */
    @Test
    void getUserById_whenUserExists_shouldReturnUser() {
        // Arrange: 模拟数据库返回的用户实体
        Long userId = 1L;
        User user = new User();
        user.setId(userId);
        user.setUsername("existing");
        user.setEmail("existing@example.com");

        // 模拟 userRepository.findById() 返回该用户
        when(userRepository.findById(userId)).thenReturn(Mono.just(user));

        // Act: 调用服务层方法
        Mono<UserDto> result = userService.getUserById(userId);

        // Assert: 验证返回的 Mono 是否符合预期
        StepVerifier.create(result).expectNextMatches(dto -> "existing".equals(dto.getUsername())) // 验证用户名匹配
                .verifyComplete(); // 验证流正常结束
    }

    /**
     * 测试根据 ID 获取用户的场景(用户不存在)。
     * <p>验证:抛出 {@link ResourceNotFoundException} 异常。</p>
     */
    @Test
    void getUserById_whenUserNotExists_shouldThrowException() {
        // Arrange: 模拟用户不存在的情况
        Long nonExistingId = 99L;
        when(userRepository.findById(nonExistingId)).thenReturn(Mono.empty());

        // Act: 调用服务层方法
        Mono<UserDto> result = userService.getUserById(nonExistingId);

        // Assert: 验证是否抛出预期的异常
        StepVerifier.create(result).expectErrorMatches(ex -> ex instanceof ResourceNotFoundException && "User not found".equals(ex.getMessage())) // 验证异常类型和消息
                .verify(); // 验证错误信号
    }
}


这个模板提供了一个完整的、生产就绪的 Spring WebFlux 应用程序结构,遵循了响应式编程的最佳实践,并确保了高性能、可维护性和可扩展性。

通过以上步骤,可在 Spring Boot 中高效构建响应式应用,充分发挥异步非阻塞架构的优势。

四、五.总结

WebFlux 不是 Spring MVC 的“替代品”,而是“补充”——它通过异步非阻塞模型解决 IO 密集型场景的高并发瓶颈,但需依赖响应式生态,且开发/调试成本更高。选型时需明确业务场景(IO 密集 vs CPU 密集),避免为“响应式”而盲目技术选型。