【超详解】SSE流式输出

发布于:2025-06-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

一、什么是SSE流式输出?

二、SSE的实现过程

三、前端代码示例(以原生的JS为例)

四、后端代码示例(以PHP为例)

五、总结


一、什么是SSE流式输出?

SSE(即Serve Sent Events)是一种Web应用中实现的单向实时通信技术。通过SSE,服务器可以主动向客户端发送数据更新,不需要客户端不断发送请求。

应用场景:

  • 聊天应用
  • 数据大屏的实时更新
  • 直播互动与弹幕

特点:

  • 单向通信,区别于WebSocket的双向通信。
  • 基于HTTP,简单易用,安全性高。
  • 自动重连,如果连接丢失了,浏览器会自动尝试重新连接。
  •  文本格式,数据以文本流的形式发送,通常是UTF-8编码。

优势:

  • 低能耗:相比轮询(Polling),SSE只需建立一次连接,减少客户端请求频率,降低设备功耗和网络带宽占用。
  • 实时性强:服务器主动推送数据,避免客户端等待下一次轮询周期,适合对实时性要求高的场景(如交易、预警)。
  • 兼容性好:基于HTTP协议,无需额外插件,主流浏览器和客户端均支持,开发成本较低。

二、SSE的实现过程

客户端(client)向服务器发送一个GET请求,带有指定的header,表示可以接收事件流类型,并禁用任何的事件缓存。

服务器(Server)返回一个响应,带有指定的header,表示事件的媒体类型和编码,以及使用分块传输编码来流式传输动态生成的内容。

服务器(Server)在有数据更新时,向客户端发送一个或多个名称:值字段组成的事件,由单个换行符分隔。事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。

三、前端代码示例(以原生的JS为例)

// 创建SSE客户端实例

let eventSource = null;

// 连接SSE服务

function connectToSSE() {

  // 避免重复连接

  if (eventSource && eventSource.readyState !== EventSource.CLOSED) {

    eventSource.close();

  }



  // 创建EventSource实例(URL为服务器SSE端点)

  eventSource = new EventSource('http://localhost:3000/sse');

 

  // 监听不同类型的事件

  eventSource.onopen = function(event) {

    console.log('SSE连接已建立:', event);

    updateLog('📌 连接成功,等待服务器推送数据...');

  };

 

  eventSource.onmessage = function(event) {

    // 处理默认message事件(未指定event类型时)

    const data = JSON.parse(event.data);

    console.log('接收到数据:', data);

    updateLog(`🔔 新消息: ${data.type} - ${data.content}`);

  };

 

  // 监听自定义事件类型(如chat-message)

  eventSource.addEventListener('chat-message', function(event) {

    const chatData = JSON.parse(event.data);

    updateLog(`💬 聊天消息 - 发送者: ${chatData.sender}, 内容: ${chatData.message}`);

    renderChatMessage(chatData);

  });

 

  eventSource.onerror = function(event) {

    if (eventSource.readyState === EventSource.CLOSED) {

      console.log('SSE连接已关闭');

      updateLog('🔗 连接已关闭,尝试重新连接...');

    } else {

      console.error('SSE连接错误:', event);

      updateLog(`❌ 连接错误: ${event.status || '未知错误'}`);

    }

  };

 

  // 监听重连事件(原生EventSource自动处理重连,此处仅日志记录)

  eventSource.onreconnect = function(timeout) {

    console.log(`SSE尝试重连,等待 ${timeout}ms...`);

    updateLog(`🔄 正在重连,等待 ${timeout}ms...`);

  };

}



// 关闭SSE连接

function disconnectFromSSE() {

  if (eventSource) {

    eventSource.close();

    eventSource = null;

    updateLog('🔗 已手动断开连接');

  }

}



// 更新日志显示

function updateLog(message) {

  const logElement = document.getElementById('sse-log');

  const newLog = document.createElement('div');

  newLog.innerText = `[${new Date().toLocaleTimeString()}] ${message}`;

  logElement.appendChild(newLog);

  logElement.scrollTop = logElement.scrollHeight;

}



// 渲染聊天消息(示例函数)

function renderChatMessage(data) {

  const chatContainer = document.getElementById('chat-container');

  // 实际项目中可根据data渲染DOM元素

  console.log('渲染聊天消息:', data);

}



// 页面加载完成后连接SSE

document.addEventListener('DOMContentLoaded', connectToSSE);



// 示例:页面提供断开连接按钮

document.getElementById('disconnect-btn').addEventListener('click', disconnectFromSSE);

四、后端代码示例(以PHP为例)

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;

use Illuminate\Http\Response;

