1. 添加依赖 (pom.xml
)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. 事件服务 (EventService.java)
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class EventService {
// 线程安全的Emitter存储
private final ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 心跳调度器
private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
// 事件计数器
private final AtomicInteger eventCounter = new AtomicInteger(0);
public EventService() {
// 启动心跳任务 (每25秒发送一次)
heartbeatExecutor.scheduleAtFixedRate(this::broadcastHeartbeat, 0, 25, TimeUnit.SECONDS);
}
// 客户端订阅
public SseEmitter subscribe() {
String clientId = UUID.randomUUID().toString();
SseEmitter emitter = new SseEmitter(60_000L); // 1分钟超时
// 注册事件处理器
emitter.onCompletion(() -> removeEmitter(clientId));
emitter.onTimeout(() -> {
removeEmitter(clientId);
emitter.complete();
});
emitter.onError(ex -> removeEmitter(clientId));
emitters.put(clientId, emitter);
return emitter;
}
// 广播事件
public void broadcast(String eventName, Object data) {
emitters.forEach((clientId, emitter) -> {
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(eventCounter.incrementAndGet()))
.name(eventName)
.data(data)
);
} catch (IOException | IllegalStateException e) {
removeEmitter(clientId); // 发送失败则移除
}
});
}
// 广播心跳
private void broadcastHeartbeat() {
emitters.forEach((clientId, emitter) -> {
try {
emitter.send(SseEmitter.event()
.comment("heartbeat") // 发送注释类型的心跳
);
} catch (Exception ignored) {
// 心跳失败不移除,等待超时机制处理
}
});
}
// 移除客户端
private void removeEmitter(String clientId) {
SseEmitter emitter = emitters.remove(clientId);
if (emitter != null) {
emitter.complete();
}
}
// 关闭服务 (资源清理)
public void shutdown() {
// 1. 停止心跳线程
heartbeatExecutor.shutdownNow();
// 2. 关闭所有连接
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name("system")
.data(Map.of("action", "shutdown"))
);
} catch (Exception ignored) {
} finally {
emitter.complete();
}
});
// 3. 清空集合
emitters.clear();
}
}
3. 控制器 (EventController.java)
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
@RequestMapping("/events")
public class EventController {
private final EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
// 客户端订阅入口
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe() {
return eventService.subscribe();
}
// 广播消息入口(这里是模拟消息推送过来,会把该条消息都放入到已订阅的客户端)
@PostMapping("/broadcast")
public void broadcast(@RequestParam String message) {
eventService.broadcast("message", Map.of(
"content", message,
"timestamp", System.currentTimeMillis()
));
}
}
4. 应用配置 (Application.java)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class SseApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(SseApplication.class, args);
// 注册优雅关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
EventService eventService = context.getBean(EventService.class);
eventService.shutdown();
System.out.println("SSE资源已清理完成");
}));
}
}
5. 客户端示例 (JavaScript)
<!DOCTYPE html>
<html>
<body>
<h1>SSE客户端</h1>
<div id="messages"></div>
<script>
const messageContainer = document.getElementById('messages');
let eventSource;
function connect() {
eventSource = new EventSource('http://localhost:8080/events');
eventSource.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
addMessage(`消息: ${data.content} [${new Date(data.timestamp).toLocaleTimeString()}]`);
});
eventSource.addEventListener('system', (e) => {
const data = JSON.parse(e.data);
if (data.action === 'shutdown') {
addMessage('系统通知: 服务即将关闭');
eventSource.close();
}
});
eventSource.onerror = (e) => {
addMessage('连接错误,3秒后重连...');
setTimeout(connect, 3000);
};
}
function addMessage(text) {
const p = document.createElement('p');
p.textContent = text;
messageContainer.appendChild(p);
messageContainer.scrollTop = messageContainer.scrollHeight;
}
// 初始连接
connect();
</script>
</body>
</html>
关键机制说明
心跳机制:
每25秒发送一次空注释事件
:heartbeat
防止代理或负载均衡器关闭空闲连接
客户端可通过监听所有事件检测心跳
关闭流程:
客户端重连:
使用事件ID支持断线续传
客户端错误时自动重连
服务端关闭时发送系统通知