SpringAI Alibaba Graph 流式对话

发布于:2025-09-05 ⋅ 阅读:(18) ⋅ 点赞:(0)

Spring AI Graph 项目技术博客

项目概述

本次Demo演示的是一个基于 Spring AI 和 Alibaba Cloud AI Graph 的智能对话系统,展示了如何使用图计算的方式构建 AI 应用流程。项目采用响应式编程和流式处理,实现了高效的 AI 对话服务。

核心架构

1. 图计算架构

使用 StateGraph 来定义 AI 处理流程,通过节点(Node)和边(Edge)的方式组织业务逻辑:

START → query → result → END
  • query 节点: 负责调用大语言模型处理用户查询
  • result 节点: 处理最终结果并输出

2. 流式处理

采用 Server-Sent Events (SSE) 技术实现实时流式响应,用户可以实时看到 AI 的回复过程。

3. 异步处理

使用 AsyncGeneratorFlux 实现异步非阻塞的数据处理,提高系统性能。

核心组件详解

DemoBoxOneGraph - 图定义配置

/**
 * DemoBoxOneGraph - AI对话图计算配置类
 * 
 * 该类负责配置和构建一个基于Spring AI的智能对话流程图。
 * 使用Alibaba Cloud AI Graph框架实现节点编排和状态管理。
 * 
 * 流程图结构:
 * START → query(查询节点) → result(结果节点) → END
 * 
 * @author xinggui
 * @version 1.0
 * @since 2024
 */
@Configuration
@Slf4j
public class DemoBoxOneGraph {

    /**
     * 注入OpenAI聊天模型,用于AI对话生成
     */
    @Resource
    private OpenAiChatModel openAiChatModel;

    /**
     * 创建并配置AI对话状态图
     * 
     * 该方法构建一个完整的对话流程图,包含:
     * 1. 查询节点:调用大语言模型处理用户输入
     * 2. 结果节点:处理和输出最终结果
     * 3. 状态管理:使用ReplaceStrategy策略管理状态更新
     * 4. 流程可视化:生成Mermaid格式的流程图
     * 
     * @param openAiChatModel OpenAI聊天模型实例
     * @return 配置完成的状态图
     * @throws GraphStateException 当图状态配置出现错误时抛出
     */
    @Bean("demoBoxOnesGraph")
    public StateGraph stateGraph(OpenAiChatModel openAiChatModel) throws GraphStateException {
        // 构建聊天客户端,添加日志记录器
        ChatClient chatClient = ChatClient.builder(openAiChatModel)
                .defaultAdvisors(new SimpleLoggerAdvisor())
                .build();

        // 创建状态工厂,定义状态管理策略
        OverAllStateFactory factory = () -> {
            OverAllState state = new OverAllState();
            // 注册查询状态,使用替换策略(新值覆盖旧值)
            state.registerKeyAndStrategy("query", new ReplaceStrategy());
            // 注册结果状态,使用替换策略
            state.registerKeyAndStrategy("result", new ReplaceStrategy());
            return state;
        };

        // 构建状态图,定义节点和边的关系
        StateGraph stateGraph = new StateGraph("demoBoxOne", factory)
                // 添加查询节点,使用异步执行方式
                .addNode("query", AsyncNodeAction.node_async(new DemoBoxOneNode(chatClient)))
                // 添加结果节点,使用异步执行方式
                .addNode("result", AsyncNodeAction.node_async(new DemoBoxOneNode.DemoBoxResultNode()))

                // 定义流程边:START → query → result → END
                .addEdge(StateGraph.START, "query")      // 从开始到查询节点
                .addEdge("query", "result")              // 从查询节点到结果节点
                .addEdge("result", StateGraph.END);      // 从结果节点到结束

        // 生成并打印PlantUML格式的流程图,便于开发和调试
        GraphRepresentation representation = stateGraph.getGraph(GraphRepresentation.Type.MERMAID,
                "expander flow");
        log.info("\n=== expander UML Flow ===");
        log.info(representation.content());
        log.info("==================================\n");

        return stateGraph;
    }
}

关键特性:

  • 状态管理策略:使用 ReplaceStrategy 确保状态正确更新
  • 异步节点:所有节点都配置为异步执行
  • 流程可视化:自动生成 Mermaid 格式的流程图

