Mono 和Flux是响应式编程库 Reactor 的核心组件,主要用于处理异步流和响应式数据流。它们的设计基于 Reactive Streams 规范,提供了非阻塞的方式处理数据。
Mono
Mono 表示0或1个元素的数据流。它适用于返回单个值或没有值的场景,例如:
- 数据库查询的单行结果。
- HTTP 请求的单次响应。
- 异步计算的结果
特点
- 单元素流:Mono 只能发射一次数据,之后可以发射完成信号或错误信号。
- 懒加载:只有在有订阅者时才开始计算或执行流操作。
创建 Mono
import reactor.core.publisher.Mono;
public class MonoExample {
public static void main(String[] args) {
// 创建一个包含数据的 Mono
Mono<String> mono = Mono.just("Hello, Mono!");
// 空的 Mono
Mono<Object> emptyMono = Mono.empty();
// 错误的 Mono
Mono<Object> errorMono = Mono.error(new RuntimeException("Error occurred"));
// 订阅以触发流
mono.subscribe(System.out::println); // 输出: Hello, Mono!
}
}
常用操作符
- map:对数据进行变换
- flatMap:将数据转换为另一个异步流
- doOnNext:在数据发射时执行额外操作
- onErrorResume:处理错误并提供备用流
Mono.just("Reactive Programming")
.map(String::toUpperCase)
.doOnNext(System.out::println)
.onErrorResume(e -> Mono.just("Fallback"))
.subscribe();
Flux
Flux 表示 0到N个元素 的数据流。它适用于需要处理多个元素的场景,例如:
- 数据库查询的多行结果
- 消息队列中的消息流
- 实时事件流
特点
- 多元素流:Flux 可以发射多个元素,之后发射完成信号或错误信号
- 懒加载:与 Mono 一样,只有在订阅时才开始执行流操作
创建 Mono
import reactor.core.publisher.Flux;
public class FluxExample {
public static void main(String[] args) {
// 创建一个包含多个元素的 Flux
Flux<String> flux = Flux.just("A", "B", "C");
// 从数组创建 Flux
Flux<Integer> fluxFromArray = Flux.fromArray(new Integer[]{1, 2, 3});
// 生成范围的 Flux
Flux<Integer> fluxRange = Flux.range(1, 5);
// 订阅以触发流
flux.subscribe(System.out::println); // 输出: A B C
}
}
常用操作符
- filter:过滤数据
- map:对数据进行变换。
- flatMap:将每个元素映射为新的流
- buffer:将元素收集到批次中
- onErrorContinue:忽略错误并继续处理剩余数据
Flux.range(1, 10)
.filter(i -> i % 2 == 0)
.map(i -> "Number: " + i)
.subscribe(System.out::println);
Mono 与 Flux 的关系
- Mono 是 Flux 的特例:Mono 仅处理 0 或 1 个元素,而 Flux 可以处理任意数量的元素。
- 许多操作符(例如 map, filter, flatMap)在 Mono 和 Flux 中是通用的
使用场景比较
特性 | Mono | Flux |
---|---|---|
数据量 | 0或1个元素 | 0到N 个元素 |
使用场景 | 单个对象、HTTP 响应、单次计算等 | 数据列表、流式数据、事件流等 |