Calling the DeepSeek API from Spring Boot 2.2.6 and pushing the streamed response to the front end over SSE: a complete implementation

Published: 2025-06-29

1. Add dependencies (pom.xml)

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

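    <!-- WebFlux: provides the reactive WebClient used to call DeepSeek (SseEmitter itself comes from spring-boot-starter-web) -->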
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

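    <!-- Jackson (JSON); already pulled in by spring-boot-starter-web, listed here for clarity -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>

    <!-- Lombok: needed for the @Data annotation on the DTOs -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>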
</dependencies>

2. Configuration class (WebClientConfig.java)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {
    
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("https://api.deepseek.com/v1")
                .defaultHeader("Authorization", "Bearer YOUR_API_KEY") // replace with your API key
                .build();
    }
}
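
Hard-coding the key in source is fine for a demo but easy to leak. As a minimal sketch, the header value could instead be built from an environment variable. The class name ApiKeys and the variable name DEEPSEEK_API_KEY are assumptions made for illustration, not part of the original code or of any DeepSeek convention.

```java
import java.util.Map;

/** Hypothetical helper: builds the Authorization header value from the environment. */
final class ApiKeys {

    // Assumed variable name; pick whatever your deployment uses
    static final String ENV_NAME = "DEEPSEEK_API_KEY";

    /** Returns "Bearer <key>", or fails fast when the key is missing or blank. */
    static String bearerHeader(Map<String, String> env) {
        String key = env.get(ENV_NAME);
        if (key == null || key.trim().isEmpty()) {
            throw new IllegalStateException(ENV_NAME + " is not set");
        }
        return "Bearer " + key.trim();
    }
}
```

In WebClientConfig this would be used as .defaultHeader("Authorization", ApiKeys.bearerHeader(System.getenv())). Taking the map as a parameter keeps the helper easy to test.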

3. Request/response DTOs

import lombok.Data;
import java.util.List;

@Data
public class DeepSeekRequest {
    private String model = "deepseek-chat";
    private List<Message> messages;
    private boolean stream = true;

    @Data
    public static class Message {
        private String role;
        private String content;

        public Message(String role, String content) {
            this.role = role;
            this.content = content;
        }
    }
}

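// DeepSeekResponse.java (a public class needs its own file, with its own imports)
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.List;

// One streamed chunk. Only the fields used here are modeled, so unknown
// fields (id, model, finish_reason, ...) are ignored during deserialization.
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class DeepSeekResponse {
    private List<Choice> choices;

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Choice {
        private Delta delta;
    }

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Delta {
        private String content;
    }
}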

4. SSE service (DeepSeekService.java)

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.List;

@Service
public class DeepSeekService {

    private static final String DONE_MARKER = "[DONE]";

    private final WebClient webClient;
    // Reused across requests; ignores response fields the DTO does not model
    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public DeepSeekService(WebClient webClient) {
        this.webClient = webClient;
    }

    public Flux<String> streamCompletion(String userMessage) {
        DeepSeekRequest request = new DeepSeekRequest();
        request.setMessages(Collections.singletonList(
                new DeepSeekRequest.Message("user", userMessage)
        ));

        // Return the WebClient Flux directly instead of bridging through a processor:
        // nothing is emitted before the caller subscribes, and cancellation propagates upstream.
        return webClient.post()
                .uri("/chat/completions")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(request)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class) // each element is the data payload of one SSE event
                .takeWhile(data -> !DONE_MARKER.equals(data.trim())) // stop at the end marker
                .<String>handle((data, sink) -> {
                    try {
                        // Parse the chunk and emit only the text in choices[0].delta.content
                        DeepSeekResponse chunk = objectMapper.readValue(data, DeepSeekResponse.class);
                        List<DeepSeekResponse.Choice> choices = chunk.getChoices();
                        if (choices != null && !choices.isEmpty() && choices.get(0).getDelta() != null) {
                            String content = choices.get(0).getDelta().getContent();
                            if (content != null && !content.isEmpty()) {
                                sink.next(content);
                            }
                        }
                    } catch (JsonProcessingException e) {
                        sink.error(e);
                    }
                });
    }
}

5. SSE controller (SseController.java)

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/sse")
public class SseController {

    private final DeepSeekService deepSeekService;

    public SseController(DeepSeekService deepSeekService) {
        this.deepSeekService = deepSeekService;
    }

    @GetMapping(path = "/deepseek", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamDeepSeekResponse(@RequestParam String message) {
        SseEmitter emitter = new SseEmitter(60 * 1000L); // 60-second timeout

        Flux<String> responseStream = deepSeekService.streamCompletion(message);

        Disposable subscription = responseStream.subscribe(
                content -> {
                    try {
                        // Forward each chunk to the browser as an SSE event named "message"
                        emitter.send(SseEmitter.event()
                                .data(content)
                                .name("message"));
                    } catch (Exception e) {
                        emitter.completeWithError(e);
                    }
                },
                emitter::completeWithError,
                emitter::complete
        );

        // Stop consuming the stream once the request is finished
        // (normal completion, timeout, or client disconnect)
        emitter.onCompletion(subscription::dispose);

        return emitter;
    }
}
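
To make clear what travels over the wire, here is a minimal sketch of the event-stream framing defined by the HTML standard: an optional event field, one data field per payload line, and a blank line that ends the event. The class SseFrame is hypothetical and is not SseEmitter's internal code. The sketch writes a space after each colon because a parser strips one leading space from a field value, so a payload that itself starts with a space keeps it only if the separator space is present.

```java
/** Hypothetical helper: renders one server-sent event in text/event-stream framing. */
final class SseFrame {

    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // Every line of the payload needs its own "data:" field;
        // the receiver joins them back together with "\n".
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // a blank line terminates the event
        return sb.toString();
    }
}
```

For example, SseFrame.format("message", "Hello") yields the three lines "event: message", "data: Hello", and an empty line.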

6. Front end (HTML + JavaScript)

<!DOCTYPE html>
<html>
<head>
    <title>DeepSeek SSE Demo</title>
</head>
<body>
    <input type="text" id="message" placeholder="Enter your question">
    <button onclick="startSSE()">Start</button>
    <div id="output" style="white-space: pre-wrap; margin-top: 20px;"></div>

    <script>
        let eventSource;
        
        function startSSE() {
            const message = document.getElementById('message').value;
            const outputDiv = document.getElementById('output');
            outputDiv.textContent = ''; // clear the previous output
            
            if (eventSource) eventSource.close();
            
            // Open the SSE connection
            eventSource = new EventSource(`/sse/deepseek?message=${encodeURIComponent(message)}`);
            
            eventSource.addEventListener("message", (event) => {
                // Append each chunk as plain text; textContent keeps model output from being parsed as HTML
                outputDiv.textContent += event.data;
            });
            
            eventSource.addEventListener("error", (err) => {
                // Also fires when the server ends the stream; close() prevents automatic reconnection
                console.error("SSE error:", err);
                outputDiv.textContent += "\n\n[Connection closed]";
                eventSource.close();
            });
        }
    </script>
</body>
</html>

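Key points:

1. SSE streaming
   SseEmitter implements the server push.
   The text/event-stream content type keeps the HTTP connection open while events are sent.

2. DeepSeek API integration
   Setting stream=true in the request enables the streaming response.
   The stream ends with a data: [DONE] marker, which must be filtered out rather than parsed as JSON.
   The text of each chunk is in the choices[0].delta.content field of its JSON payload.

3. Reactive programming
   WebClient consumes the HTTP stream.
   Flux carries the chunks from the service to the controller.
   Sinks is not used: it was added in Reactor 3.4, while Spring Boot 2.2.6 ships Reactor 3.3.

4. Front end
   The EventSource API receives the SSE events.
   Each chunk is appended to the DOM as it arrives.
   The error handler covers connection failures and the end of the stream.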
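
The [DONE] handling listed under DeepSeek API integration can be sketched in plain Java, independent of Spring. The sketch below assumes each event carries a single data line, which is how the chunks are described here; the class name SseDataLines is hypothetical. It extracts the payload of every data line and stops at the end marker, leaving JSON parsing to the caller.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: extracts SSE data payloads from raw event-stream text. */
final class SseDataLines {

    static final String DONE_MARKER = "[DONE]";

    /** Returns the payload of each "data:" line up to, but not including, the [DONE] marker. */
    static List<String> payloads(String rawStream) {
        List<String> result = new ArrayList<>();
        for (String line : rawStream.split("\r?\n")) {
            if (!line.startsWith("data:")) {
                continue; // blank separators, comment lines, and other fields
            }
            String value = line.substring("data:".length());
            if (value.startsWith(" ")) {
                value = value.substring(1); // one optional space may follow the colon
            }
            if (DONE_MARKER.equals(value)) {
                break; // end of stream: nothing after this is forwarded
            }
            result.add(value);
        }
        return result;
    }
}
```

Each returned string is one JSON chunk, ready to be handed to a JSON parser to read choices[0].delta.content.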

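Test steps:

1. Start the Spring Boot application.

2. Open the front-end page (default port 8080), for example http://localhost:8080/index.html if the HTML file is placed in src/main/resources/static.

3. Enter a question and click the button.

4. Watch the answer appear incrementally as it streams in.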

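Notes:

1. Replace YOUR_API_KEY with your actual DeepSeek API key.

2. Recommendations for production:

   Parse the response with a JSON library (Jackson is already on the classpath through spring-boot-starter-web).

   Add error handling and a retry mechanism.

   Add API rate limiting.

   Implement more robust SSE connection management.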
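
As one possible shape for the retry recommendation, here is a minimal plain-Java sketch of retrying with exponential backoff. The class and parameter names are hypothetical, and the policy (retry on any exception, double the delay each time) is an assumption rather than something the DeepSeek API prescribes. Retrying is only safe for failures that happen before any chunk has been forwarded to the browser; otherwise the user would see duplicated text.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a task, doubling the wait after each failure. */
final class Retry {

    static <T> T withBackoff(Callable<T> task, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: rethrow the last failure below
                }
                Thread.sleep(delay); // wait before the next attempt
                delay *= 2;          // exponential backoff
            }
        }
        throw last;
    }
}
```

A call such as Retry.withBackoff(() -> openConnection(), 3, 500L), where openConnection is a placeholder for your own code, would try up to three times and wait 500 ms and then 1000 ms between attempts.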

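With this implementation the front end receives and displays the DeepSeek API's streamed response in real time, so the answer appears chunk by chunk as it is generated.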

