基于vue3和springboot框架集成websocket

发布于:2025-09-03 ⋅ 阅读:(12) ⋅ 点赞:(0)

1.前端

在 utils 目录下新建 websocket.ts:

import { ElMessage } from 'element-plus';
import emitter, { messages } from '/@/utils/mitt';
import { useUserInfo } from '/@/stores/userInfo';

/**
 * Mutable runtime state kept by WebSocketManager for its single connection.
 *
 * Date:   2025-09-01
 * Author: zangqi
 */
export interface WebSocketState {
    /** The current socket, or null when none has been created yet. */
    ws: WebSocket | null;

    /** True while a reconnect attempt is already scheduled, so further requests are ignored. */
    lockReconnect: boolean;

    /** Delay in milliseconds used for both heartbeat timers. */
    timeout: number;

    /** Handle of the timer that sends the next heartbeat, or null. */
    timeoutObj: number | null;

    /** Handle of the timer that closes the socket when it is not reset in time, or null. */
    serverTimeoutObj: number | null;

    /** Handle of the pending reconnect timer, or null. */
    timeoutNum: number | null;
}

/** Connection settings accepted by WebSocketManager. */
export interface WebSocketConfig {
    /** Base URL; "websocket/<userId>" is appended to it when connecting. */
    url: string;

    /** User id placed in the endpoint path. */
    userId: string;

    /** Token sent as the `access_token` query parameter. */
    token: string;

    /** Optional heartbeat interval in milliseconds. */
    timeout?: number;
}

/**
 * Owns a single WebSocket connection and keeps it alive.
 *
 * Responsibilities:
 *  - opens the socket (`init`) and dispatches parsed JSON messages through the mitt emitter;
 *  - sends a heartbeat every `timeout` ms and closes the socket when no heartbeat reply
 *    arrives within another `timeout` ms;
 *  - reconnects 5 s after an error or unexpected close;
 *  - stops reconnecting once `close()` has been called, until `init()` is called again.
 */
export class WebSocketManager {
    private state: WebSocketState;
    private config: WebSocketConfig;
    private reconnectCallback?: () => void;
    // Set by close(); stops the error/close handlers from reopening a deliberately closed socket.
    private manuallyClosed = false;

    /**
     * @param config Connection settings. `timeout` falls back to 60000 ms when it is
     *               omitted or explicitly undefined.
     */
    constructor(config: WebSocketConfig) {
        // Resolve the default with `??` so an explicit `timeout: undefined` cannot overwrite it.
        const timeout = config.timeout ?? 60000;
        this.config = { ...config, timeout };

        this.state = {
            ws: null,
            lockReconnect: false,
            timeout,
            timeoutObj: null,
            serverTimeoutObj: null,
            timeoutNum: null
        };
    }

    /**
     * Registers a callback that runs right before every automatic reconnect attempt.
     *
     * @param callback Function invoked with no arguments.
     */
    public setReconnectCallback(callback: () => void): void {
        this.reconnectCallback = callback;
    }

    /**
     * Opens the connection. Any socket left over from a previous call is detached and
     * closed first, and automatic reconnection is (re-)enabled.
     */
    public init(): void {
        console.log("初始化 WebSocket");
        if (!('WebSocket' in window)) {
            ElMessage.error('浏览器不支持 WebSocket');
            return;
        }

        // An explicit init() re-enables automatic reconnection after a previous close().
        this.manuallyClosed = false;
        // Stale heartbeat timers or late events of the old socket must not tear down the new one.
        this.clearHeartbeatTimers();
        this.releaseSocket();

        try {
            const wsUrl = `${this.config.url}websocket/${this.config.userId}?access_token=${this.config.token}`;
            const socket = new WebSocket(wsUrl);
            this.state.ws = socket;

            socket.onopen = (event: Event) => {
                console.log('SOCKET 链接:', event);
                this.checkHeatBeat();
            };

            socket.onmessage = (event: MessageEvent) => {
                this.handleMessage(event);
            };

            socket.onerror = (event: Event) => {
                console.log('websocket 错误信息:', event);
                this.reconnect();
            };

            socket.onclose = (event: CloseEvent) => {
                console.log('websocket 已关闭连接:', event);
                this.reconnect();
            };
        } catch (error) {
            console.error('WebSocket 初始化失败:', error);
            this.reconnect();
        }
    }

