【AI】SpringAI 第二弹:基于多模型实现流式输出

发布于:2025-05-18 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

一、基于多模型实现流式输出

1.1 什么是流式输出

1.2 多模型引入

1.3 代码实现

1.3.1 流式输出的API介绍

1.3.2 Flux 源码分析

二、了解 Reactor 模型

三、SSE 协议


一、基于多模型实现流式输出

1.1 什么是流式输出

流式输出(Streaming Output)是指数据在生成过程中就逐步传输给接收方,而不是等待全部处理完成后再一次性输出。这种模式具有以下优势:

  1. 低延迟​:用户可以立即看到部分结果,无需等待全部处理完成
  2. 资源高效​:可以更早释放部分资源,减少内存占用
  3. 用户体验好​:渐进式的反馈让用户感知到系统正在工作

1.2 多模型引入

1. 添加相关依赖

    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-starter-model-openai</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-starter-model-ollama</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-starter-model-qianfan</artifactId>
    </dependency>

2. 设置配置文件

spring:
  ai:
    openai:
      api-key: ${自定义}
      base-url: https://dashscope.aliyuncs.com/compatible-mode/
      chat:
        options:
          model: ${自定义}
    ollama:
      base-url: http://127.0.0.1:11434
      chat:
        options:
          model: ${自定义}
    qianfan:
      api-key: ${自定义}
      secret-key: ${自定义}
      chat:
        options:
          model: ${自定义}

1.3 代码实现

1. OllamaController

@RestController
@RequestMapping("/ol")
public class OllamaController {
    
    @Resource
    private OllamaChatModel chatModel;

    @GetMapping(value = "/chat",produces = "text/event-stream")
    public Flux<String> chat(String prompt) {
        return chatModel.stream(prompt);
    }
}

调用结果如下:

2. QianFanController

@RestController
@RequestMapping("/qianfan")
public class QianFanController {

    @Autowired
    private QianFanChatModel chatModel;

    @RequestMapping(value = "/stream",produces = "text/event-stream")
    public Flux<String> chat(@RequestParam("question") String question) {
        return chatModel.stream(question);
    }
}

调用结果如下:

3. BaiLianController:通过对接通用的大模型平台,实现同平台的多种大模型切换

ChatClent是对ChatModel的封装

@RestController
@RequestMapping("/bl")
public class BaiLianController {

    @Autowired
    private ChatClient deepseekClient;

    @Autowired
    private ChatClient qwenClient;

    @RequestMapping(value = "/chatDeepSeek",produces = "text/event-stream")
    public Flux<String> chatDeepSeek(String message) {
        return deepseekClient.prompt(message)
                .stream()
                .content();
    }

    @RequestMapping(value = "/chatQwen",produces = "text/event-stream")
    public Flux<String> chatQwen(String message) {
        return qwenClient.prompt(message)
                .stream()
                .content();
    }
}

Flux 是 Spring WebFlux 框架中的一个核心组件,属于响应式编程模型的一部分。它主要用于处理异步、非阻塞的流式数据,能够高效地处理高并发场景。Flux 可以生成和处理一系列的事件或数据, 如流式输出等。

调用 DeepSeek 结果如下:(基于 ChatGPT4模型)

调用 通问千义 结果如下:

1.3.1 流式输出的API介绍

基于 ChatModel:

基于 ChatClient:

1.3.2 Flux 源码分析

要搞清楚这个问题,我们需要看流式输出对象 Flux 的实现源码:

查看 Flux 源码我们发现它是属于 reactor.core.publisher 包下的抽象类:

Reactor Streams 会订阅数据源,当有数据时,Reactor Streams 以分块流的方式 发送给客户端(用户)。

二、了解 Reactor 模型

Reactor 是一种事件驱动的高性能网络编程模型,主要用于处理高并发的网络 I/O 请求。其核心思想是通过一个或多个线程监听事件,并将事件分发给相应的处理程序,从而实现高效的并发处理。

Reactor 模型的主要特征如下:

  1. 事件驱动:所有 I/O 操作都由事件触发并处理。

  2. 非阻塞:操作不会因为 I/O 而挂起,避免了线程等待的开销。

  3. 高效资源利用:通过少量线程处理大量并发连接,提升性能。

  4. 组件分离:将事件监听(Reactor)、事件分发(Dispatcher)和事件处理(Handler)解耦, 使代码结构更清晰。

Reactor 实现方式有三种:

  1. 单线程 Reactor 模型:所有操作在一个线程完成,适用于低并发场景。

  2. 多线程 Reactor 模型:主线程处理连接,子线程池处理 I/O 和业务。

  3. 主从 Reactor 模型:主线程池处理连接,子线程池处理 I/O(进一步优化资源分配)。

三、SSE 协议

除了 Flux 实现之外,我们还可以通过 Server-Sent Events (SSE) 协议实现流式输出,通过单向 的 HTTP 长连接,服务端可以持续推送数据片段(如逐词或逐句)到前端。与 WebSocket 不同, SSE 是轻量级的单向通信协议,适合 AI 对话这类服务端主导的场景,它的具体实现代码如下:

@GetMapping(value = "/streamChat", produces = MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8")
    public SseEmitter streamChat(@RequestParam String message) {
        // 创建SSE发射器,设置超时时间(60秒)
        SseEmitter emitter = new SseEmitter(60_000L);

        // 创建Prompt对象
        Prompt prompt = new Prompt(new UserMessage(message));

        // 使用AtomicReference保存订阅以便后续取消
        AtomicReference<Disposable> subscriptionRef = new AtomicReference<>();

        // 订阅流式响应
        subscriptionRef.set(chatModel.stream(prompt).subscribe(
                response -> {
                    try {
                        // 防御性检查
                        if (response == null || response.getResult() == null || response.getResult().getOutput() == null) {
                            return;
                        }

                        String content = response.getResult().getOutput().getText();
                        if (content == null || content.isEmpty()) {
                            return;
                        }

                        // 发送SSE事件
                        SseEmitter.SseEventBuilder event = SseEmitter.event()
                                .data(content)
                                .id(UUID.randomUUID().toString())
                                .comment("Chat response chunk");

                        emitter.send(event);

                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                },
                error -> {

                },
                () -> {

                }
        ));

        // 处理客户端断开连接
        emitter.onCompletion(() -> {
            Disposable subscription = subscriptionRef.get();
            if (subscription != null && !subscription.isDisposed()) {
                subscription.dispose();
            }
        });

        // 处理超时
        emitter.onTimeout(() -> {
            Disposable subscription = subscriptionRef.get();
            if (subscription != null && !subscription.isDisposed()) {
                subscription.dispose();
            }

        });

        // 处理错误
        emitter.onError((throwable) -> {
            Disposable subscription = subscriptionRef.get();
            if (subscription != null && !subscription.isDisposed()) {
                subscription.dispose();
            }
        });

        return emitter;
    }


网站公告

今日签到

点亮在社区的每一天
去签到