websocket用于控制在当前页只允许一个用户进行操作,其他用户等待

发布于:2025-09-04 ⋅ 阅读:(20) ⋅ 点赞:(0)

websocket用于控制用户进入当前页的可操作性

需求

页面只允许一个用户操作,后续进入的用户进行排队。

功能描述

  1. 用户进入当前页,排在第一位则放行操作。
  2. 用户进入当前页,不是排在第一位则等待。
  3. 第一位用户操作完毕后,关闭页面,第一位用户弹出队列,许可第二位用户进行操作。
  4. 页面占用有时间限制,超出阈值,用户需要退出重进,提示超时。
  5. 页面排队数量限制,若用户进入页面,超出排队长度,则提示队列超长。

使用场景

从数据台账进入某一条数据,该数据页面用户可操作。操作后后台存在异步任务,在该条数据存在后台任务时,不允许用户操作。一条数据只允许第一个进入该数据操作页面的用户对其进行操作,其他用户需要等待,直到自己排在第一位时,放行操作。

代码实现

  1. 消息实体类
@Data
public class SocketResponse {
    //当前排队位次 1 表示第一位 可以放行操作
    private Integer number;

    //异步任务信号量 true表示无异步任务可放心  false表示有异步任务不放行操作
    private Boolean taskSignal;

    //超时信号量 true表示超时 false 表示未超时
    private Boolean timeout;

    //超过最大连接数
    private Boolean overMaxConnectNum;

    //重试信号 true退出重试
    private Boolean retrySignal;

    //放行信号 true放行 false 不放行
    private Boolean signal;

    //当不能放行时给出原因
    private String message;
}
  1. 配置参数读取
    从yml中读取相关配置参数
@Data
@Component
@ConfigurationProperties(prefix = "xxx.web-socket")
public class WebSocketProperties {
    private int timeout;
    private int maxConnect;
    private int scheduleCleanDuration;
}
  1. 参数配置示例
xxx:
  web-socket:
    schedule-clean-duration: 15 #单位是分钟
    timeout: 24 #超时时间 单位 小时
    max-connect: 10 #每个表单实例允许的最大连接数量
  1. 长连接控制器
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSON;
import com.xxx.WebSocketProperties;
import com.xxx.SocketResponse;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket的操作类
 */
@Component
@Slf4j
/**
 * html页面与之关联的接口
 * var reqUrl = "http://localhost:8081/websocket/" + cid;
 * socket = new WebSocket(reqUrl.replace("http", "ws"));
 */
@ServerEndpoint("/xxxWebsocket/{businessId}")
public class XXXWebSocketServer {

	public static final Long ONEHOURMILLISECONDS = 60 * 60 * 1000L;
    /**
     * 存放所有在线的客户端
     */
    private static final Map<String, Queue<Session>> onlineSessionClientMap = new ConcurrentHashMap<>();

//读取配置参数 按需自行设计存取方式 
//我这里是存在yml文件中,参数有  session超时清理定时任务执行时间间隔,占用超时时间,单页面最大排队数
    private static final WebSocketProperties properties = SpringUtil.getBean(WebSocketProperties.class); 

//后台任务执行标记 key为businessId 值为放行信号量  无任务在执行放行 true  有任务在不放行 false
    public static final Map<String, Boolean> taskSignalMap = new ConcurrentHashMap<>();

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    static{
        scheduler.scheduleAtFixedRate(() -> {
            log.info("长连接定时任务清理启动");
            WebSocketServer server = SpringUtil.getBean(WebSocketServer.class);
            AtomicInteger number = new AtomicInteger();
            try {
                onlineSessionClientMap.forEach((businessId, sessions) -> {
                    sessions.removeIf(session -> {
                        long currentTime = System.currentTimeMillis();
                        long createTime = (long) session.getUserProperties().getOrDefault("creationTime", 0L);
                        if(currentTime - createTime > properties.getTimeout() * ONEHOURMILLISECONDS)
                        {
                            number.getAndIncrement();
                            server.sendToOneTimeOut(session, businessId);
                            return true;
                        }
                        return false;
                    });
                    if(sessions.isEmpty()){
                        onlineSessionClientMap.remove(businessId);
                    }
                });
            } catch (Exception e) {
                log.error("长连接定时任务执行异常", e);
            }
            log.info("长连接定时任务清理完成, 清理连接数量:{}", number);
        }, 0, properties.getScheduleCleanDuration(), TimeUnit.MINUTES);
    }

    /**
     * 连接sid和连接会话
     */
    private String businessId;
    private Session session;

    /**
     * 连接建立成功调用的方法。由前端<code>new WebSocket</code>触发
     *
     * @param businessId     每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。
     * @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息
     */
    @OnOpen
    @ApiOperation(value = "连接建立成功调用的方法。由前端new WebSocket触发")
    public void onOpen(@PathParam("businessId") String businessId, Session session) {
        Queue<Session> q = onlineSessionClientMap.get(businessId);
        this.businessId = businessId;
        this.session = session;
        if(q == null || q.size() < properties.getMaxConnect()){

            onlineSessionClientMap.putIfAbsent(businessId, new ConcurrentLinkedQueue<>());
            onlineSessionClientMap.get(businessId).offer(session);//进队
            log.info("businessId {}, sessionId {} 长连接建立", businessId, session.getId());

            session.getUserProperties().put("creationTime", System.currentTimeMillis());
            sendToOne(false);
        }else{
            sendToOne(true);
        }
    }

