Spring WebFlux 核心知识
Spring WebFlux 是 Spring 5 推出的异步非阻塞响应式 Web 框架,Spring Boot 2.X 自动集成,专为解决高并发、IO 密集场景性能瓶颈设计,核心知识简化如下:
1.基础核心:2个关键概念
1.1. 响应式编程(底层思想)
- 核心区别:和传统“命令式编程”不同,数据变化会自动传播:
- 命令式:
a = b + c
→ a 值固定,后续改 b/c 不影响 a; - 响应式:
a = b + c
→ a 随 b/c 实时变,自动同步结果。
- 命令式:
- 核心优势:不用加集群,少量线程就能扛高并发,节省资源。
- 通俗理解:
- 传统阻塞:请求来了→线程等任务做完→才处理下一个;
- 响应式:请求来了→丢给异步线程→线程继续处理其他请求→任务完了再通知返回。
1.2. Reactor 框架(底层依赖)
WebFlux 的响应式能力全靠它,核心是2个“数据载体”:
载体 | 数据范围 | 典型场景 |
---|---|---|
Flux | 0~N个元素(异步序列) | 查列表、实时流推送 |
Mono | 0~1个元素(异步结果) | 查详情、单条新增/修改返回 |
2.WebFlux vs Spring MVC:核心差异
2.1. 编程模型(开发方式)
编程模型 | WebFlux 特点 | 核心工具 |
---|---|---|
注解式 | 复用 Spring MVC 注解(如@Controller ),迁移方便 |
@GetMapping /@PostMapping |
函数式 | 用 Java 8 Lambda,更贴合响应式 | RouterFunction (路由)、HandlerFunction (处理器) |
2.2. 关键维度对比
对比项 | Spring WebFlux | Spring MVC |
---|---|---|
并发模型 | 异步非阻塞(EventLoop 事件循环) | 同步阻塞(线程池) |
适用场景 | IO 密集(如实时推送、高并发网关) | CPU 密集、简单 CRUD |
开发难度 | 稍高(要学 Flux/Mono) | 低(传统同步代码) |
生态兼容 | 只支持响应式库(如 MongoDB Reactive) | 兼容所有 Servlet 生态(JDBC、MyBatis 等) |
3.选型与注意事项
3.1. 选谁?3个原则
- 现有 MVC 够用→不迁移(迁移成本高,对 CPU 密集场景没用);
- 优先选 WebFlux→需扛 10000+ 并发、实时交互(如聊天、监控);
- 别选 WebFlux→做复杂计算(CPU 密集)、用传统 JDBC/MyBatis。
3.三个关键注意事项
- 容器:用 Netty 或 Tomcat 9+(需支持非阻塞);
- 数据库:只能用响应式的(如 MongoDB Reactive、R2DBC),传统 JDBC 用不了;
- 调试:异步流难追踪,可加
Hooks.onOperatorDebug()
看详细日志。
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.13)
.......
.......
.......
2025-09-08 17:23:41.542 INFO 24900 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
.......
.......
.......
.......
.......
.......
2025-09-08 17:23:34.279 INFO 16580 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
.......
.......
.......
springwebflux和springweb对应的内置服务器是不一样的 前者是以netty实现的非阻塞服务器后者是tomcat
四、 Spring Boot 2.6 + WebFlux 生产级响应式编程标准模板
下面是一个完整的标准化代码结构模板,适用于使用 Spring Boot 2.6 和 WebFlux 进行响应式编程的开发。这个模板包含了从控制器到测试的全套结构,并遵循最佳实践。
1. 项目结构
src/main/java/com/oyjp/v2/
├── config/ # 配置类
├── controller/ # 控制器层
├── exception/ # 异常处理
├── handler/ # 函数式端点处理器(可选)
├── model/ # 数据模型
│ ├── dto/ # 数据传输对象
│ ├── entity/ # 实体类
│ └── request/ # 请求对象
├── repository/ # 数据访问层
├── service/ # 服务层
├── util/ # 工具类
└── SpringbootWebfluxApplication.java # 启动类
src/test/java/com/oyjp/v2/
├── controller/ # 控制器测试
├── service/ # 服务层测试
└── repository/ # 仓库层测试
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.oyjp</groupId>
<artifactId>boot_webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot_webflux</name>
<description>springboot_webflux</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies>
<!--同时引入了 Spring Web MVC(通过 WebMvcAutoConfiguration)
和 Spring WebFlux(通过 DelegatingWebFluxConfiguration)相关的自动配置类,
而这两个框架在处理请求映射时都定义了名为 requestMappingHandlerAdapter 的 Bean,
并且默认情况下 Spring Boot 禁止 Bean 定义覆盖,所以导致了冲突。-->
<!--如果你的项目是基于响应式编程的,建议只保留 spring-boot-starter-webflux 依赖,
移除可能引入 Spring Web MVC 相关配置的其他依赖;
如果项目是基于传统的 Servlet 模型,则移除 spring-boot-starter-webflux 依赖。-->
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>-->
<!-- Spring Boot WebFlux + R2DBC 核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 响应式数据库(如 R2DBC) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- R2DBC MySQL 驱动(推荐使用最新稳定版) -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version> <!-- 或更高版本 -->
</dependency>
<!-- 可选:R2DBC 连接池(提升性能) -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.oyjp.v2.SpringbootWebfluxApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
编写配置文件appliction.yml
# 应用服务 WEB 访问端口
server:
port: 8080
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/test?serverZoneId=GMT%2B8&characterEncoding=UTF-8
username: root
password: root
# 可选:启用连接池
pool:
enabled: true
initial-size: 5
max-size: 20
# 日志配置
logging:
level:
root: info
reactor:
core:
publisher: debug
netty:
http:
server: debug
2. 核心代码模板
2.1 启动类
package com.oyjp.v2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootWebfluxApplication.class, args);
}
}
2.2 配置类
熟悉Spring Data JPA的同学应该很轻车熟路了。
package com.oyjp.v2.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.client.WebClient;
/**
* WebFlux 全局配置类,用于定制 JSON 序列化/反序列化行为及 WebClient 客户端。
* @see WebFluxConfigurer WebFlux 配置接口
* @see ObjectMapper Jackson JSON 处理器
*/
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {
/**
* 配置 JSON 编码器(序列化),使用自定义的 {@link ObjectMapper}。
* @param objectMapper 自定义的 Jackson {@link ObjectMapper} 实例
* @return {@link Jackson2JsonEncoder} 编码器实例
*/
@Bean
public Jackson2JsonEncoder jackson2JsonEncoder(ObjectMapper objectMapper) {
return new Jackson2JsonEncoder(objectMapper);
}
/**
* 配置 JSON 解码器(反序列化),使用自定义的 {@link ObjectMapper}。
*
* @param objectMapper 自定义的 Jackson {@link ObjectMapper} 实例
* @return {@link Jackson2JsonDecoder} 解码器实例
*/
@Bean
public Jackson2JsonDecoder jackson2JsonDecoder(ObjectMapper objectMapper) {
return new Jackson2JsonDecoder(objectMapper);
}
/**
* 配置 HTTP 消息编解码器,覆盖默认的 JSON 编解码逻辑。
* <p>通过 {@link ServerCodecConfigurer} 注册自定义的编解码器,确保全局生效。</p>
*
* @param configurer Spring WebFlux 提供的编解码配置器
*/
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs().jackson2JsonEncoder(jackson2JsonEncoder(objectMapper()));
configurer.defaultCodecs().jackson2JsonDecoder(jackson2JsonDecoder(objectMapper()));
}
/**
* 自定义 {@link ObjectMapper},用于 JSON 序列化/反序列化。
* <p>配置项:</p>
* <ul>
* <li>注册 {@link JavaTimeModule} 支持 Java 8 日期时间 API(如 {@code LocalDateTime})</li>
* <li>禁用 {@link SerializationFeature#WRITE_DATES_AS_TIMESTAMPS},强制使用 ISO-8601 格式</li>
* </ul>
*
* @return 配置完成的 {@link ObjectMapper} 实例
*/
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule()); // 支持 Java 8 日期时间
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // 禁用时间戳输出
return mapper;
}
/**
* 初始化响应式 {@link WebClient} 客户端,默认指向 "http://api.example.com"。
* <p>可通过 {@code WebClient.Builder} 进一步扩展配置(如超时、拦截器等)。</p>
*
* @return 预配置的 {@link WebClient} 实例
*/
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("http://api.example.com") // 默认基础 URL
.build();
}
}
2.3 模型类
实体类 (示例)
package com.oyjp.v2.model.entity;
import com.oyjp.v2.model.dto.UserDto;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
/**
* 实体类 (示例)
CREATE TABLE `users` (
`id` bigint NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`email` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `users` (`username`, `email`) VALUES
('john_doe', 'john@example.com'),
('jane_smith', 'jane@example.com'),
('alice_wonder', 'alice@example.org'),
('bob_builder', 'bob@construction.com'),
('charlie_brown', 'charlie@peanuts.com'),
('dave_dev', 'dave@code.dev'),
('emma_editor', 'emma@content.io'),
('frank_finance', 'frank@money.com'),
('grace_designer', 'grace@art.design'),
('henry_hr', 'henry@company.com');
SELECT * FROM `users`;
*/
@Table("users")
@Data
public class User {
@Id
private Long id;
private String username;
private String email;
}
DTO 类 (示例)
package com.oyjp.v2.model.dto;
import com.oyjp.v2.model.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author JianPeng OuYang
* @version v1.0
* @date 2025/9/8 15:14
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserDto {
private Long id;
private String username;
private String email;
// 构造器、getter和setter省略
// 实际项目中建议使用Lombok
public static UserDto convertToDto(User user) {
return new UserDto(user.getId(),user.getUsername(), user.getEmail());
}
public static User convertToEntity(UserDto dto) {
User user = new User();
user.setUsername(dto.getUsername());
user.setEmail(dto.getEmail());
return user;
}
}
2.4 仓库层(响应式)
声明 CRUD 接口
上面实体类中的@Table注解是有说法的,当我们的操作接口继承的是ReactiveCrudRepository<T, ID> 或者ReactiveSortingRepository<T, ID>时,需要在实体类上使用@Table注解,这也是推荐的用法。
/**
* 用户数据访问接口,提供基于 Reactive Streams 的数据库操作。
* 继承 Spring Data R2DBC 的基础仓库功能,并扩展自定义查询方法。
*
* @version 1.0
* @see User 关联的用户实体类
* @see R2dbcRepository 基础响应式仓库接口
*/
@Repository
public interface UserRepository extends R2dbcRepository<User, Long> {
/**
* 根据用户名查询单个用户(精确匹配)。
*
* @param username 用户名,不可为 null
* @return {@link Mono<User>} 包含查询结果的响应式流,若无结果则返回空 Mono
* @throws IllegalArgumentException 如果 username 为 null
*/
Mono<User> findByUsername(String username);
/**
* 根据邮箱关键字模糊查询用户列表(包含匹配)。
*
* @param email 邮箱关键字,不可为 null
* @return {@link Flux<User>} 包含匹配结果的响应式流
* @throws IllegalArgumentException 如果 email 为 null
*/
Flux<User> findByEmailContaining(String email);
}
- 继承R2dbcRepository<T, ID>接口。然后ReactiveClientUserSortingRepository将提供一些操作数据库的方法。
2.5 服务层(响应式)
package com.oyjp.v2.service;
import com.oyjp.v2.exception.CustomTimeoutException;
import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.exception.TimeoutException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.model.entity.User;
import com.oyjp.v2.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
/**
* 用户业务逻辑服务层,处理用户数据的 CRUD 及响应式流操作。
* 使用 {@link UserRepository} 进行数据访问,并集成事务管理。
* 包含生产级优化:背压控制、超时处理、重试机制等。
*
* @version 1.0
* @see UserRepository 数据访问层
* @see UserDto 数据传输对象
*/
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class UserService {
private final UserRepository userRepository;
/**
* 获取所有用户列表(响应式流)。
* 生产级优化:
* <ul>
* <li>背压控制:{@link Flux#onBackpressureBuffer} 限制队列大小</li>
* <li>超时处理:3秒超时自动终止</li>
* <li>重试机制:指数退避重试3次</li>
* </ul>
*
* @return {@link Flux<UserDto>} 用户数据流,异常时返回错误信号
* @throws CustomTimeoutException 如果操作超时
*/
public Flux<UserDto> getAllUsers() {
return userRepository.findAll()
//生产级优化建议
.onBackpressureBuffer(100) // 限制队列大小: 使用 Flux#onBackpressureBuffer 或 Flux#onBackpressureDrop 防止消费者过载。
.timeout(Duration.ofSeconds(3)) // 3秒超时处理
.onErrorResume(TimeoutException.class, e -> Flux.error(new CustomTimeoutException("超时")))
.retryWhen(Retry.backoff(3, Duration.ofMillis(100))) // 指数退避重试
.map(UserDto::convertToDto);
}
/**
* 模拟大数据流测试(用于客户端实时接收能力验证)。返回一个延迟流(如 Flux.interval 生成的模拟数据),观察客户端是否能实时接收数据:
* 每100ms生成一个模拟用户数据,共生成50条。
*
* @return {@link Flux<UserDto>} 模拟用户数据流
*/
public Flux<UserDto> intervalGetAllUsers() {
return Flux.interval(Duration.ofMillis(100)) // 每100ms生成一个数据
.map(i -> new UserDto(i, "user" + i, "user" + i + "@example.com"))
.take(50); // 生成10条数据
}
/**
* 根据ID获取单个用户。
*
* @param id 用户ID,不可为null
* @return {@link Mono<UserDto>} 用户数据,若不存在则返回错误信号
* @throws ResourceNotFoundException 如果用户不存在
*/
public Mono<UserDto> getUserById(Long id) {
return userRepository.findById(id)
.map(UserDto::convertToDto)
.switchIfEmpty(Mono.error(new ResourceNotFoundException("User not found")));
}
/**
* 创建新用户(写操作)。
* 使用 {@link Transactional} 注解确保事务性。
*
* @param userDtoMono 包含用户数据的 {@link Mono<UserDto>},不可为null
* @return {@link Mono<UserDto>} 创建成功的用户数据
*/
@Transactional
public Mono<UserDto> createUser(Mono<UserDto> userDtoMono) {
return userDtoMono
.map(UserDto::convertToEntity)
.flatMap(userRepository::save)
.map(UserDto::convertToDto);
}
/**
* 更新用户信息(写操作)。
* 若用户不存在则返回错误信号。
*
* @param id 用户ID,不可为null
* @param userDtoMono 包含更新数据的 {@link Mono<UserDto>},不可为null
* @return {@link Mono<UserDto>} 更新后的用户数据
* @throws ResourceNotFoundException 如果用户不存在
*/
@Transactional
public Mono<UserDto> updateUser(Long id, Mono<UserDto> userDtoMono) {
return userRepository.findById(id)
.zipWith(userDtoMono)
.map(tuple -> {
User existing = tuple.getT1();
UserDto updated = tuple.getT2();
existing.setUsername(updated.getUsername());
existing.setEmail(updated.getEmail());
return existing;
})
.flatMap(userRepository::save)
.map(UserDto::convertToDto)
.switchIfEmpty(Mono.error(new ResourceNotFoundException("User not found")));
}
/**
* 删除用户(写操作)。
* 若用户不存在则静默处理。
*
* @param id 用户ID,不可为null
* @return {@link Mono<Void>} 删除完成信号
*/
@Transactional
public Mono<Object> deleteUser(Long id) {
return userRepository.deleteById(id)
.then(Mono.defer(() ->
Mono.error(new ResourceNotFoundException("User not found"))
))
.onErrorResume(ResourceNotFoundException.class, e -> Mono.empty());
}
}
2.6 控制器层(响应式:注解式类似 Spring MVC)
package com.oyjp.v2.controller;
import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 用户管理 RESTful API 控制器,提供用户数据的增删改查(CRUD)操作。
* 基于响应式编程模型,返回 {@link Flux} 或 {@link Mono} 类型的数据流。
*
* @see UserService 业务逻辑层
* @see UserDto 数据传输对象
*/
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
/**
* 获取所有用户列表(响应式流)。
*
* @return {@link Flux<UserDto>} 用户数据流
*/
@GetMapping
public Flux<UserDto> getAllUsers() {
return userService.getAllUsers();
}
/**
* 模拟大数据流测试(用于客户端实时接收能力验证)。
* 每100ms生成一个模拟用户数据,共生成50条。
*
* @return {@link Flux<UserDto>} 模拟用户数据流
*/
@GetMapping(value = "/interval", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserDto> intervalGetAllUsers() {
return userService.intervalGetAllUsers();
}
/**
* 根据ID获取单个用户。
*
* @param id 用户ID,从路径变量中获取
* @return {@link Mono<UserDto>} 用户数据,若不存在则返回错误信号
* @throws ResourceNotFoundException 如果用户不存在(由服务层抛出)
*/
@GetMapping("/{id}")
public Mono<UserDto> getUserById(@PathVariable Long id) {
return userService.getUserById(id);
}
/**
* 创建新用户。
*
* @param userDto 包含用户数据的 {@link Mono<UserDto>},从请求体中获取
* @return {@link Mono<UserDto>} 创建成功的用户数据
* @responseStatus 201 Created - 创建成功
*/
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> createUser(@RequestBody Mono<UserDto> userDto) {
return userService.createUser(userDto);
}
/**
* 更新用户信息。
* 若用户不存在则返回错误信号。
*
* @param id 用户ID,从路径变量中获取
* @param userDto 包含更新数据的 {@link Mono<UserDto>},从请求体中获取
* @return {@link Mono<UserDto>} 更新后的用户数据
* @throws ResourceNotFoundException 如果用户不存在(由服务层抛出)
*/
@PutMapping("/{id}")
public Mono<UserDto> updateUser(@PathVariable Long id, @RequestBody Mono<UserDto> userDto) {
return userService.updateUser(id, userDto);
}
/**
* 删除用户。
* 若用户不存在则静默处理。
*
* @param id 用户ID,从路径变量中获取
* @return {@link Mono<Void>} 删除完成信号
* @responseStatus 204 No Content - 删除成功
*/
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Object> deleteUser(@PathVariable Long id) {
return userService.deleteUser(id);
}
}
2.7.全局异常处理(响应式)
自定义异常
package com.oyjp.v2.exception;
/**
* @author JianPeng OuYang
* @version v1.0
* @date 2025/9/8 15:55
*/
public class CustomTimeoutException extends RuntimeException {
public CustomTimeoutException(String message) {
super(message);
}
}
package com.oyjp.v2.exception;
public class ResourceNotFoundException extends RuntimeException {
public ResourceNotFoundException(String message) {
super(message);
}
}
package com.oyjp.v2.exception;
/**
* @author JianPeng OuYang
* @version v1.0
* @date 2025/9/8 15:55
*/
public class TimeoutException extends RuntimeException {
public TimeoutException(String message) {
super(message);
}
}
编写全局异常处理aop切面
package com.oyjp.v2.exception;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import reactor.core.publisher.Mono;
/**
* 全局异常处理器,用于统一处理应用程序中的异常
*/
@ControllerAdvice // 启用后会对所有Controller的异常进行统一处理
public class GlobalExceptionHandler {
/**
* 处理资源未找到异常(自定义业务异常)
*
* @param ex 资源未找到异常对象
* @return 包含异常消息的响应体(404 Not Found)
*/
@ExceptionHandler(ResourceNotFoundException.class)
public Mono<ResponseEntity<String>> handleResourceNotFound(ResourceNotFoundException ex) {
// 返回404响应,包含异常消息
return Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND).body(ex.getMessage()));
}
/**
* 处理未捕获的服务器内部异常
*
* @param ex 通用异常对象
* @return 通用错误消息的响应体(500 Internal Server Error)
*/
@ExceptionHandler(Exception.class)
public Mono<ResponseEntity<String>> handleInternalServerError(Exception ex) {
// 返回500响应,隐藏具体异常细节(生产环境建议)
return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Internal Server Error"));
}
}
- 所有方法产生异常返回Mono<ResponseEntity>,适配Spring WebFlux的响应式特性
- 使用Mono.just()将同步结果包装为响应式流
2.8 函数式端点(Router Function) (另外一种控制器实现风格-可选)
package com.oyjp.v2.handler;
import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
/**
* 用户业务处理器,处理与用户相关的HTTP请求
* 使用函数式端点(Functional Endpoints)方式实现
* @see com.oyjp.v2.config.RouterConfig
*/
@Component // 声明为Spring组件,由Spring容器管理生命周期
public class UserHandler {
private final UserService userService; // 用户业务服务接口
/**
* 构造函数注入UserService依赖
* @param userService 用户业务服务实现
*/
public UserHandler(UserService userService) {
this.userService = userService;
}
/**
* 处理获取所有用户的请求
* @param request HTTP请求对象
* @return 包含用户列表的响应体(200 OK)
*/
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
// 1. 调用服务层获取所有用户(返回Flux<UserDto>)
// 2. 将结果包装为成功的响应(200 OK)
// 3. 指定响应体类型为UserDto(用于内容协商)
return ServerResponse.ok()
.body(userService.getAllUsers(), UserDto.class);
}
/**
* 处理根据ID获取单个用户的请求
* @param request HTTP请求对象(包含路径变量id)
* @return 成功时返回用户数据(200 OK),未找到时返回404
*/
public Mono<ServerResponse> getUserById(ServerRequest request) {
// 1. 从路径变量中解析用户ID
Long id = Long.parseLong(request.pathVariable("id"));
// 2. 调用服务层获取指定ID的用户(返回Mono<UserDto>)
return userService.getUserById(id)
// 3. 如果成功获取用户,包装为成功响应
.flatMap(user -> ServerResponse.ok()
.bodyValue(user))
// 4. 如果发生资源未找到异常,返回404响应
.onErrorResume(ResourceNotFoundException.class,
ex -> ServerResponse.notFound().build());
}
}
2.9 函数式端点路由配置
package com.oyjp.v2.config;
import com.oyjp.v2.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
/**
* 路由配置类,定义函数式端点的路由规则
*/
@Configuration
public class RouterConfig {
/**
* 定义用户{@link com.oyjp.v2.handler.UserHandler}相关的路由规则
* @param userHandler 自动注入的用户业务处理器
* @return 组合的路由函数
*/
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler userHandler) {
// 使用函数式路由构建器
return RouterFunctions
// 定义第一个路由:GET /api/func/users -> 调用userHandler的getAllUsers方法
.route(GET("/api/func/users"), userHandler::getAllUsers)
// 追加第二个路由:GET /api/func/users/{id} -> 调用userHandler的getUserById方法
.andRoute(GET("/api/func/users/{id}"), userHandler::getUserById);
// 可以继续追加更多路由:.andRoute(...)
}
}
路由构建:
- 使用RouterFunctions.route()创建单个路由
- 使用.andRoute()链式调用追加更多路由
每个路由包含:
- HTTP方法+路径匹配(如GET(“/api/func/users”))
- 对应的处理器方法引用(如userHandler::getAllUsers)
这种函数式路由定义方式是Spring WebFlux提供的声明式API,相比传统的@Controller注解方式,它提供了更灵活的路由组合能力。
3. 测试类
3.1 控制器测试
package com.oyjp.v2.controller;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.service.UserService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class UserControllerTest {
@Mock
private UserService userService;
@InjectMocks
private UserController userController;
@Test
void getAllUsers_shouldReturnFluxOfUsers() {
// Arrange
UserDto user1 = new UserDto(1L, "user1", "user1@example.com");
UserDto user2 = new UserDto(2L, "user2", "user2@example.com");
when(userService.getAllUsers()).thenReturn(Flux.just(user1, user2));
// Act & Assert
StepVerifier.create(userController.getAllUsers())
.expectNext(user1)
.expectNext(user2)
.verifyComplete();
}
@Test
void getUserById_whenUserExists_shouldReturnUser() {
// Arrange
Long userId = 1L;
UserDto expectedUser = new UserDto(1L, "user1", "user1@example.com");
when(userService.getUserById(userId)).thenReturn(Mono.just(expectedUser));
// Act & Assert
StepVerifier.create(userController.getUserById(userId))
.expectNext(expectedUser)
.verifyComplete();
}
@Test
void createUser_shouldReturnCreatedUser() {
// Arrange
UserDto inputDto = new UserDto(1L, "newuser", "new@example.com");
UserDto savedDto = new UserDto(2L, "newuser", "new@example.com");
when(userService.createUser(any(Mono.class))).thenReturn(Mono.just(savedDto));
// Act & Assert
StepVerifier.create(userController.createUser(Mono.just(inputDto)))
.expectNext(savedDto)
.verifyComplete();
}
@Test
void testStreamWithBackpressure() {
Flux<UserDto> flux = userController.getAllUsers();
StepVerifier.create(flux)
.expectSubscription()
.thenRequest(1) // 每次只请求1个元素
.expectNextCount(1)
.thenRequest(2)
.expectNextCount(2)
.thenCancel()
.verify();
}
}
3.2 服务层测试
package com.oyjp.v2.service;
import com.oyjp.v2.exception.ResourceNotFoundException;
import com.oyjp.v2.model.dto.UserDto;
import com.oyjp.v2.model.entity.User;
import com.oyjp.v2.repository.UserRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.mockito.Mockito.*;
/**
* 测试类:验证 {@link UserService} 的业务逻辑。
* <p>使用 Mockito 模拟 {@link UserRepository},通过 {@link StepVerifier} 测试响应式流。</p>
*/
@ExtendWith(MockitoExtension.class)
class UserServiceTest {
@Mock
private UserRepository userRepository; // 模拟数据库访问层
@InjectMocks
private UserService userService; // 待测试的服务层
/**
* 测试获取所有用户的场景。
* <p>验证:返回的 {@link Flux} 包含预期的用户数据,并正确转换为 {@link UserDto}。</p>
*/
@Test
void getAllUsers_shouldReturnFluxOfUsers() {
// Arrange: 模拟数据库返回的用户实体
User user1 = new User();
user1.setUsername("user1");
user1.setEmail("user1@example.com");
User user2 = new User();
user2.setUsername("user2");
user2.setEmail("user2@example.com");
// 模拟 userRepository.findAll() 返回两个用户
when(userRepository.findAll()).thenReturn(Flux.just(user1, user2));
// Act: 调用服务层方法
Flux<UserDto> result = userService.getAllUsers();
// Assert: 验证返回的流是否符合预期
StepVerifier.create(result).expectNextMatches(dto -> "user1".equals(dto.getUsername())) // 验证第一个用户的用户名
.expectNextMatches(dto -> "user2".equals(dto.getUsername())) // 验证第二个用户的用户名
.verifyComplete(); // 验证流正常结束
}
/**
* 测试根据 ID 获取用户的场景(用户存在)。
* <p>验证:返回的 {@link Mono} 包含正确的 {@link UserDto} 数据。</p>
*/
@Test
void getUserById_whenUserExists_shouldReturnUser() {
// Arrange: 模拟数据库返回的用户实体
Long userId = 1L;
User user = new User();
user.setId(userId);
user.setUsername("existing");
user.setEmail("existing@example.com");
// 模拟 userRepository.findById() 返回该用户
when(userRepository.findById(userId)).thenReturn(Mono.just(user));
// Act: 调用服务层方法
Mono<UserDto> result = userService.getUserById(userId);
// Assert: 验证返回的 Mono 是否符合预期
StepVerifier.create(result).expectNextMatches(dto -> "existing".equals(dto.getUsername())) // 验证用户名匹配
.verifyComplete(); // 验证流正常结束
}
/**
* 测试根据 ID 获取用户的场景(用户不存在)。
* <p>验证:抛出 {@link ResourceNotFoundException} 异常。</p>
*/
@Test
void getUserById_whenUserNotExists_shouldThrowException() {
// Arrange: 模拟用户不存在的情况
Long nonExistingId = 99L;
when(userRepository.findById(nonExistingId)).thenReturn(Mono.empty());
// Act: 调用服务层方法
Mono<UserDto> result = userService.getUserById(nonExistingId);
// Assert: 验证是否抛出预期的异常
StepVerifier.create(result).expectErrorMatches(ex -> ex instanceof ResourceNotFoundException && "User not found".equals(ex.getMessage())) // 验证异常类型和消息
.verify(); // 验证错误信号
}
}
这个模板提供了一个完整的、生产就绪的 Spring WebFlux 应用程序结构,遵循了响应式编程的最佳实践,并确保了高性能、可维护性和可扩展性。
通过以上步骤,可在 Spring Boot 中高效构建响应式应用,充分发挥异步非阻塞架构的优势。
四、五.总结
WebFlux 不是 Spring MVC 的“替代品”,而是“补充”——它通过异步非阻塞模型解决 IO 密集型场景的高并发瓶颈,但需依赖响应式生态,且开发/调试成本更高。选型时需明确业务场景(IO 密集 vs CPU 密集),避免为“响应式”而盲目技术选型。