Spring AI Graph 项目技术博客
项目概述
本次Demo演示的是一个基于 Spring AI 和 Alibaba Cloud AI Graph 的智能对话系统,展示了如何使用图计算的方式构建 AI 应用流程。项目采用响应式编程和流式处理,实现了高效的 AI 对话服务。
核心架构
1. 图计算架构
使用 StateGraph
来定义 AI 处理流程,通过节点(Node)和边(Edge)的方式组织业务逻辑:
START → query → result → END
- query 节点: 负责调用大语言模型处理用户查询
- result 节点: 处理最终结果并输出
2. 流式处理
采用 Server-Sent Events (SSE) 技术实现实时流式响应,用户可以实时看到 AI 的回复过程。
3. 异步处理
使用 AsyncGenerator
和 Flux
实现异步非阻塞的数据处理,提高系统性能。
核心组件详解
DemoBoxOneGraph - 图定义配置
/**
* DemoBoxOneGraph - AI对话图计算配置类
*
* 该类负责配置和构建一个基于Spring AI的智能对话流程图。
* 使用Alibaba Cloud AI Graph框架实现节点编排和状态管理。
*
* 流程图结构:
* START → query(查询节点) → result(结果节点) → END
*
* @author xinggui
* @version 1.0
* @since 2024
*/
@Configuration
@Slf4j
public class DemoBoxOneGraph {
/**
* 注入OpenAI聊天模型,用于AI对话生成
*/
@Resource
private OpenAiChatModel openAiChatModel;
/**
* 创建并配置AI对话状态图
*
* 该方法构建一个完整的对话流程图,包含:
* 1. 查询节点:调用大语言模型处理用户输入
* 2. 结果节点:处理和输出最终结果
* 3. 状态管理:使用ReplaceStrategy策略管理状态更新
* 4. 流程可视化:生成Mermaid格式的流程图
*
* @param openAiChatModel OpenAI聊天模型实例
* @return 配置完成的状态图
* @throws GraphStateException 当图状态配置出现错误时抛出
*/
@Bean("demoBoxOnesGraph")
public StateGraph stateGraph(OpenAiChatModel openAiChatModel) throws GraphStateException {
// 构建聊天客户端,添加日志记录器
ChatClient chatClient = ChatClient.builder(openAiChatModel)
.defaultAdvisors(new SimpleLoggerAdvisor())
.build();
// 创建状态工厂,定义状态管理策略
OverAllStateFactory factory = () -> {
OverAllState state = new OverAllState();
// 注册查询状态,使用替换策略(新值覆盖旧值)
state.registerKeyAndStrategy("query", new ReplaceStrategy());
// 注册结果状态,使用替换策略
state.registerKeyAndStrategy("result", new ReplaceStrategy());
return state;
};
// 构建状态图,定义节点和边的关系
StateGraph stateGraph = new StateGraph("demoBoxOne", factory)
// 添加查询节点,使用异步执行方式
.addNode("query", AsyncNodeAction.node_async(new DemoBoxOneNode(chatClient)))
// 添加结果节点,使用异步执行方式
.addNode("result", AsyncNodeAction.node_async(new DemoBoxOneNode.DemoBoxResultNode()))
// 定义流程边:START → query → result → END
.addEdge(StateGraph.START, "query") // 从开始到查询节点
.addEdge("query", "result") // 从查询节点到结果节点
.addEdge("result", StateGraph.END); // 从结果节点到结束
// 生成并打印PlantUML格式的流程图,便于开发和调试
GraphRepresentation representation = stateGraph.getGraph(GraphRepresentation.Type.MERMAID,
"expander flow");
log.info("\n=== expander UML Flow ===");
log.info(representation.content());
log.info("==================================\n");
return stateGraph;
}
}
关键特性:
- 状态管理策略:使用
ReplaceStrategy
确保状态正确更新 - 异步节点:所有节点都配置为异步执行
- 流程可视化:自动生成 Mermaid 格式的流程图
DemoBoxOneNode - 业务逻辑节点
/**
* DemoBoxOneNode - AI对话处理节点
*
* 该类实现了图计算中的核心业务逻辑节点,负责:
* 1. 调用大语言模型处理用户查询
* 2. 生成流式响应
* 3. 管理节点间的状态传递
*
* 包含两个内部类:
* - DemoBoxOneNode: 主要的查询处理节点
* - DemoBoxResultNode: 结果输出节点
*
* @author xinggui
* @version 1.0
* @since 2024
*/
@Slf4j
public class DemoBoxOneNode implements NodeAction {
/**
* 聊天客户端,用于与大语言模型进行交互
*/
private final ChatClient chatClient;
/**
* 构造函数,注入聊天客户端
*
* @param chatClient 配置好的聊天客户端实例
*/
public DemoBoxOneNode(ChatClient chatClient) {
this.chatClient = chatClient;
}
/**
* 执行节点的主要业务逻辑
*
* 该方法实现了以下功能:
* 1. 从状态中获取用户查询内容
* 2. 构建系统提示词和用户查询
* 3. 调用大语言模型生成流式响应
* 4. 将响应转换为异步生成器
* 5. 返回包含生成器的状态映射
*
* @param state 当前图的状态对象,包含所有节点的共享状态
* @return 包含异步生成器的状态映射
* @throws Exception 当处理过程中出现错误时抛出
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 从状态中获取用户查询,如果没有则使用空字符串
String query = (String) state.value("query").orElse("");
// 构建流式聊天请求
// 系统提示词:定义AI助手的角色和行为
// 用户查询:将用户问题传递给AI模型
Flux<ChatResponse> chatResponseFlux = chatClient.prompt()
.system("你是一个Java架构师,主要回答一些Java架构设计选型技术方面的事情。请接下来保持这样方式来回答客户")
.user((user) -> user.text("请回答用户的问题:").param("query", query))
.stream()
.chatResponse();
// 构建流式聊天生成器
// 该生成器负责将聊天响应转换为节点输出
AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
.startingNode("data") // 设置起始节点名称
.startingState(state) // 设置起始状态
.mapResult(chatResponse -> { // 映射聊天响应到节点输出
// 提取AI回复的文本内容
String text = chatResponse.getResult().getOutput().getText();
// 将文本内容包装在context字段中
return Map.of("context", text);
})
.buildWithChatResponse(chatResponseFlux); // 使用聊天响应流构建生成器
// 返回包含生成器的状态映射
// 其他节点可以通过"data"键访问这个生成器
return Map.of("data", generator);
}
/**
* DemoBoxResultNode - 结果输出节点
*
* 该内部类负责处理最终的结果输出,主要功能:
* 1. 从状态中提取处理结果
* 2. 记录日志信息
* 3. 返回最终结果状态
*/
static class DemoBoxResultNode implements NodeAction {
/**
* 执行结果节点的业务逻辑
*
* @param state 当前图的状态对象
* @return 包含最终结果的状态映射
* @throws Exception 当处理过程中出现错误时抛出
*/
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
// 从状态中获取结果内容
String result = (String) state.value("result").orElse("");
// 记录最终结果到日志
log.info("最终的结果result是: {}", result);
// 返回包含结果的状态映射
return Map.of("result", result);
}
}
}
核心功能:
- 大模型调用:集成 OpenAI 模型进行智能对话
- 流式生成:支持流式响应,实时返回结果
- 状态传递:在节点间传递处理状态和数据
StreamUtils - 流式处理工具
/**
* StreamUtils - 流式处理工具类
*
* 该类负责将异步生成器的流式输出转换为Server-Sent Events (SSE)格式,
* 实现实时流式响应,支持客户端实时接收AI对话的生成过程。
*
* 主要功能:
* 1. 异步处理流式输出
* 2. 转换为SSE事件流
* 3. 过滤和格式化输出内容
* 4. 错误处理和资源管理
*
* @author xinggui
* @version 1.0
* @since 2024
*/
@Slf4j
public class StreamUtils {
/**
* 单线程执行器,用于处理异步流式输出
* 使用单线程确保事件处理的顺序性和一致性
*/
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
/**
* 处理异步生成器的流式输出,转换为SSE事件流
*
* 该方法实现了以下核心功能:
* 1. 异步处理AsyncGenerator的输出
* 2. 区分不同类型的节点输出(流式输出和普通输出)
* 3. 过滤掉不需要的中间节点事件
* 4. 将输出转换为JSON格式的SSE事件
* 5. 优雅处理完成和异常情况
*
* @param resultFuture 异步生成器,包含图计算的流式输出
* @param sink SSE事件接收器,用于向客户端发送事件流
*/
public static void processStream(AsyncGenerator<NodeOutput> resultFuture, Sinks.Many<ServerSentEvent<String>> sink) {
// 提交任务到执行器,异步处理流式输出
executor.submit(() -> {
// 遍历异步生成器的所有输出
resultFuture.forEachAsync(output -> {
try {
// 获取当前输出节点的名称
String nodeName = output.node();
String content;
// 判断输出类型,处理流式输出和普通输出
if (output instanceof StreamingOutput streamingOutput) {
// 处理流式输出(如聊天响应)
// 提取聊天响应的元数据信息
ChatResponse chatResponse = streamingOutput.chatResponse();
// 将元数据转换为JSON格式
content = JSON.toJSONString(Map.of(nodeName, chatResponse.getMetadata()));
} else {
// 处理普通节点输出
JSONObject nodeOutput = new JSONObject();
// 提取节点状态数据
nodeOutput.put("data", output.state().data());
// 记录节点名称
nodeOutput.put("node", nodeName);
// 转换为JSON字符串
content = JSON.toJSONString(nodeOutput);
}
// 过滤掉"query"节点的事件,避免发送不必要的中间状态
// 只发送有意义的输出事件给客户端
if (!nodeName.equalsIgnoreCase("query")) {
// 创建SSE事件并发送给客户端
sink.tryEmitNext(ServerSentEvent.builder(content).build());
}
// 当节点名是"query"时,直接跳过,不发送任何SSE事件
} catch (Exception e) {
// 记录处理过程中的错误
log.error("error", e);
}
}).thenAccept(v -> {
// 所有输出处理完成后,发送完成事件
sink.tryEmitComplete();
}).exceptionally(e -> {
// 处理过程中出现异常时的错误处理
log.error("error", e);
return null;
});
});
}
}
主要职责:
- 流式转换:将
AsyncGenerator
转换为 SSE 事件流 - 事件过滤:过滤掉不需要的中间节点事件
- 错误处理:优雅处理流式处理中的异常情况
技术亮点
1. 响应式编程
- 使用 Spring WebFlux 和 Project Reactor
- 非阻塞 I/O 处理
- 背压控制机制
2. 流式 AI 响应
- 实时流式输出
- 支持长对话场景
- 客户端实时接收响应
3. 图计算模式
- 清晰的数据流向
- 易于扩展和维护
- 支持复杂的业务逻辑编排
使用方法
1. 调用接口
- 流式调用:
/code6
(返回 SSE 流)
总结
Spring AI Graph 这个流式响应的Demo展示了AI 应用开发的最佳实践,通过图计算、流式处理和响应式编程的结合,构建了高性能、可扩展的智能对话系统。这种架构模式特别适合需要复杂业务逻辑编排的 AI 应用场景。