目录
一、什么是SSE流式输出?
SSE(即Serve Sent Events)是一种Web应用中实现的单向实时通信技术。通过SSE,服务器可以主动向客户端发送数据更新,不需要客户端不断发送请求。
应用场景:
- 聊天应用
- 数据大屏的实时更新
- 直播互动与弹幕
特点:
- 单向通信,区别于WebSocket的双向通信。
- 基于HTTP,简单易用,安全性高。
- 自动重连,如果连接丢失了,浏览器会自动尝试重新连接。
- 文本格式,数据以文本流的形式发送,通常是UTF-8编码。
优势:
- 低能耗:相比轮询(Polling),SSE只需建立一次连接,减少客户端请求频率,降低设备功耗和网络带宽占用。
- 实时性强:服务器主动推送数据,避免客户端等待下一次轮询周期,适合对实时性要求高的场景(如交易、预警)。
- 兼容性好:基于HTTP协议,无需额外插件,主流浏览器和客户端均支持,开发成本较低。
二、SSE的实现过程
客户端(client)向服务器发送一个GET请求,带有指定的header,表示可以接收事件流类型,并禁用任何的事件缓存。
服务器(Server)返回一个响应,带有指定的header,表示事件的媒体类型和编码,以及使用分块传输编码来流式传输动态生成的内容。
服务器(Server)在有数据更新时,向客户端发送一个或多个名称:值字段组成的事件,由单个换行符分隔。事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。
三、前端代码示例(以原生的JS为例)
// 创建SSE客户端实例
let eventSource = null;
// 连接SSE服务
function connectToSSE() {
// 避免重复连接
if (eventSource && eventSource.readyState !== EventSource.CLOSED) {
eventSource.close();
}
// 创建EventSource实例(URL为服务器SSE端点)
eventSource = new EventSource('http://localhost:3000/sse');
// 监听不同类型的事件
eventSource.onopen = function(event) {
console.log('SSE连接已建立:', event);
updateLog('📌 连接成功,等待服务器推送数据...');
};
eventSource.onmessage = function(event) {
// 处理默认message事件(未指定event类型时)
const data = JSON.parse(event.data);
console.log('接收到数据:', data);
updateLog(`🔔 新消息: ${data.type} - ${data.content}`);
};
// 监听自定义事件类型(如chat-message)
eventSource.addEventListener('chat-message', function(event) {
const chatData = JSON.parse(event.data);
updateLog(`💬 聊天消息 - 发送者: ${chatData.sender}, 内容: ${chatData.message}`);
renderChatMessage(chatData);
});
eventSource.onerror = function(event) {
if (eventSource.readyState === EventSource.CLOSED) {
console.log('SSE连接已关闭');
updateLog('🔗 连接已关闭,尝试重新连接...');
} else {
console.error('SSE连接错误:', event);
updateLog(`❌ 连接错误: ${event.status || '未知错误'}`);
}
};
// 监听重连事件(原生EventSource自动处理重连,此处仅日志记录)
eventSource.onreconnect = function(timeout) {
console.log(`SSE尝试重连,等待 ${timeout}ms...`);
updateLog(`🔄 正在重连,等待 ${timeout}ms...`);
};
}
// 关闭SSE连接
function disconnectFromSSE() {
if (eventSource) {
eventSource.close();
eventSource = null;
updateLog('🔗 已手动断开连接');
}
}
// 更新日志显示
function updateLog(message) {
const logElement = document.getElementById('sse-log');
const newLog = document.createElement('div');
newLog.innerText = `[${new Date().toLocaleTimeString()}] ${message}`;
logElement.appendChild(newLog);
logElement.scrollTop = logElement.scrollHeight;
}
// 渲染聊天消息(示例函数)
function renderChatMessage(data) {
const chatContainer = document.getElementById('chat-container');
// 实际项目中可根据data渲染DOM元素
console.log('渲染聊天消息:', data);
}
// 页面加载完成后连接SSE
document.addEventListener('DOMContentLoaded', connectToSSE);
// 示例:页面提供断开连接按钮
document.getElementById('disconnect-btn').addEventListener('click', disconnectFromSSE);
四、后端代码示例(以PHP为例)
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Support\Str;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class SSEController extends Controller
{
// 存储所有SSE客户端连接(使用Redis实现分布式存储)
private $clientsKey = 'sse_clients';
/**
* SSE连接端点
*/
public function stream()
{
// 设置响应头
$headers = [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'Connection' => 'keep-alive',
'Access-Control-Allow-Origin' => '*', // 生产环境按需配置
];
// 生成唯一客户端ID
$clientId = Str::uuid();
$response = new StreamedResponse(function() use ($clientId) {
// 向客户端发送消息的函数
$sendMessage = function($eventType, $data) use ($clientId) {
// 格式化SSE消息
$message = "event: {$eventType}\n";
$message .= "data: " . json_encode($data) . "\n\n";
// 输出到客户端
echo $message;
ob_flush();
flush();
};
// 将客户端添加到Redis列表
Redis::lPush($this->clientsKey, $clientId);
Log::info("新SSE连接: {$clientId}");
try {
// 发送连接成功消息
$sendMessage('connect', [
'id' => $clientId,
'time' => now()->toDateTimeString(),
'message' => '连接成功'
]);
// 定期发送心跳包保持连接活跃
while (true) {
$sendMessage('heartbeat', [
'time' => now()->toDateTimeString(),
'message' => '保持连接活跃'
]);
sleep(30); // 每30秒发送一次心跳
}
} catch (\Exception $e) {
Log::error("SSE连接错误: {$clientId}, 错误: {$e->getMessage()}");
} finally {
// 客户端断开连接时从Redis移除
Redis::lRem($this->clientsKey, $clientId, 0);
Log::info("SSE连接关闭: {$clientId}");
}
}, 200, $headers);
// 设置响应超时时间(避免PHP默认30秒超时)
$response->setMaxAge(0);
$response->headers->set('Cache-Control', 'no-cache, no-store, max-age=0');
return $response;
}
/**
* 向所有客户端推送数据
*/
public function pushToAll(Request $request)
{
$eventType = $request->input('event_type', 'message');
$data = $request->input('data', []);
// 添加时间戳
$data['timestamp'] = now()->toDateTimeString();
// 从Redis获取所有客户端ID
$clientIds = Redis::lRange($this->clientsKey, 0, -1);
$clientCount = count($clientIds);
if ($clientCount === 0) {
return response()->json([
'status' => 'info',
'message' => '没有活跃的SSE连接'
]);
}
// 使用Redis发布订阅模式推送数据(支持分布式部署)
Redis::publish('sse_events', json_encode([
'event_type' => $eventType,
'data' => $data
]));
return response()->json([
'status' => 'success',
'message' => "已向 {$clientCount} 个客户端推送数据",
'data' => $data
]);
}
/**
* 向指定客户端推送数据
*/
public function pushToClient($clientId, Request $request)
{
$eventType = $request->input('event_type', 'message');
$data = $request->input('data', []);
// 检查客户端是否存在
$clientExists = Redis::lContains($this->clientsKey, $clientId);
if (!$clientExists) {
return response()->json([
'status' => 'error',
'message' => "客户端 {$clientId} 不存在或已断开连接"
], 404);
}
// 添加时间戳
$data['timestamp'] = now()->toDateTimeString();
// 使用Redis发布带客户端ID的消息
Redis::publish('sse_client_' . $clientId, json_encode([
'event_type' => $eventType,
'data' => $data
]));
return response()->json([
'status' => 'success',
'message' => "已向客户端 {$clientId} 推送数据",
'data' => $data
]);
}
}
五、总结
SSE 作为轻量级实时通信方案,在单向数据推送场景中具有显著优势。前端通过EventSource简化连接管理,后端借助 Redis 实现分布式消息分发,可高效应用于聊天、数据监控、通知等场景。与 WebSocket 相比,SSE 更适合服务器主动推送的业务需求,且开发成本更低,是实时 Web 应用的重要技术选择。