目录
一、基于多模型实现流式输出
1.1 什么是流式输出
流式输出(Streaming Output)是指数据在生成过程中就逐步传输给接收方,而不是等待全部处理完成后再一次性输出。这种模式具有以下优势:
- 低延迟:用户可以立即看到部分结果,无需等待全部处理完成
- 资源高效:可以更早释放部分资源,减少内存占用
- 用户体验好:渐进式的反馈让用户感知到系统正在工作
1.2 多模型引入
1. 添加相关依赖
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-ollama</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-qianfan</artifactId>
</dependency>
2. 设置配置文件
spring:
ai:
openai:
api-key: ${自定义}
base-url: https://dashscope.aliyuncs.com/compatible-mode/
chat:
options:
model: ${自定义}
ollama:
base-url: http://127.0.0.1:11434
chat:
options:
model: ${自定义}
qianfan:
api-key: ${自定义}
secret-key: ${自定义}
chat:
options:
model: ${自定义}
1.3 代码实现
1. OllamaController
@RestController
@RequestMapping("/ol")
public class OllamaController {
@Resource
private OllamaChatModel chatModel;
@GetMapping(value = "/chat",produces = "text/event-stream")
public Flux<String> chat(String prompt) {
return chatModel.stream(prompt);
}
}
调用结果如下:
2. QianFanController
@RestController
@RequestMapping("/qianfan")
public class QianFanController {
@Autowired
private QianFanChatModel chatModel;
@RequestMapping(value = "/stream",produces = "text/event-stream")
public Flux<String> chat(@RequestParam("question") String question) {
return chatModel.stream(question);
}
}
调用结果如下:
3. BaiLianController:通过对接通用的大模型平台,实现同平台的多种大模型切换
ChatClent是对ChatModel的封装
@RestController
@RequestMapping("/bl")
public class BaiLianController {
@Autowired
private ChatClient deepseekClient;
@Autowired
private ChatClient qwenClient;
@RequestMapping(value = "/chatDeepSeek",produces = "text/event-stream")
public Flux<String> chatDeepSeek(String message) {
return deepseekClient.prompt(message)
.stream()
.content();
}
@RequestMapping(value = "/chatQwen",produces = "text/event-stream")
public Flux<String> chatQwen(String message) {
return qwenClient.prompt(message)
.stream()
.content();
}
}
Flux 是 Spring WebFlux 框架中的一个核心组件,属于响应式编程模型的一部分。它主要用于处理异步、非阻塞的流式数据,能够高效地处理高并发场景。Flux 可以生成和处理一系列的事件或数据, 如流式输出等。
调用 DeepSeek 结果如下:(基于 ChatGPT4模型)
调用 通问千义 结果如下:
1.3.1 流式输出的API介绍
基于 ChatModel:
基于 ChatClient:
1.3.2 Flux 源码分析
要搞清楚这个问题,我们需要看流式输出对象 Flux 的实现源码:
查看 Flux 源码我们发现它是属于 reactor.core.publisher 包下的抽象类:
Reactor Streams 会订阅数据源,当有数据时,Reactor Streams 以分块流的方式 发送给客户端(用户)。
二、了解 Reactor 模型
Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。
Reactor 模型的主要特征如下:
事件驱动:所有 I/O 操作都由事件触发并处理。
非阻塞:操作不会因为 I/O 而挂起,避免了线程等待的开销。
高效资源利用:通过少量线程处理大量并发连接,提升性能。
组件分离:将事件监听(Reactor)、事件分发(Dispatcher)和事件处理(Handler)解耦, 使代码结构更清晰。
Reactor 实现方式有三种:
单线程 Reactor 模型:所有操作在一个线程完成,适用于低并发场景。
多线程 Reactor 模型:主线程处理连接,子线程池处理 I/O 和业务。
主从 Reactor 模型:主线程池处理连接,子线程池处理 I/O(进一步优化资源分配)。
三、SSE 协议
除了 Flux 实现之外,我们还可以通过 Server-Sent Events (SSE) 协议实现流式输出,通过单向 的 HTTP 长连接,服务端可以持续推送数据片段(如逐词或逐句)到前端。与 WebSocket 不同, SSE 是轻量级的单向通信协议,适合 AI 对话这类服务端主导的场景,它的具体实现代码如下:
@GetMapping(value = "/streamChat", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
public SseEmitter streamChat(@RequestParam String message) {
// 创建SSE发射器,设置超时时间(60秒)
SseEmitter emitter = new SseEmitter(60_000L);
// 创建Prompt对象
Prompt prompt = new Prompt(new UserMessage(message));
// 使用AtomicReference保存订阅以便后续取消
AtomicReference<Disposable> subscriptionRef = new AtomicReference<>();
// 订阅流式响应
subscriptionRef.set(chatModel.stream(prompt).subscribe(
response -> {
try {
// 防御性检查
if (response == null || response.getResult() == null || response.getResult().getOutput() == null) {
return;
}
String content = response.getResult().getOutput().getText();
if (content == null || content.isEmpty()) {
return;
}
// 发送SSE事件
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data(content)
.id(UUID.randomUUID().toString())
.comment("Chat response chunk");
emitter.send(event);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
error -> {
},
() -> {
}
));
// 处理客户端断开连接
emitter.onCompletion(() -> {
Disposable subscription = subscriptionRef.get();
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
});
// 处理超时
emitter.onTimeout(() -> {
Disposable subscription = subscriptionRef.get();
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
});
// 处理错误
emitter.onError((throwable) -> {
Disposable subscription = subscriptionRef.get();
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
});
return emitter;
}