前言:本文聚焦 SSE 与 Flux 组合,解析二者如何协作:Flux 处理后端 AI 流式输出,SSE 将片段推向前端,实现 AI 内容逐段实时展示,为流式交互提供高效解决方案。
流式对话的核心需求
流式对话(如 ChatGPT、豆包等 AI 交互)的核心诉求是 实时、流畅、低延迟地展示动态生成的内容”,具体可拆解为以下需求点:
单向持续推送
流式对话的核心数据流向是 "服务器→客户端":用户发送一次提问后,AI 模型在后端逐步生成回答(通常是逐字 / 逐句生成),需要实时将中间结果推送给客户端,而客户端在此过程中无需向服务器发送额外数据,仅需等待结果即可。低延迟的片段化传输
AI 生成回答时,实时返回已生成题目给前端,从而给用户及时的回答,而不是让前端请求一直阻塞等待,最后一起返回。
而SSE与Flux结合恰好针对性地满足了上述需求:
Flux 作为 Java 响应式编程的核心类,负责后端流式数据处理 —— 以异步数据流形式封装 AI 逐字生成的内容,支持片段化数据的逐段发射,完美适配 AI “边生成边输出” 的逻辑,是后端处理流式数据的基础工具。
SSE 则专注于网络传输层面的单向推送 —— 基于 HTTP 长连接,客户端一次连接后,服务器可持续推送数据,与 AI 对话中 “后端→客户端” 的单向数据流完全匹配,高效实现实时传输。
两者配合,Flux 在后端接收并处理 AI 生成的片段,SSE 通过长连接将这些片段实时推送给前端,最终实现 “逐字显示” 的流畅体验,在实时性与资源消耗间达到平衡,是流式对话的理想选择。
Flux技术
Flux 是 Java 响应式编程库(如 Project Reactor)中的核心类,专门用于处理异步、流式的数据序列,可以理解为 “能动态产生多个数据元素的数据流容器”。它的设计非常适合需要逐段生成、实时处理数据的场景(如 AI 流式输出、实时日志等)
在 AI 流式场景中的作用
以 AI 对话为例:
当调用 AI 模型生成回答时,模型并非一次性输出完整内容,而是逐字 / 逐句计算(类似人 “边想边说”)。这些实时生成的片段(如 “你”“好”)会被依次 “发射” 到 Flux 中,形成一个持续的数据流。后端可通过 Flux 的 map
操作将片段转换为 SSE 格式,再推送给前端,实现 “逐字显示” 的效果。
简单说,Flux 是后端 “接住” 并处理 AI 流式输出的 “传送带”,让数据能按生成顺序实时流动、加工,为后续的传输(如通过 SSE 推给前端)提供基础。
我们可以对 Flux 对象进行下列操作:
SSE 技术
基本概念
服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP协议实现。
它有几个重要的特点:
- 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
- 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的
text/event-stream
MIME 类型。 - 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
- 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性。
SSE 数据格式
SSE 数据流的格式非常简单,使用 event
指定事件名称,用于区分不同类型的消息。每个事件使用 data
字段,作为消息主体,事件以两个换行符结束。还可以使用 id
字段来标识事件,并且 retry
字段可以设置重新连接的时间间隔。
event: 事件名\n // 可选,用于区分不同类型的消息
data: 消息内容\n // 必选,消息主体(可多行,每行以 data: 开头)
id: 消息ID\n // 可选,用于客户端记录最后接收的消息ID(重连时可通过 Last-Event-ID 头传递)
retry: 重连时间(毫秒)\n // 可选,指定客户端重连间隔
\n // 空行表示一条消息结束
示例格式如下:
data: Third message\n
id: 3\n
\n
retry: 10000\n
data: Fourth message\n
\n
SSE 与 Flux 的结合实现AI输出流式对话
后端代码
@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,
@RequestParam String message,
HttpServletRequest request) {
// 参数校验
ThrowUtils.throwIf(appId == null || appId <= 0, ErrorCode.PARAMS_ERROR, "应用ID无效");
ThrowUtils.throwIf(StrUtil.isBlank(message), ErrorCode.PARAMS_ERROR, "用户消息不能为空");
// 获取当前登录用户
User loginUser = userService.getLoginUser(request);
// 调用服务生成代码(流式)
Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);
// 转换为 ServerSentEvent 格式
return contentFlux
.map(chunk -> {
// 将内容包装成JSON对象
Map<String, String> wrapper = Map.of("d", chunk);
String jsonData = JSONUtil.toJsonStr(wrapper);
return ServerSentEvent.<String>builder()
.data(jsonData)
.build();
});
}
后端用 Flux 处理 AI 流式输出,通过 SSE 协议推送
重难点解析:
1 @GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
produces = MediaType.TEXT_EVENT_STREAM_VALUE
:声明接口返回的数据类型为 text/event-stream
(SSE 协议的标准媒体类型),告诉浏览器:这是一个流式响应,需要保持连接并持续接收数据,而非一次性响应。
2 public Flux<ServerSentEvent<String>> chatToGenCode(...){}
将 AI 生成的流式代码片段,包装成符合 SSE协议的格式,持续推送给前端
设计原因:
- 如果只返回
Flux<String>
:数据是原始字符串,不符合 SSE 协议格式,前端EventSource
无法解析,会认为是无效数据。 - 如果返回单个
ServerSentEvent
:只能推送一次数据,无法实现 “持续流式输出”(失去了流式的核心意义)。 - 只有
Flux<ServerSentEvent<String>>
能同时满足:“持续发射数据”(Flux 的能力) 和 “数据符合 SSE 协议”(ServerSentEvent 的作用),从而实现前端实时接收流式数据的需求。
3 Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);
通过 Flux
实现 "边生成边返回",而非等待 AI 生成完整代码后一次性返回
4 chunk -> {
// 将内容包装成JSON对象
Map<String, String> wrapper = Map.of("d", chunk);
String jsonData = JSONUtil.toJsonStr(wrapper);
chunk
是 AI 生成的单个代码片段(如<div class="login">
)。- 用
Map.of("d", chunk)
将片段包装成{"d": "片段内容"}
的Map结构,再将片段转为 JSON 字符串(键d
可自定义,需与前端解析逻辑对应),方便前端统一解析。
5 return ServerSentEvent.<String>builder()
.data(jsonData)
.build();
使用ServerSentEvent
类,将 JSON 格式的代码片段包装为符合 SSE 协议的事件对象。
6 .concatWith(Mono.just(
// 发送结束事件
ServerSentEvent.<String>builder()
.event("done")
.data("")
.build()
))
在原始流的所有数据发送完毕后,额外追加一个自定义事件名为 done
的SSE事件。
前端代码
// 开始生成
isGenerating.value = true
generationProgress.value = 0
await generateCode(message, aiMessageIndex)
}
// 生成代码 - 使用 EventSource 处理流式响应
const generateCode = async (userMessage: string, aiMessageIndex: number) => {
let eventSource: EventSource | null = null
let streamCompleted = false
try {
// 获取 axios 配置的 baseURL
const baseURL = request.defaults.baseURL || API_BASE_URL
// 构建URL参数
const params = new URLSearchParams({
appId: appId.value || '',
message: userMessage,
stream: 'true',
})
const url = `${baseURL}/app/chat/gen/code?${params}`
// 创建 EventSource 连接
eventSource = new EventSource(url, {
withCredentials: true,
})
let fullContent = ''
// 处理接收到的消息
eventSource.onmessage = function (event) {
if (streamCompleted) return
try {
// 解析JSON包装的数据
const parsed = JSON.parse(event.data)
const content = parsed.d
// 拼接内容
if (content !== undefined && content !== null) {
fullContent += content
messages.value[aiMessageIndex].content = fullContent
messages.value[aiMessageIndex].loading = false
scrollToBottom()
// 更新进度
generationProgress.value = Math.min(90, generationProgress.value + 5)
}
} catch (error) {
console.error('解析消息失败:', error)
handleError(error, aiMessageIndex)
}
}
// 处理done事件
eventSource.addEventListener('done', function () {
if (streamCompleted) return
streamCompleted = true
isGenerating.value = false
generationProgress.value = 100
// 延迟更新预览,确保后端已完成处理
setTimeout(async () => {
await fetchAppInfo()
updatePreview()
}, 1000)
})
前端代码通过
EventSource
与后端 SSE 接口建立连接,通过onmessage
实时接收并解析后端推送的代码片段(与后端d
键对应),通过done
事件监听生成完成信号
重点代码解析
1 eventSource = new EventSource(url, {
withCredentials: true, // 携带 cookies(如登录凭证),与后端用户认证对应
});
创建 EventSource 连接(与后端 SSE 接口建立长连接)
2 eventSource.onmessage = function (event) {
if (streamCompleted) return;
try {
// 解析后端返回的 JSON 数据(与后端 Map.of("d", chunk) 对应)
const parsed = JSON.parse(event.data);
const content = parsed.d; // 键 "d" 与后端一致
// 拼接代码片段,实时更新界面(实现流式显示效果)
if (content !== undefined && content !== null) {
fullContent += content;
messages.value[aiMessageIndex].content = fullContent; // 更新 UI
scrollToBottom(); // 滚动到最新内容
generationProgress.value = Math.min(90, generationProgress.value + 5); // 更新进度
}
} catch (error) {
console.error('解析消息失败:', error);
handleError(error, aiMessageIndex);
}
};
处理后端推送的流式数据(与后端 map 操作生成的 SSE 事件对应)
3 eventSource.addEventListener('done', function () {
if (streamCompleted) return;
streamCompleted = true; // 标记流已完成
isGenerating.value = false; // 关闭生成状态
generationProgress.value = 100; // 进度设为 100%
// 延迟更新预览(确保后端处理完成)
setTimeout(async () => {
await fetchAppInfo();
updatePreview(); // 生成完成后执行后续操作(如预览代码)
}, 1000);
});
处理后端发送的结束事件(与后端 concatWith 中的 done 事件对应)
对应关系
角色 | 技术实现 | 作用 |
---|---|---|
后端数据处理 | Flux<String> |
接收 AI 生成的流式片段(如代码片段) |
后端传输协议 | Flux<ServerSentEvent<String>> |
将 Flux 片段包装为 SSE 格式 |
前端接收协议 | EventSource |
建立 SSE 连接,接收后端推送的流式数据 |
完整工作流程
1.后端用 Flux 处理流式数据
appService.chatToGenCode(...)
调用 AI 模型,返回Flux<String>
—— 这个 Flux 会 “逐段” 发射 AI 生成的代码(比如先返回<html>
, 再返回<body>
, 等等)。- 后端通过
map
操作,将每个代码片段chunk
包装成 SSE 协议要求的格式(ServerSentEvent
对象,包含data
字段),确保符合 SSE 数据规范。
2.后端通过 SSE 协议推送数据
- 接口标注
produces = MediaType.TEXT_EVENT_STREAM_VALUE
,告诉浏览器:这是一个 SSE 流,需要保持连接并接收持续推送。 - 每个
ServerSentEvent
会被转换为 SSE 格式的文本(如data: {"d": "<html>"}
),通过长连接推送给前端。
3.前端用 EventSource 接收并处理
- 前端创建
EventSource
连接到后端接口,建立 SSE 长连接。 - 每收到一个 SSE 消息(
eventSource.onmessage
),就解析出代码片段content
,并实时更新到页面(messages.value[aiMessageIndex].content = fullContent
),实现 “逐段显示” 的流式效果。 - 额外处理
done
事件(生成完成)和business-error
事件(业务错误),完善交互逻辑。
核心配合点
- Flux 负责 “内部流式处理”:在后端接收 AI 生成的片段,通过响应式编程实现高效的异步处理。
- SSE 负责 “外部流式传输”:后端将 Flux 中的片段包装为 SSE 格式,通过 HTTP 长连接推给前端;前端用
EventSource
接收,完成从 “后端数据” 到 “用户界面” 的实时展示。 - 两者结合,既利用了 Flux 对后端流式数据的处理能力,又通过 SSE 协议实现了前端的实时接收,最终达成 “AI 生成内容逐段显示” 的效果。
大功告成!