Spring Boot 整合 SSE, http长连接

发布于:2025-08-30 ⋅ 阅读:(21) ⋅ 点赞:(0)

1. 什么是 SSE? (30秒)

SSE (Server-Sent Events) 是一种允许服务器通过 HTTP 连接主动向客户端发送实时更新的技术。

  • 特点:基于 HTTP,使用简单,单向通信(服务器 -> 客户端),自动重连。

  • 对比 WebSocket:WebSocket 是双向的,更复杂;SSE 是单向的,更轻量,适用于通知、日志流、实时数据更新等场景。


2. 核心依赖与配置 (30秒)

Spring Boot 从 2.2.x 版本开始提供了对 SSE 的专用支持,主要包含在 spring-boot-starter-web 中,无需引入额外依赖

确保你的 pom.xml 中有:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3. 三步编写代码 (3分钟)

第一步:创建控制器 (Controller)

创建一个 @RestController,并定义一个方法来产生 SSE 流。

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
public class SseController {

    // 用于保存所有连接的 SseEmitter,可以根据用户ID等关键字进行存储
    private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();

    /**
     * 用于客户端连接 SSE
     * @param clientId 客户端标识,用于区分不同客户端
     * @return SseEmitter
     */
    @GetMapping(path = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@RequestParam String clientId) {
        // 设置超时时间,0表示永不超时。可以根据需要设置,例如 30_000L (30秒)
        SseEmitter emitter = new SseEmitter(0L);

        // 注册回调函数,当连接完成或出错时,从Map中移除这个Emitter
        emitter.onCompletion(() -> EMITTER_MAP.remove(clientId));
        emitter.onError((e) -> EMITTER_MAP.remove(clientId));
        emitter.onTimeout(() -> EMITTER_MAP.remove(clientId));

        // 将新的 emitter 存入 Map
        EMITTER_MAP.put(clientId, emitter);

        // 可选:发送一个初始连接成功的事件
        try {
            emitter.send(SseEmitter.event()
                    .name("INIT") // 事件名称,可选
                    .data("连接成功 for: " + clientId) // 事件数据
                    .id("1") // 事件ID,可选,用于重连
                    .reconnectTime(5000)); // 重连时间,可选
        } catch (IOException e) {
            e.printStackTrace();
        }

        return emitter;
    }
}
第二步:创建发送消息的方法

在同一个 Controller 中,添加一个 API 来模拟向特定客户端发送消息。

    /**
     * 向指定客户端发送消息
     */
    @GetMapping("/sse/send")
    public String sendMessage(@RequestParam String clientId, @RequestParam String message) {
        SseEmitter emitter = EMITTER_MAP.get(clientId);
        if (emitter != null) {
            try {
                // 构建并发送事件
                emitter.send(SseEmitter.event()
                        .name("MESSAGE") // 事件类型
                        .data(message)   // 事件数据
                        .id("msg-id-" + System.currentTimeMillis())); // ID
            } catch (IOException e) {
                // 发送失败,移除 emitter
                EMITTER_MAP.remove(clientId);
                return "发送失败,客户端可能已断开";
            }
            return "发送成功 to: " + clientId;
        }
        return "客户端不存在";
    }
第三步:编写前端页面进行测试 (1分钟)

在 src/main/resources/static 目录下创建一个 sse-demo.html 文件。

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <h1>SSE 客户端测试</h1>

    <label for="clientId">客户端ID: </label>
    <input type="text" id="clientId" value="test-client-1">
    <button onclick="connectSSE()">连接SSE</button>
    <button onclick="closeSSE()">断开连接</button>

    <hr>
    <label for="message">要发送的消息: </label>
    <input type="text" id="message" value="Hello SSE!">
    <button onclick="sendMessage()">发送消息</button>

    <hr>
    <h3>收到的事件:</h3>
    <div id="messages"></div>

    <script>
        let eventSource;

        function connectSSE() {
            const clientId = document.getElementById('clientId').value;
            // 断开现有连接
            if (eventSource) {
                eventSource.close();
            }

            // 建立新的 SSE 连接
            eventSource = new EventSource(`/sse/connect?clientId=${clientId}`);

            // 监听通用消息(没有指定 event name 的消息)
            eventSource.onmessage = function (event) {
                appendMessage(`[message]: ${event.data}`);
            };

            // 监听特定名称的事件 (例如:MESSAGE)
            eventSource.addEventListener("MESSAGE", function (event) {
                appendMessage(`[MESSAGE]: ${event.data}`);
            });

            // 监听特定名称的事件 (例如:INIT)
            eventSource.addEventListener("INIT", function (event) {
                appendMessage(`[INIT]: ${event.data}`);
            });

            eventSource.onerror = function (err) {
                console.error("SSE error:", err);
                appendMessage('[错误] 连接出错');
            };
        }

        function closeSSE() {
            if (eventSource) {
                eventSource.close();
                appendMessage('[信息] 连接已关闭');
                eventSource = null;
            }
        }

        function sendMessage() {
            const clientId = document.getElementById('clientId').value;
            const message = document.getElementById('message').value;
            fetch(`/sse/send?clientId=${clientId}&message=${encodeURIComponent(message)}`)
                .then(response => response.text())
                .then(data => console.log(data));
        }

        function appendMessage(text) {
            const messageDiv = document.getElementById('messages');
            const p = document.createElement('p');
            p.textContent = `${new Date().toLocaleTimeString()}: ${text}`;
            messageDiv.appendChild(p);
        }
    </script>
</body>
</html>

4. 运行与测试 (1分钟)

  1. 启动应用:运行你的 Spring Boot 应用。

  2. 打开页面:访问 http://localhost:8080/sse-demo.html

  3. 进行测试

    • 输入一个客户端 ID(如 user1),点击 “连接SSE”。前端会收到 [INIT] 事件。

    • 在另一个浏览器标签页或使用 Postman 访问 :  http://localhost:8080/sse/send?clientId=user1&message=你好!

    • 观察第一个标签页,会立即收到 [MESSAGE]: 你好! 的消息。

总结

  • 核心对象SseEmitter

  • 关键注解@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)

  • 流程

    1. 客户端连接 /sse/connect,服务端创建并保存 SseEmitter

    2. 服务端通过 emitter.send() 主动推送消息。

    3. 客户端通过 EventSource API 监听和处理消息。

    4. 连接结束时,服务端需要清理 SseEmitter(通过回调函数)。

现在你已经掌握了 Spring Boot 整合 SSE 的基本方法!在实际项目中,你可能需要将其与业务逻辑、身份认证(如 JWT)以及更强大的连接管理(如使用数据库或 Redis 存储 emitter)相结合。