    /**
     * Handles one incoming frame: heartbeat replies restart the heartbeat cycle, the
     * connection greeting is only logged, everything else is parsed as JSON and emitted
     * on the `messages.system` channel.
     */
    private handleMessage(event: MessageEvent): void {
        console.log('SOCKET 接收信息:', event);
        const content = event.data;

        if (content === "心跳检测") {
            console.log("websocket 接收到服务器信息,心跳重置");
            this.reset();
            return;
        }

        if (content === "连接成功") {
            console.log("websocket ping...");
            return;
        }

        try {
            const json = JSON.parse(content);
            console.log("发送事件");
            emitter.emit(messages.system, json);
        } catch (e) {
            console.error('解析 WebSocket 消息失败:', e);
        }
    }

    /**
     * Schedules the next heartbeat. When it fires, a heartbeat message is sent and a second
     * timer is armed that closes the socket unless `reset()` is called first.
     */
    private checkHeatBeat(): void {
        this.clearHeartbeatTimers();

        this.state.timeoutObj = window.setTimeout(() => {
            const socket = this.state.ws;
            if (!socket || socket.readyState !== WebSocket.OPEN) {
                console.log("websocket 断线重连中...");
                this.reconnect();
                // No server-timeout timer here: it would later close the replacement socket.
                return;
            }

            console.log("websocket 心跳检测...");

            const userInfoStore = useUserInfo();
            const userInfos = userInfoStore.userInfos;

            // The key is spelled "form" on purpose: the server reads exactly this key.
            const message = {
                "form": userInfos.id,
                "username": userInfos.username,
                "content": "心跳检测",
                "to": userInfos.id,
            };

            try {
                socket.send(JSON.stringify(message));
            } catch (error) {
                console.error('发送心跳失败:', error);
                this.reconnect();
                return;
            }

            // Close the socket when the heartbeat reply does not arrive in time;
            // the resulting close event triggers the reconnect.
            this.state.serverTimeoutObj = window.setTimeout(() => {
                if (this.state.ws) {
                    this.state.ws.close();
                }
            }, this.state.timeout);
        }, this.state.timeout);
    }

    /**
     * Schedules one reconnect attempt in 5 s. Does nothing when an attempt is already
     * pending or when the connection was closed through `close()`.
     */
    private reconnect(): void {
        if (this.manuallyClosed || this.state.lockReconnect) {
            return;
        }

        this.state.lockReconnect = true;

        if (this.state.timeoutNum !== null) clearTimeout(this.state.timeoutNum);

        this.state.timeoutNum = window.setTimeout(() => {
            this.state.timeoutNum = null;
            // Release the lock before init(): if init() fails synchronously it must be able
            // to schedule the next attempt instead of being silently dropped.
            this.state.lockReconnect = false;

            if (this.reconnectCallback) {
                try {
                    this.reconnectCallback();
                } catch (error) {
                    // A faulty callback must not stop the reconnect itself.
                    console.error('重连回调执行失败:', error);
                }
            }

            this.init();
        }, 5000);
    }

    /** Restarts the heartbeat cycle after a heartbeat reply from the server. */
    private reset(): void {
        this.checkHeatBeat();
    }

    /** Cancels the heartbeat timer and the server-timeout timer and clears their handles. */
    private clearHeartbeatTimers(): void {
        if (this.state.timeoutObj !== null) {
            clearTimeout(this.state.timeoutObj);
            this.state.timeoutObj = null;
        }
        if (this.state.serverTimeoutObj !== null) {
            clearTimeout(this.state.serverTimeoutObj);
            this.state.serverTimeoutObj = null;
        }
    }

    /** Detaches all handlers from the current socket, closes it if still active, and forgets it. */
    private releaseSocket(): void {
        const previous = this.state.ws;
        if (!previous) {
            return;
        }

        previous.onopen = null;
        previous.onmessage = null;
        previous.onerror = null;
        previous.onclose = null;

        if (previous.readyState === WebSocket.CONNECTING || previous.readyState === WebSocket.OPEN) {
            previous.close();
        }
        this.state.ws = null;
    }

    /**
     * Serializes `message` as JSON and sends it.
     *
     * @param message Any JSON-serializable value.
     * @returns true when the message was handed to the socket; false when the socket is
     *          not open or sending threw.
     */
    public send(message: unknown): boolean {
        const socket = this.state.ws;
        if (!socket || socket.readyState !== WebSocket.OPEN) {
            return false;
        }

        try {
            socket.send(JSON.stringify(message));
            return true;
        } catch (error) {
            console.error('发送消息失败:', error);
            return false;
        }
    }