DemoBoxOneNode - 业务逻辑节点

/**
 * DemoBoxOneNode - AI对话处理节点
 * 
 * 该类实现了图计算中的核心业务逻辑节点,负责:
 * 1. 调用大语言模型处理用户查询
 * 2. 生成流式响应
 * 3. 管理节点间的状态传递
 * 
 * 包含两个内部类:
 * - DemoBoxOneNode: 主要的查询处理节点
 * - DemoBoxResultNode: 结果输出节点
 * 
 * @author xinggui
 * @version 1.0
 * @since 2024
 */
@Slf4j
public class DemoBoxOneNode implements NodeAction {

    /**
     * 聊天客户端,用于与大语言模型进行交互
     */
    private final ChatClient chatClient;

    /**
     * 构造函数,注入聊天客户端
     * 
     * @param chatClient 配置好的聊天客户端实例
     */
    public DemoBoxOneNode(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    /**
     * 执行节点的主要业务逻辑
     * 
     * 该方法实现了以下功能:
     * 1. 从状态中获取用户查询内容
     * 2. 构建系统提示词和用户查询
     * 3. 调用大语言模型生成流式响应
     * 4. 将响应转换为异步生成器
     * 5. 返回包含生成器的状态映射
     * 
     * @param state 当前图的状态对象,包含所有节点的共享状态
     * @return 包含异步生成器的状态映射
     * @throws Exception 当处理过程中出现错误时抛出
     */
    @Override
    public Map<String, Object> apply(OverAllState state) throws Exception {
        // 从状态中获取用户查询,如果没有则使用空字符串
        String query = (String) state.value("query").orElse("");
        
        // 构建流式聊天请求
        // 系统提示词:定义AI助手的角色和行为
        // 用户查询:将用户问题传递给AI模型
        Flux<ChatResponse> chatResponseFlux = chatClient.prompt()
                .system("你是一个Java架构师,主要回答一些Java架构设计选型技术方面的事情。请接下来保持这样方式来回答客户")
                .user((user) -> user.text("请回答用户的问题:").param("query", query))
                .stream()
                .chatResponse();
        
        // 构建流式聊天生成器
        // 该生成器负责将聊天响应转换为节点输出
        AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
                .startingNode("data")                    // 设置起始节点名称
                .startingState(state)                     // 设置起始状态
                .mapResult(chatResponse -> {              // 映射聊天响应到节点输出
                    // 提取AI回复的文本内容
                    String text = chatResponse.getResult().getOutput().getText();
                    // 将文本内容包装在context字段中
                    return Map.of("context", text);
                })
                .buildWithChatResponse(chatResponseFlux); // 使用聊天响应流构建生成器

        // 返回包含生成器的状态映射
        // 其他节点可以通过"data"键访问这个生成器
        return Map.of("data", generator);
    }

    /**
     * DemoBoxResultNode - 结果输出节点
     * 
     * 该内部类负责处理最终的结果输出,主要功能:
     * 1. 从状态中提取处理结果
     * 2. 记录日志信息
     * 3. 返回最终结果状态
     */
    static class DemoBoxResultNode implements NodeAction {
        
        /**
         * 执行结果节点的业务逻辑
         * 
         * @param state 当前图的状态对象
         * @return 包含最终结果的状态映射
         * @throws Exception 当处理过程中出现错误时抛出
         */
        @Override
        public Map<String, Object> apply(OverAllState state) throws Exception {
            // 从状态中获取结果内容
            String result = (String) state.value("result").orElse("");
            
            // 记录最终结果到日志
            log.info("最终的结果result是: {}", result);
            
            // 返回包含结果的状态映射
            return Map.of("result", result);
        }
    }
}

核心功能:

  • 大模型调用:集成 OpenAI 模型进行智能对话
  • 流式生成:支持流式响应,实时返回结果
  • 状态传递:在节点间传递处理状态和数据

StreamUtils - 流式处理工具

/**
 * StreamUtils - 流式处理工具类
 * 
 * 该类负责将异步生成器的流式输出转换为Server-Sent Events (SSE)格式,
 * 实现实时流式响应,支持客户端实时接收AI对话的生成过程。
 * 
 * 主要功能:
 * 1. 异步处理流式输出
 * 2. 转换为SSE事件流
 * 3. 过滤和格式化输出内容
 * 4. 错误处理和资源管理
 * 
 * @author xinggui
 * @version 1.0
 * @since 2024
 */
@Slf4j
public class StreamUtils {

    /**
     * 单线程执行器,用于处理异步流式输出
     * 使用单线程确保事件处理的顺序性和一致性
     */
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    /**
     * 处理异步生成器的流式输出,转换为SSE事件流
     * 
     * 该方法实现了以下核心功能:
     * 1. 异步处理AsyncGenerator的输出
     * 2. 区分不同类型的节点输出(流式输出和普通输出)
     * 3. 过滤掉不需要的中间节点事件
     * 4. 将输出转换为JSON格式的SSE事件
     * 5. 优雅处理完成和异常情况
     * 
     * @param resultFuture 异步生成器,包含图计算的流式输出
     * @param sink SSE事件接收器,用于向客户端发送事件流
     */
    public static void processStream(AsyncGenerator<NodeOutput> resultFuture, Sinks.Many<ServerSentEvent<String>> sink) {
        // 提交任务到执行器,异步处理流式输出
        executor.submit(() -> {
            // 遍历异步生成器的所有输出
            resultFuture.forEachAsync(output -> {
                try {
                    // 获取当前输出节点的名称
                    String nodeName = output.node();

                    String content;
                    // 判断输出类型,处理流式输出和普通输出
                    if (output instanceof StreamingOutput streamingOutput) {
                        // 处理流式输出(如聊天响应)
                        // 提取聊天响应的元数据信息
                        ChatResponse chatResponse = streamingOutput.chatResponse();
                        // 将元数据转换为JSON格式
                        content = JSON.toJSONString(Map.of(nodeName, chatResponse.getMetadata()));
                    } else {
                        // 处理普通节点输出
                        JSONObject nodeOutput = new JSONObject();
                        // 提取节点状态数据
                        nodeOutput.put("data", output.state().data());
                        // 记录节点名称
                        nodeOutput.put("node", nodeName);
                        // 转换为JSON字符串
                        content = JSON.toJSONString(nodeOutput);
                    }
                    
                    // 过滤掉"query"节点的事件,避免发送不必要的中间状态
                    // 只发送有意义的输出事件给客户端
                    if (!nodeName.equalsIgnoreCase("query")) {
                        // 创建SSE事件并发送给客户端
                        sink.tryEmitNext(ServerSentEvent.builder(content).build());
                    }
                    // 当节点名是"query"时,直接跳过,不发送任何SSE事件
                    
                } catch (Exception e) {
                    // 记录处理过程中的错误
                    log.error("error", e);
                }
            }).thenAccept(v -> {
                // 所有输出处理完成后,发送完成事件
                sink.tryEmitComplete();
            }).exceptionally(e -> {
                // 处理过程中出现异常时的错误处理
                log.error("error", e);
                return null;
            });
        });
    }
}

主要职责:

  • 流式转换:将 AsyncGenerator 转换为 SSE 事件流
  • 事件过滤:过滤掉不需要的中间节点事件
  • 错误处理:优雅处理流式处理中的异常情况

技术亮点

1. 响应式编程

  • 使用 Spring WebFlux 和 Project Reactor
  • 非阻塞 I/O 处理
  • 背压控制机制

2. 流式 AI 响应

  • 实时流式输出
  • 支持长对话场景
  • 客户端实时接收响应

3. 图计算模式

  • 清晰的数据流向
  • 易于扩展和维护
  • 支持复杂的业务逻辑编排

使用方法

1. 调用接口

  • 流式调用: /code6 (返回 SSE 流)

总结

Spring AI Graph 这个流式响应的Demo展示了AI 应用开发的最佳实践,通过图计算、流式处理和响应式编程的结合,构建了高性能、可扩展的智能对话系统。这种架构模式特别适合需要复杂业务逻辑编排的 AI 应用场景。


网站公告

今日签到

点亮在社区的每一天
去签到