use Illuminate\Support\Str;

use Symfony\Component\HttpFoundation\StreamedResponse;

use Illuminate\Support\Facades\Log;

use Illuminate\Support\Facades\Redis;



class SSEController extends Controller

{

    // 存储所有SSE客户端连接(使用Redis实现分布式存储)

    private $clientsKey = 'sse_clients';

   

    /**

     * SSE连接端点

     */

    public function stream()

    {

        // 设置响应头

        $headers = [

            'Content-Type' => 'text/event-stream',

            'Cache-Control' => 'no-cache',

            'Connection' => 'keep-alive',

            'Access-Control-Allow-Origin' => '*', // 生产环境按需配置

        ];

       

        // 生成唯一客户端ID

        $clientId = Str::uuid();

        $response = new StreamedResponse(function() use ($clientId) {

            // 向客户端发送消息的函数

            $sendMessage = function($eventType, $data) use ($clientId) {

                // 格式化SSE消息

                $message = "event: {$eventType}\n";

                $message .= "data: " . json_encode($data) . "\n\n";

               

                // 输出到客户端

                echo $message;

                ob_flush();

                flush();

            };

           

            // 将客户端添加到Redis列表

            Redis::lPush($this->clientsKey, $clientId);

            Log::info("新SSE连接: {$clientId}");

           

            try {

                // 发送连接成功消息

                $sendMessage('connect', [

                    'id' => $clientId,

                    'time' => now()->toDateTimeString(),

                    'message' => '连接成功'

                ]);

               

                // 定期发送心跳包保持连接活跃

                while (true) {

                    $sendMessage('heartbeat', [

                        'time' => now()->toDateTimeString(),

                        'message' => '保持连接活跃'

                    ]);

                    sleep(30); // 每30秒发送一次心跳

                }

            } catch (\Exception $e) {

                Log::error("SSE连接错误: {$clientId}, 错误: {$e->getMessage()}");

            } finally {

                // 客户端断开连接时从Redis移除

                Redis::lRem($this->clientsKey, $clientId, 0);

                Log::info("SSE连接关闭: {$clientId}");

            }

        }, 200, $headers);

       

        // 设置响应超时时间(避免PHP默认30秒超时)

        $response->setMaxAge(0);

        $response->headers->set('Cache-Control', 'no-cache, no-store, max-age=0');

       

        return $response;

    }

   

    /**

     * 向所有客户端推送数据

     */

    public function pushToAll(Request $request)

    {

        $eventType = $request->input('event_type', 'message');

        $data = $request->input('data', []);

       

        // 添加时间戳

        $data['timestamp'] = now()->toDateTimeString();

       

        // 从Redis获取所有客户端ID

        $clientIds = Redis::lRange($this->clientsKey, 0, -1);

        $clientCount = count($clientIds);

       

        if ($clientCount === 0) {

            return response()->json([

                'status' => 'info',

                'message' => '没有活跃的SSE连接'

            ]);

        }

       

        // 使用Redis发布订阅模式推送数据(支持分布式部署)

        Redis::publish('sse_events', json_encode([

            'event_type' => $eventType,

            'data' => $data

        ]));

       

        return response()->json([

            'status' => 'success',

            'message' => "已向 {$clientCount} 个客户端推送数据",

            'data' => $data

        ]);

    }

   

    /**

     * 向指定客户端推送数据

     */

    public function pushToClient($clientId, Request $request)

    {

        $eventType = $request->input('event_type', 'message');

        $data = $request->input('data', []);

       

        // 检查客户端是否存在

        $clientExists = Redis::lContains($this->clientsKey, $clientId);

        if (!$clientExists) {

            return response()->json([

                'status' => 'error',

                'message' => "客户端 {$clientId} 不存在或已断开连接"

            ], 404);

        }

       

        // 添加时间戳

        $data['timestamp'] = now()->toDateTimeString();

       

        // 使用Redis发布带客户端ID的消息

        Redis::publish('sse_client_' . $clientId, json_encode([

            'event_type' => $eventType,

            'data' => $data

        ]));

       

        return response()->json([

            'status' => 'success',

            'message' => "已向客户端 {$clientId} 推送数据",

            'data' => $data

        ]);

    }

}

五、总结

SSE 作为轻量级实时通信方案,在单向数据推送场景中具有显著优势。前端通过EventSource简化连接管理,后端借助 Redis 实现分布式消息分发,可高效应用于聊天、数据监控、通知等场景。与 WebSocket 相比,SSE 更适合服务器主动推送的业务需求,且开发成本更低,是实时 Web 应用的重要技术选择。