1.前端
在utils 新建 websocket.ts
import { ElMessage } from 'element-plus';
import emitter, { messages } from '/@/utils/mitt';
import { useUserInfo } from '/@/stores/userInfo';
/**
* @Description WebSocket 状态管理类
* @Date 2025-09-01
* @Author zangqi
**/
export interface WebSocketState {
ws: WebSocket | null;
lockReconnect: boolean;
timeout: number;
timeoutObj: number | null;
serverTimeoutObj: number | null;
timeoutNum: number | null;
}
// WebSocket 配置接口
export interface WebSocketConfig {
url: string;
userId: string;
token: string;
timeout?: number;
}
// 创建 WebSocket 管理类
export class WebSocketManager {
private state: WebSocketState;
private config: WebSocketConfig;
private reconnectCallback?: () => void;
constructor(config: WebSocketConfig) {
this.config = {
timeout: 60000,
...config
};
this.state = {
ws: null,
lockReconnect: false,
timeout: this.config.timeout!,
timeoutObj: null,
serverTimeoutObj: null,
timeoutNum: null
};
}
// 设置重连回调
public setReconnectCallback(callback: () => void): void {
this.reconnectCallback = callback;
}
// 初始化 WebSocket 连接
public init(): void {
console.log("初始化 WebSocket");
if (!('WebSocket' in window)) {
ElMessage.error('浏览器不支持 WebSocket');
return;
}
try {
const wsUrl = `${this.config.url}websocket/${this.config.userId}?access_token=${this.config.token}`;
this.state.ws = new WebSocket(wsUrl);
this.state.ws.onopen = (event: Event) => {
console.log('SOCKET 链接:', event);
this.checkHeatBeat();
};
this.state.ws.onmessage = (event: MessageEvent) => {
console.log('SOCKET 接收信息:', event);
const content = event.data;
if (content === "心跳检测") {
console.log("websocket 接收到服务器信息,心跳重置");
this.reset();
} else {
if (content === "连接成功") {
console.log("websocket ping...");
} else {
try {
const json = JSON.parse(content);
console.log("发送事件");
emitter.emit(messages.system, json);
} catch (e) {
console.error('解析 WebSocket 消息失败:', e);
}
}
}
};
this.state.ws.onerror = (event: Event) => {
console.log('websocket 错误信息:', event);
this.reconnect();
};
this.state.ws.onclose = (event: CloseEvent) => {
console.log('websocket 已关闭连接:', event);
this.reconnect();
};
} catch (error) {
console.error('WebSocket 初始化失败:', error);
this.reconnect();
}
}
// 心跳检测
private checkHeatBeat(): void {
if (this.state.timeoutObj) clearTimeout(this.state.timeoutObj);
if (this.state.serverTimeoutObj) clearTimeout(this.state.serverTimeoutObj);
this.state.timeoutObj = window.setTimeout(() => {
if (this.state.ws && this.state.ws.readyState === WebSocket.OPEN) {
console.log("websocket 心跳检测...");
const userInfoStore = useUserInfo();
const userInfos = userInfoStore.userInfos;
const message = {
"form": userInfos.id,
"username": userInfos.username,
"content": "心跳检测",
"to": userInfos.id,
};
try {
this.state.ws.send(JSON.stringify(message));
} catch (error) {
console.error('发送心跳失败:', error);
this.reconnect();
}
} else {
console.log("websocket 断线重连中...");
this.reconnect();
}
this.state.serverTimeoutObj = window.setTimeout(() => {
if (this.state.ws) {
this.state.ws.close();
}
}, this.state.timeout);
}, this.state.timeout);
}
// 重连机制
private reconnect(): void {
if (this.state.lockReconnect) {
return;
}
this.state.lockReconnect = true;
if (this.state.timeoutNum) clearTimeout(this.state.timeoutNum);
this.state.timeoutNum = window.setTimeout(() => {
// 执行重连回调(如果提供)
if (this.reconnectCallback) {
this.reconnectCallback();
}
// 重新初始化连接
this.init();
this.state.lockReconnect = false;
}, 5000);
}
// 重置心跳
private reset(): void {
if (this.state.timeoutObj) clearTimeout(this.state.timeoutObj);
if (this.state.serverTimeoutObj) clearTimeout(this.state.serverTimeoutObj);
this.checkHeatBeat();
}
// 发送消息
public send(message: any): boolean {
if (this.state.ws && this.state.ws.readyState === WebSocket.OPEN) {
try {
this.state.ws.send(JSON.stringify(message));
return true;
} catch (error) {
console.error('发送消息失败:', error);
return false;
}
}
return false;
}
// 关闭连接
public close(): void {
if (this.state.ws) {
this.state.ws.close();
}
// 清除所有定时器
if (this.state.timeoutObj) clearTimeout(this.state.timeoutObj);
if (this.state.serverTimeoutObj) clearTimeout(this.state.serverTimeoutObj);
if (this.state.timeoutNum) clearTimeout(this.state.timeoutNum);
}
// 获取连接状态
public getReadyState(): number {
return this.state.ws ? this.state.ws.readyState : WebSocket.CLOSED;
}
// 检查是否连接
public isConnected(): boolean {
return this.state.ws !== null && this.state.ws.readyState === WebSocket.OPEN;
}
}
// 创建 WebSocket 管理器实例的工厂函数
export function createWebSocketManager(userId: string, token: string, url?: string): WebSocketManager {
const config: WebSocketConfig = {
url: url || import.meta.env.VITE_SOCKET_URL,
userId: userId,
token: token
};
return new WebSocketManager(config);
}
// 默认导出
export default WebSocketManager;
2.使用
import WebSocketManager, { createWebSocketManager } from '/@/utils/websocket';
const { userInfos } = storeToRefs(stores);
let websocketManager: WebSocketManager | null = null;
// 初始化页面 WebSocket
const initWebsocket = () => {
if (userInfos.value && userInfos.value.id) {
websocketManager = createWebSocketManager(
userInfos.value.id,
Session.get('token')
);
// 设置重连回调
websocketManager.setReconnectCallback(() => {
console.log('WebSocket 重连中...');
});
websocketManager.init();
}
};
// 页面加载时
onMounted(() => {
//初始化页面websocket
initWebsocket();
});
2.后端
1.新建一个配置类 WebSocketConfig
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
2.新建一个服务service WebSocketServer
package cn.codesys.notification.center.service;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.idev.excel.util.StringUtils;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author zangqi
* @Description
* @data 2025/9/1 15:32
*/
@Slf4j
@ServerEndpoint(value = "/websocket/{userId}")
@Component
public class WebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 连接建立成
* 功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
//加入set中
webSocketMap.put(userId, this);
} else {
//加入set中
webSocketMap.put(userId, this);
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
sendMessage("连接成功");
}
/**
* 连接关闭
* 调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
}
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消
* 息后调用的方法
*
* @param message 客户端发送过来的消息
**/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + userId + ",报文:" + message);
JSONObject jsonObject = JSONUtil.parseObj(message);
String content = (String) jsonObject.get("content");
String form = (String) jsonObject.get("form");
sendMsgByUserId(content, form);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务
* 器主动推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 发送自定义消息
*
**/
public static void sendMsgByUserId(String message, String userId) {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
}
/**
* 群发自定义消息
*
**/
public static void sendMsgByUserIdList(String message, List<String> userList) {
userList.forEach(userId -> {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
});
}
/**
* 获得此时的
* 在线人数
*
* @return
*/
public static synchronized int getOnlineCount() {
return webSocketMap.size();
}
}
3.推送消息
webSocketServer.sendMsgByUserIdList(JSONUtil.toJsonStr(ncNotify), userIds);