    /**
     * 连接关闭调用的方法。由前端<code>socket.close()</code>触发
     *
     * @param businessId
     * @param session
     */
    @OnClose
    @ApiOperation(value = "连接关闭调用的方法。由前端<code>socket.close()</code>触发")
    public void onClose(@PathParam("businessId") String businessId, Session session) {
        // 从 Map中移除
        Queue<Session> sessions = onlineSessionClientMap.get(businessId);
        if(CollUtil.isNotEmpty(sessions)) {
            sessions.remove(session);
        }
        log.info("businessId {}, sessionId {} 长连接关闭", businessId, session.getId());
        //群发消息
        sendToAll(businessId);
    }

    /**
     * 收到客户端消息后调用的方法。由前端<code>socket.send</code>触发
     * * 当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听。
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
//        /**
//         * html界面传递来得数据格式,可以自定义.
//         * {"sid":"user-1","message":"hello websocket"}
//         */
//        JSONObject jsonObject = JSON.parseObject(message);
//        String toSid = jsonObject.getString("sid");
//        String msg = jsonObject.getString("message");
//        log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
//
//        /**
//         * 模拟约定:如果未指定sid信息,则群发,否则就单独发送
//         */
//        if (toSid == null || toSid == "" || "".equalsIgnoreCase(toSid)) {
//            sendToAll(msg);
//        } else {
//            sendToOne(toSid, msg);
//        }
    }

    /**
     * 发生错误调用的方法
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // 记录详细错误
        log.error("WebSocket发生错误,businessId={}", businessId, error);

        // 可选:尝试安全地通知客户端
        sendAsyncRemote(session, null, "异常信息:"+error.getMessage());
    }

    /**
     * 群发消息
     *
     */
    public void sendToAll(String businessId) {
        Queue<Session> sessions = onlineSessionClientMap.get(businessId);
        if(CollUtil.isNotEmpty(sessions))
        {
            Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);
            int[] indexHolder = {0}; // 用于模拟索引递增
            sessions.forEach(session -> {
                SocketResponse socketResponse = new SocketResponse();
                socketResponse.setNumber(++indexHolder[0]);
                socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));
                dealWithSocketResponse(socketResponse);

                boolean res = sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));

                if(!res)
                {//补位
                    indexHolder[0]--;
                }
            });
        }
    }

    /**
     * 指定发送消息 清理超时session
     *
     */
    private void sendToOne(boolean connectNumberOver) {
        Queue<Session> q = onlineSessionClientMap.get(businessId);
        int size = 1;
        if(q != null)
        {
            size = q.size();
        }
        SocketResponse socketResponse = new SocketResponse();
        socketResponse.setNumber(size);
        socketResponse.setOverMaxConnectNum(connectNumberOver);
        Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);
        socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));
        dealWithSocketResponse(socketResponse);

        //发送消息
        sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));
    }
    /**
     * 消息发送异常的清理掉连接
     * 仅仅移除元素
     * @param businessId
     * @param session
     */
    private void onCloseOnly(String businessId, Session session) {
        // 从 Map中移除
        Queue<Session> sessions = onlineSessionClientMap.get(businessId);
        if(CollUtil.isNotEmpty(sessions)) {
            sessions.remove(session);
        }
    }

    /**
     * 异步消息发送
     * 成功发送返回true,发送失败返回false
     */
    private boolean sendAsyncRemote(Session session, String businessId, String message) {
        boolean[] result = new boolean[1];
        result[0] = true;
        if (session != null && session.isOpen()) {
            session.getAsyncRemote().sendText(message, res -> {
                if(!res.isOK())
                {
                    result[0] = false;
                    onCloseOnly(businessId, session);
                    log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, res.getException().getMessage(), message);
                }
            });
        }

        return result[0];
    }

    /**
     * 同步消息发送
     * 成功发送返回true,发送失败返回false
     */
    private boolean sendBasicRemote(Session session, String businessId, String message) {
        boolean[] result = new boolean[1];
        result[0] = true;
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                result[0] = false;
                onCloseOnly(businessId, session);
                log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, e.getMessage(), message);
            }
        }

        return result[0];
    }

    /**
     * 指定发送消息
     *
     */
    private void sendToOneTimeOut(Session session, String businessId) {
        SocketResponse socketResponse = new SocketResponse();
        socketResponse.setTimeout(true);
        dealWithSocketResponse(socketResponse);

        //发送消息
        sendAsyncRemote(session, businessId, JSON.toJSONString(socketResponse));
    }

    /**
     * 完善响应体
     */
    private void dealWithSocketResponse(SocketResponse res) {
        if(BooleanUtil.isTrue(res.getOverMaxConnectNum())) {
            //当前页面连接数量过多
            res.setSignal(false);
            res.setRetrySignal(true);
            res.setMessage("当前页面连接过多,连接建立失败");
        } else if(BooleanUtil.isTrue(res.getTimeout())){
            //队列前面有人 不放行
            res.setSignal(false);
            res.setRetrySignal(true);
            res.setMessage("连接超时请退出重进");
        } else if(BooleanUtil.isFalse(res.getTaskSignal())) {
            //任务信号量不放行
            res.setSignal(false);
            res.setMessage("当前页面存在异步任务请稍后");
        }  else if(!Objects.equals(1, res.getNumber())){
            //队列前面有人 不放行
            res.setSignal(false);
            res.setMessage("当前排在第"+res.getNumber()+"位 请稍后");
        } else{
            //放行
            res.setSignal(true);
        }
    }

}
  1. 后台调用示例
//关闭通行
XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), false);
//消息广播
XXXWebSocketServer.sendToAll(data.getBusinessId());
//异步任务
commonTaskExecutor.submit(() -> {
    try {
    	//实际任务
    }catch (Exception e) {
        log.error("异步线程{}执行失败", Thread.currentThread().getName(), e);
    }finally{
    	//放开通行
        XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), true);
        //消息广播
        XXXWebSocketServer.sendToAll(data.getBusinessId());
    }
});

网站公告

今日签到

点亮在社区的每一天
去签到