    /**
     * Closes the connection for good: cancels every timer and disables automatic
     * reconnection until `init()` is called again.
     */
    public close(): void {
        // Must be set before the socket is closed: the close event arrives asynchronously
        // and would otherwise schedule a reconnect.
        this.manuallyClosed = true;

        this.clearHeartbeatTimers();
        if (this.state.timeoutNum !== null) {
            clearTimeout(this.state.timeoutNum);
            this.state.timeoutNum = null;
        }
        // A cancelled reconnect must not leave the lock set forever.
        this.state.lockReconnect = false;

        if (this.state.ws) {
            this.state.ws.close();
        }
    }

    /** @returns The socket's `readyState`, or `WebSocket.CLOSED` when no socket exists. */
    public getReadyState(): number {
        return this.state.ws ? this.state.ws.readyState : WebSocket.CLOSED;
    }

    /** @returns true only while a socket exists and is in the OPEN state. */
    public isConnected(): boolean {
        return this.state.ws !== null && this.state.ws.readyState === WebSocket.OPEN;
    }
}

/**
 * Factory for a WebSocketManager.
 *
 * @param userId User id placed in the endpoint path.
 * @param token  Token sent as the `access_token` query parameter.
 * @param url    Optional base URL; when omitted or empty, `VITE_SOCKET_URL` from the
 *               Vite environment is used instead.
 * @returns A manager that has not been connected yet; call `init()` on it.
 */
export function createWebSocketManager(userId: string, token: string, url?: string): WebSocketManager {
    const baseUrl = url ? url : import.meta.env.VITE_SOCKET_URL;
    return new WebSocketManager({ url: baseUrl, userId, token });
}

// 默认导出
export default WebSocketManager;

2.使用

import WebSocketManager, { createWebSocketManager } from '/@/utils/websocket';
const { userInfos } = storeToRefs(stores);
let websocketManager: WebSocketManager | null = null;

/**
 * Creates the page-level WebSocket manager and connects it.
 * Does nothing while no user with an id is available in the store.
 */
const initWebsocket = () => {
  const currentUser = userInfos.value;
  if (!currentUser || !currentUser.id) {
    return;
  }

  const manager = createWebSocketManager(currentUser.id, Session.get('token'));
  websocketManager = manager;

  // Log every automatic reconnect attempt.
  manager.setReconnectCallback(() => {
    console.log('WebSocket 重连中...');
  });
  manager.init();
};

// Connect once the component has been mounted.
onMounted(() => {
  initWebsocket();
});

3.后端

1.新建一个配置类 WebSocketConfig

/**
 * Spring configuration that exposes a {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {
    /**
     * Creates the exporter bean.
     *
     * NOTE(review): per the Spring documentation this bean is what registers
     * {@code @ServerEndpoint}-annotated beans with the WebSocket container — confirm it is
     * appropriate for the deployment's servlet container.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

2.新建一个服务类 WebSocketServer

package cn.codesys.notification.center.service;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.idev.excel.util.StringUtils;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author zangqi
 * @Description
 * @data 2025/9/1 15:32
 */
@Slf4j
@ServerEndpoint(value = "/websocket/{userId}")
@Component
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //加入set中
            webSocketMap.put(userId, this);
        } else {
            //加入set中
            webSocketMap.put(userId, this);
        }
        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
        sendMessage("连接成功");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
        }
        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     *
     * @param message 客户端发送过来的消息
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + userId + ",报文:" + message);
        JSONObject jsonObject = JSONUtil.parseObj(message);
        String content = (String) jsonObject.get("content");
        String form = (String) jsonObject.get("form");
        sendMsgByUserId(content, form);
    }


    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendMessage(String message) {
        try {
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送自定义消息
     *
     **/
    public static void sendMsgByUserId(String message, String userId) {
        log.info("发送消息到:" + userId + ",报文:" + message);
        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessage(message);
        } else {
            log.error("用户" + userId + ",不在线!");
        }
    }

    /**
     * 群发自定义消息
     *
     **/
    public static void sendMsgByUserIdList(String message, List<String> userList) {
        userList.forEach(userId -> {
            log.info("发送消息到:" + userId + ",报文:" + message);
            if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
                webSocketMap.get(userId).sendMessage(message);
            } else {
                log.error("用户" + userId + ",不在线!");
            }
        });
    }

    /**
     * 获得此时的
     * 在线人数
     *
     * @return
     */
    public static synchronized int getOnlineCount() {
        return webSocketMap.size();
    }
}

3.推送消息

 // sendMsgByUserIdList is static, so call it on the class rather than through an instance.
 WebSocketServer.sendMsgByUserIdList(JSONUtil.toJsonStr(ncNotify), userIds);

网站公告

今日签到

点亮在社区的每一天
去签到