一、这个管理系统是基于若依框架,配置webSocKet的maven依赖
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
二、配置类配置webSocket的端点和相关的参数
1、WebSocketConfig - webSocket配置类
注意:ws://yourdomain:port/ws/order?token=yourTokenValue。
可以使用cpolar 工具把IP地址解析成可访问的域名。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private WebSocketHandler webSocketHandler;
/**
* 注册websocket的端点
* 客户端连接格式: ws://yourdomain:port/ws/order?token=yourTokenValue
* token参数必须提供,系统会通过token从Redis获取对应的openId用于用户识别
* @param registry WebSocketHandlerRegistry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/ws/order")
.setAllowedOrigins("*"); // 允许跨域访问
}
/**
* 配置WebSocket服务器的参数
* 包括:连接超时时间、心跳超时时间、最大消息大小等
* @return ServletServerContainerFactoryBean
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 设置异步发送超时时间为25秒
container.setAsyncSendTimeout(25000L);
// 设置最大会话空闲时间为60秒
container.setMaxSessionIdleTimeout(60000L);
// 设置最大文本消息缓冲区大小为8KB
container.setMaxTextMessageBufferSize(8192);
// 设置最大二进制消息缓冲区大小为8KB
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
2、WebSocketHandler - webSocket处理器
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 用线程安全的集合来管理所有连接的 WebSocket 会话
private static final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
// 使用ConcurrentHashMap来存储openId到session的映射关系
private static final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();
// 使用ConcurrentHashMap来存储session到openId的映射关系(反向映射)
private static final Map<WebSocketSession, String> sessionUsers = new ConcurrentHashMap<>();
// 记录每个session最后一次活跃时间
private static final Map<String, Long> sessionLastActiveTime = new ConcurrentHashMap<>();
// 心跳检查的定时任务执行器
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
// 心跳超时时间,单位毫秒
private static final long HEARTBEAT_TIMEOUT = 50000L; // 50秒
// 用于解析JSON的对象映射器
private static final ObjectMapper objectMapper = new ObjectMapper();
/**
* 构造方法,启动心跳检测任务
*/
public WebSocketHandler() {
// 每15秒检查一次心跳
heartbeatScheduler.scheduleAtFixedRate(this::checkHeartbeats, 15, 15, TimeUnit.SECONDS);
}
/**
* 心跳检查方法,清理那些超时的连接
*/
private void checkHeartbeats() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, Long> entry : sessionLastActiveTime.entrySet()) {
String openId = entry.getKey();
long lastActive = entry.getValue();
// 如果超过超时时间没有活动,则关闭会话
if (currentTime - lastActive > HEARTBEAT_TIMEOUT) {
WebSocketSession session = userSessions.get(openId);
if (session != null && session.isOpen()) {
try {
log.warn("会话心跳超时,主动断开连接 - openId: {}, 上次活跃: {}ms前",
openId, currentTime - lastActive);
session.close(CloseStatus.NORMAL);
} catch (IOException e) {
log.error("关闭超时WebSocket会话异常 - openId: {}, 错误: {}", openId, e.getMessage());
} finally {
// 确保从会话映射中移除
sessions.remove(session);
sessionUsers.remove(session);
userSessions.remove(openId);
sessionLastActiveTime.remove(openId);
}
} else {
// 会话已关闭或不存在,直接清理
userSessions.remove(openId);
sessionLastActiveTime.remove(openId);
}
}
}
}
/**
* 新客户端连接时,加入到 sessions 集合中
* @param session 会话
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
// 从URL中获取token参数,格式应为 /ws/order?token=xxx
String token = extractToken(session);
if (token != null) {
// 从Redis中获取对应的openId
String openId = getOpenIdFromToken(token);
if (openId != null) {
userSessions.put(openId, session);
sessionUsers.put(session, openId);
sessionLastActiveTime.put(openId, System.currentTimeMillis()); // 记录初始活跃时间
log.info("WebSocket连接已建立 - token: {}, openId: {}, 当前连接数: {}",
token, openId, sessions.size());
} else {
log.warn("找不到token对应的openId,token可能已过期 - token: {}", token);
// 可以选择关闭这个无效的连接
session.close(CloseStatus.NOT_ACCEPTABLE);
}
} else {
log.warn("WebSocket连接未提供token参数,无法识别用户");
// 可以选择关闭这个无效的连接
session.close(CloseStatus.NOT_ACCEPTABLE);
}
}
/**
* 客户端断开连接时,从 sessions 集合中移除
* @param session 会话
* @param status
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
// 从用户会话映射中也移除
String openId = sessionUsers.remove(session);
if (openId != null) {
userSessions.remove(openId);
sessionLastActiveTime.remove(openId);
log.info("WebSocket连接已关闭 - openId: {}, 状态: {}", openId, status);
}
}
/**
* 处理收到的文本消息
* 对于心跳消息进行特殊处理
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String openId = sessionUsers.get(session);
String payload = message.getPayload();
try {
// 尝试解析为JSON
JsonNode jsonNode = objectMapper.readTree(payload);
// 检查是否是心跳消息
if (jsonNode.has("type") && "ping".equals(jsonNode.get("type").asText())) {
// 更新最后活跃时间
if (openId != null) {
sessionLastActiveTime.put(openId, System.currentTimeMillis());
}
// 发送pong响应
session.sendMessage(new TextMessage("{\"type\":\"pong\",\"time\":" + System.currentTimeMillis() + "}"));
return;
}
} catch (Exception e) {
// 不是JSON格式的消息,忽略错误继续处理
}
// 更新最后活跃时间
if (openId != null) {
sessionLastActiveTime.put(openId, System.currentTimeMillis());
}
log.debug("收到消息 - openId: {}, 内容: {}", openId, payload);
// 在这里可以添加其他消息处理逻辑
}
/**
* 从WebSocketSession中提取token
* @param session WebSocket会话
* @return token,如果不存在则返回null
*/
private String extractToken(WebSocketSession session) {
String query = session.getUri().getQuery();
if (query != null && query.contains("token=")) {
String[] params = query.split("&");
for (String param : params) {
String[] keyValue = param.split("=");
if (keyValue.length == 2 && "token".equals(keyValue[0])) {
log.info("WebSocket连接已获取token - token: {}", keyValue[1]);
return keyValue[1];
}
}
}
return null;
}
/**
* 从token获取对应的openId
* @param token 用户token
* @return openId,如果token无效则返回null
*/
private String getOpenIdFromToken(String token) {
if (token == null || token.isEmpty()) {
return null;
}
try {
// 从Redis中获取token对应的openId
return stringRedisTemplate.opsForValue().get(WECHAT_KEY + token);
} catch (Exception e) {
log.error("从Redis获取token信息异常 - token: {}, 错误: {}", token, e.getMessage());
return null;
}
}
/**
* 发送支付成功的通知给所有连接的客户端
* @param message 消息体
*/
public void sendPaymentSuccessNotification(String message) {
for (WebSocketSession session : sessions) {
try {
// 通过 WebSocket 向每个客户端发送消息
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("发送支付成功通知失败", e);
}
}
}
/**
* 向指定用户发送消息
* @param openId 用户的openId
* @param message 消息内容
* @return 是否发送成功
*/
public boolean sendMessageToUser(String openId, String message) {
WebSocketSession session = userSessions.get(openId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
log.info("消息已发送给用户 - openId: {}", openId);
return true;
} catch (IOException e) {
log.error("发送消息给用户失败 - openId: {}", openId, e);
return false;
}
} else {
log.info("用户未通过WebSocket连接 - openId: {}", openId);
return false;
}
}
/**
* 向所有用户发送心跳检测消息
*/
public void sendHeartbeat() {
String heartbeatMsg = "{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}";
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(heartbeatMsg));
} catch (IOException e) {
log.error("发送心跳消息失败", e);
}
}
}
}
}
注意:这里发送消息给指定用户需要前端传递token,获取存储在redis中的openId(微信小程序用户标识)
3、发送消息我定义了一个定时器发送消息和心跳测试
3.1、根据自己业务封装的消息体
@ApiModel(value = "MessageVo",discriminator = "websocket的消息体")
public class MessageVo {
@ApiModelProperty(value = "消息标题",dataType = "string")
private String title;
@ApiModelProperty(value = "消息内容",dataType = "string")
private String content;
@ApiModelProperty(value = "车牌号码",dataType = "string")
private String plateNumber;
@ApiModelProperty(value = "订单编号",dataType = "string")
private String orderNumber;
@ApiModelProperty(value = "创建时间",dataType = "date")
private Date createTime;
}
/**
* 定时发送提醒消息给待过磅状态的用户
* 每1分钟执行一次,提醒用户进行过磅操作
*/
public void sendWeighingReminder() {
log.info("开始执行待过磅用户提醒任务");
try {
// 查询所有待过磅的订单
WeighingRecords pendingQuery = new WeighingRecords();
pendingQuery.setStatus(0L); // 待过磅
List<WeighingRecords> pendingWeighingOrders = weighingRecordsMapper.selectWeighingRecordsList(pendingQuery);
// 如果没有待过磅订单,直接返回
if (pendingWeighingOrders == null || pendingWeighingOrders.isEmpty()) {
log.info("没有查询到待过磅订单,跳过发送提醒");
return;
}
log.info("查询到 {} 条待过磅订单,开始发送提醒", pendingWeighingOrders.size());
int successCount = 0;
// 遍历所有待过磅订单,发送提醒消息
for (WeighingRecords order : pendingWeighingOrders) {
// 检查是否有有效的openId
String openId = order.getOpenId();
if (openId == null || openId.trim().isEmpty()) {
log.warn("订单 {} 缺少有效的openId,无法发送提醒", order.getOrderNumber());
continue;
}
// 创建消息体
MessageVo messageVo = new MessageVo();
messageVo.setTitle("过磅提醒");
messageVo.setContent("您有一条待过磅的订单,请及时前往过磅点进行过磅操作。");
messageVo.setOrderNumber(order.getOrderNumber());
messageVo.setPlateNumber(order.getPlateNumber()); // 设置车牌号
messageVo.setCreateTime(DateUtils.getNowDate());
try {
// 转换为JSON字符串
String messageJson = objectMapper.writeValueAsString(messageVo);
// 直接使用openId发送消息(WebSocketHandler内部会通过openId查找对应的会话)
boolean sent = webSocketHandler.sendMessageToUser(openId, messageJson);
if (sent) {
successCount++;
log.info("成功向用户 {} 发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());
} else {
log.info("用户 {} 未连接WebSocket,无法发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());
}
} catch (JsonProcessingException e) {
log.error("消息序列化异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());
} catch (Exception e) {
log.error("发送消息异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());
}
}
log.info("过磅提醒任务完成,共尝试: {} 条,成功: {} 条", pendingWeighingOrders.size(), successCount);
} catch (Exception e) {
log.error("过磅提醒任务异常: {}", e.getMessage(), e);
}
}
/**
* 定期发送心跳消息,保持WebSocket连接活跃
* 每25秒执行一次,低于WebSocketConfig中设置的60秒超时时间
*/
public void sendHeartbeat() {
log.debug("开始执行WebSocket心跳任务");
try {
webSocketHandler.sendHeartbeat();
log.debug("WebSocket心跳消息发送完成");
} catch (Exception e) {
log.error("WebSocket心跳任务异常: {}", e.getMessage(), e);
}
}
4、由于这个管理系统是基于若依所以需要配置鉴权,否则会被拦截
这个是部分配置代码
@Bean
protected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception
{
return httpSecurity
// CSRF禁用,因为不使用session
.csrf(csrf -> csrf.disable())
// 禁用HTTP响应标头
.headers((headersCustomizer) -> {
headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());
})
// 认证失败处理类
.exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))
// 基于token,所以不需要session
.sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
// 注解标记允许匿名访问的url
.authorizeHttpRequests((requests) -> {
permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
// 对于登录login 注册register 验证码captchaImage 允许匿名访问
requests.antMatchers("/login", "/register", "/captchaImage",
"/weiXin/login","/weiXin/returnNotify","/ws/**").permitAll()
..........
}
注意:端点配置的是“/ws/order",所以在这了配置为”/ws/**“
三、小程序端的部分代码配置
注意:需要在路径上面传递token,为了后端获取openId向指定用户发送消息
这个是小程序的webSocket的地址示例:“wss://5aa7e45c.r11.cpolar.top/ws/order?token=${this.token}”