websocket用于控制用户进入当前页的可操作性
需求
页面只允许一个用户操作,后续进入的用户进行排队。
功能描述
- 用户进入当前页,排在第一位则放行操作。
- 用户进入当前页,不是排在第一位则等待。
- 第一位用户操作完毕后,关闭页面,第一位用户弹出队列,许可第二位用户进行操作。
- 页面占用有时间限制,超出阈值,用户需要退出重进,提示超时。
- 页面排队数量限制,若用户进入页面,超出排队长度,则提示队列超长。
使用场景
从数据台账进入某一条数据,该数据页面用户可操作。操作后后台存在异步任务,在该条数据存在后台任务时,不允许用户操作。一条数据只允许第一个进入该数据操作页面的用户对其进行操作,其他用户需要等待,直到自己排在第一位时,放行操作。
代码实现
- 消息实体类
@Data
public class SocketResponse {
//当前排队位次 1 表示第一位 可以放行操作
private Integer number;
//异步任务信号量 true表示无异步任务可放心 false表示有异步任务不放行操作
private Boolean taskSignal;
//超时信号量 true表示超时 false 表示未超时
private Boolean timeout;
//超过最大连接数
private Boolean overMaxConnectNum;
//重试信号 true退出重试
private Boolean retrySignal;
//放行信号 true放行 false 不放行
private Boolean signal;
//当不能放行时给出原因
private String message;
}
- 配置参数读取
从yml中读取相关配置参数
@Data
@Component
@ConfigurationProperties(prefix = "xxx.web-socket")
public class WebSocketProperties {
private int timeout;
private int maxConnect;
private int scheduleCleanDuration;
}
- 参数配置示例
xxx:
web-socket:
schedule-clean-duration: 15 #单位是分钟
timeout: 24 #超时时间 单位 小时
max-connect: 10 #每个表单实例允许的最大连接数量
- 长连接控制器
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSON;
import com.xxx.WebSocketProperties;
import com.xxx.SocketResponse;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* WebSocket的操作类
*/
@Component
@Slf4j
/**
* html页面与之关联的接口
* var reqUrl = "http://localhost:8081/websocket/" + cid;
* socket = new WebSocket(reqUrl.replace("http", "ws"));
*/
@ServerEndpoint("/xxxWebsocket/{businessId}")
public class XXXWebSocketServer {
public static final Long ONEHOURMILLISECONDS = 60 * 60 * 1000L;
/**
* 存放所有在线的客户端
*/
private static final Map<String, Queue<Session>> onlineSessionClientMap = new ConcurrentHashMap<>();
//读取配置参数 按需自行设计存取方式
//我这里是存在yml文件中,参数有 session超时清理定时任务执行时间间隔,占用超时时间,单页面最大排队数
private static final WebSocketProperties properties = SpringUtil.getBean(WebSocketProperties.class);
//后台任务执行标记 key为businessId 值为放行信号量 无任务在执行放行 true 有任务在不放行 false
public static final Map<String, Boolean> taskSignalMap = new ConcurrentHashMap<>();
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
static{
scheduler.scheduleAtFixedRate(() -> {
log.info("长连接定时任务清理启动");
WebSocketServer server = SpringUtil.getBean(WebSocketServer.class);
AtomicInteger number = new AtomicInteger();
try {
onlineSessionClientMap.forEach((businessId, sessions) -> {
sessions.removeIf(session -> {
long currentTime = System.currentTimeMillis();
long createTime = (long) session.getUserProperties().getOrDefault("creationTime", 0L);
if(currentTime - createTime > properties.getTimeout() * ONEHOURMILLISECONDS)
{
number.getAndIncrement();
server.sendToOneTimeOut(session, businessId);
return true;
}
return false;
});
if(sessions.isEmpty()){
onlineSessionClientMap.remove(businessId);
}
});
} catch (Exception e) {
log.error("长连接定时任务执行异常", e);
}
log.info("长连接定时任务清理完成, 清理连接数量:{}", number);
}, 0, properties.getScheduleCleanDuration(), TimeUnit.MINUTES);
}
/**
* 连接sid和连接会话
*/
private String businessId;
private Session session;
/**
* 连接建立成功调用的方法。由前端<code>new WebSocket</code>触发
*
* @param businessId 每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。
* @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息
*/
@OnOpen
@ApiOperation(value = "连接建立成功调用的方法。由前端new WebSocket触发")
public void onOpen(@PathParam("businessId") String businessId, Session session) {
Queue<Session> q = onlineSessionClientMap.get(businessId);
this.businessId = businessId;
this.session = session;
if(q == null || q.size() < properties.getMaxConnect()){
onlineSessionClientMap.putIfAbsent(businessId, new ConcurrentLinkedQueue<>());
onlineSessionClientMap.get(businessId).offer(session);//进队
log.info("businessId {}, sessionId {} 长连接建立", businessId, session.getId());
session.getUserProperties().put("creationTime", System.currentTimeMillis());
sendToOne(false);
}else{
sendToOne(true);
}
}
/**
* 连接关闭调用的方法。由前端<code>socket.close()</code>触发
*
* @param businessId
* @param session
*/
@OnClose
@ApiOperation(value = "连接关闭调用的方法。由前端<code>socket.close()</code>触发")
public void onClose(@PathParam("businessId") String businessId, Session session) {
// 从 Map中移除
Queue<Session> sessions = onlineSessionClientMap.get(businessId);
if(CollUtil.isNotEmpty(sessions)) {
sessions.remove(session);
}
log.info("businessId {}, sessionId {} 长连接关闭", businessId, session.getId());
//群发消息
sendToAll(businessId);
}
/**
* 收到客户端消息后调用的方法。由前端<code>socket.send</code>触发
* * 当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听。
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
// /**
// * html界面传递来得数据格式,可以自定义.
// * {"sid":"user-1","message":"hello websocket"}
// */
// JSONObject jsonObject = JSON.parseObject(message);
// String toSid = jsonObject.getString("sid");
// String msg = jsonObject.getString("message");
// log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
//
// /**
// * 模拟约定:如果未指定sid信息,则群发,否则就单独发送
// */
// if (toSid == null || toSid == "" || "".equalsIgnoreCase(toSid)) {
// sendToAll(msg);
// } else {
// sendToOne(toSid, msg);
// }
}
/**
* 发生错误调用的方法
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
// 记录详细错误
log.error("WebSocket发生错误,businessId={}", businessId, error);
// 可选:尝试安全地通知客户端
sendAsyncRemote(session, null, "异常信息:"+error.getMessage());
}
/**
* 群发消息
*
*/
public void sendToAll(String businessId) {
Queue<Session> sessions = onlineSessionClientMap.get(businessId);
if(CollUtil.isNotEmpty(sessions))
{
Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);
int[] indexHolder = {0}; // 用于模拟索引递增
sessions.forEach(session -> {
SocketResponse socketResponse = new SocketResponse();
socketResponse.setNumber(++indexHolder[0]);
socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));
dealWithSocketResponse(socketResponse);
boolean res = sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));
if(!res)
{//补位
indexHolder[0]--;
}
});
}
}
/**
* 指定发送消息 清理超时session
*
*/
private void sendToOne(boolean connectNumberOver) {
Queue<Session> q = onlineSessionClientMap.get(businessId);
int size = 1;
if(q != null)
{
size = q.size();
}
SocketResponse socketResponse = new SocketResponse();
socketResponse.setNumber(size);
socketResponse.setOverMaxConnectNum(connectNumberOver);
Boolean taskSignal = taskSignalMap.getOrDefault(businessId, true);
socketResponse.setTaskSignal(BooleanUtil.isTrue(taskSignal));
dealWithSocketResponse(socketResponse);
//发送消息
sendBasicRemote(session, businessId, JSON.toJSONString(socketResponse));
}
/**
* 消息发送异常的清理掉连接
* 仅仅移除元素
* @param businessId
* @param session
*/
private void onCloseOnly(String businessId, Session session) {
// 从 Map中移除
Queue<Session> sessions = onlineSessionClientMap.get(businessId);
if(CollUtil.isNotEmpty(sessions)) {
sessions.remove(session);
}
}
/**
* 异步消息发送
* 成功发送返回true,发送失败返回false
*/
private boolean sendAsyncRemote(Session session, String businessId, String message) {
boolean[] result = new boolean[1];
result[0] = true;
if (session != null && session.isOpen()) {
session.getAsyncRemote().sendText(message, res -> {
if(!res.isOK())
{
result[0] = false;
onCloseOnly(businessId, session);
log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, res.getException().getMessage(), message);
}
});
}
return result[0];
}
/**
* 同步消息发送
* 成功发送返回true,发送失败返回false
*/
private boolean sendBasicRemote(Session session, String businessId, String message) {
boolean[] result = new boolean[1];
result[0] = true;
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
result[0] = false;
onCloseOnly(businessId, session);
log.error("businessId {} 发送消息失败:{},消息内容 {}", businessId, e.getMessage(), message);
}
}
return result[0];
}
/**
* 指定发送消息
*
*/
private void sendToOneTimeOut(Session session, String businessId) {
SocketResponse socketResponse = new SocketResponse();
socketResponse.setTimeout(true);
dealWithSocketResponse(socketResponse);
//发送消息
sendAsyncRemote(session, businessId, JSON.toJSONString(socketResponse));
}
/**
* 完善响应体
*/
private void dealWithSocketResponse(SocketResponse res) {
if(BooleanUtil.isTrue(res.getOverMaxConnectNum())) {
//当前页面连接数量过多
res.setSignal(false);
res.setRetrySignal(true);
res.setMessage("当前页面连接过多,连接建立失败");
} else if(BooleanUtil.isTrue(res.getTimeout())){
//队列前面有人 不放行
res.setSignal(false);
res.setRetrySignal(true);
res.setMessage("连接超时请退出重进");
} else if(BooleanUtil.isFalse(res.getTaskSignal())) {
//任务信号量不放行
res.setSignal(false);
res.setMessage("当前页面存在异步任务请稍后");
} else if(!Objects.equals(1, res.getNumber())){
//队列前面有人 不放行
res.setSignal(false);
res.setMessage("当前排在第"+res.getNumber()+"位 请稍后");
} else{
//放行
res.setSignal(true);
}
}
}
- 后台调用示例
//关闭通行
XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), false);
//消息广播
XXXWebSocketServer.sendToAll(data.getBusinessId());
//异步任务
commonTaskExecutor.submit(() -> {
try {
//实际任务
}catch (Exception e) {
log.error("异步线程{}执行失败", Thread.currentThread().getName(), e);
}finally{
//放开通行
XXXWebSocketServer.taskSignalMap.put(data.getBusinessId(), true);
//消息广播
XXXWebSocketServer.sendToAll(data.getBusinessId());
}
});