1.纯Websocket实现消息发送
1.1一对一发送
前端
用户在输入框输入消息内容(
sendText
)选择特定接收用户(
sendUserId
)点击发送按钮触发
handlerSend
方法构造消息内容JSON:
{ text: "Hello", // 消息内容 toUserId: 123 // 目标用户ID }
包装为WebSocket标准格式:
{ type: "demo-message-send", // 消息类型 content: '{"text":"Hello","toUserId":123}' // 字符串化的内容 }
通过
send()
方法发送
- 前端在
setup
函数中,使用useWebSocket
方法,根据server
变量(WebSocket 服务地址)建立连接。server
地址由VITE_BASE_URL
(环境变量)、/infra/ws
路径和token
(通过getRefreshToken
获取)组成。 - 设置
autoReconnect
为true
,表示自动重连;heartbeat
为true
,表示开启心跳机制。
- 当用户在前端输入消息并点击发送按钮时,
handlerSend
函数被调用。 - 首先将发送内容
sendText
和接收用户sendUserId
进行 JSON 化处理,构建消息内容messageContent
。 - 然后将消息类型
type
(demo-message-send
)和消息内容messageContent
再次 JSON 化,形成最终的消息jsonMessage
。 - 最后使用
send
函数将jsonMessage
发送到后端。
const server = ref(
(import.meta.env.VITE_BASE_URL + '/infra/ws').replace('http', 'ws') +
'?token=' +
getRefreshToken() // 使用 getRefreshToken() 方法,而不使用 getAccessToken() 方法的原因:WebSocket 无法方便的刷新访问令牌
) // WebSocket 服务地址
const getIsOpen = computed(() => status.value === 'OPEN') // WebSocket 连接是否打开
const getTagColor = computed(() => (getIsOpen.value ? 'success' : 'red')) // WebSocket 连接的展示颜色
/** 发起 WebSocket 连接 */
const { status, data, send, close, open } = useWebSocket(server.value, {
autoReconnect: true,
heartbeat: true
})
/** 发送消息 */
const sendText = ref('') // 发送内容
const sendUserId = ref('') // 发送人
const handlerSend = () => {
// 1.1 先 JSON 化 message 消息内容
const messageContent = JSON.stringify({
text: sendText.value,
toUserId: sendUserId.value
})
// 1.2 再 JSON 化整个消息
const jsonMessage = JSON.stringify({
type: 'demo-message-send',
content: messageContent
})
// 2. 最后发送消息
send(jsonMessage)
sendText.value = ''
}
后端
- 注册监听器:
DemoWebSocketMessageListener
类通过实现WebSocketMessageListener<DemoSendMessage>
接口,并使用@Component
注解将自己注册为 Spring Bean。框架启动时会扫描所有实现了该接口的 Bean,并将它们注册到消息处理器中。
/**
* WebSocket 示例:单发消息
*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
@Resource
private WebSocketMessageSender webSocketMessageSender;
@Override
public void onMessage(WebSocketSession session, DemoSendMessage message) {
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
// 情况一:单发
if (message.getToUserId() != null) {
DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
.setText(message.getText()).setSingle(true);
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户
"demo-message-receive", toMessage);
return;
}
// 情况二:群发
DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
.setText(message.getText()).setSingle(false);
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户
"demo-message-receive", toMessage);
}
@Override
public String getType() {
return "demo-message-send";
}
}
- 消息类型绑定:
getType()
方法返回"demo-message-send"
,这表明该监听器专门处理类型为"demo-message-send"
的消息。当后端接收到消息时,会根据消息类型路由到对应的监听器。
当 WebSocket 服务器接收到消息后:
- 消息解析:框架首先解析消息的 JSON 格式,提取
type
字段(如"demo-message-send"
)。 - 类型匹配:后端框架会自动将
type
为"demo-message-send"
的消息路由到DemoWebSocketMessageListener
的onMessage
方法。 - 调用回调:将消息反序列化为
DemoSendMessage
对象,并调用监听器的onMessage
方法。
/**
* JSON 格式 {@link WebSocketHandler} 实现类
* 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
/**
* type 与 WebSocketMessageListener 的映射
* 用于存储不同消息类型对应的监听器,键为消息类型,值为对应的监听器实例
*/
private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
@SuppressWarnings({"rawtypes", "unchecked"})
public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
// 遍历传入的监听器列表
listenersList.forEach((Consumer<WebSocketMessageListener>)
listener -> {
// 将监听器的类型(通过 getType() 方法获取)作为键,监听器实例作为值,存入 listeners 映射中
listeners.put(listener.getType(), listener);
});
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 1.1 空消息,跳过
// 如果消息的负载长度为 0,说明是一个空消息,直接返回,不进行后续处理
if (message.getPayloadLength() == 0) {
return;
}
// 1.2 ping 心跳消息,直接返回 pong 消息。
// 如果消息的负载长度为 4 且负载内容为 "ping",则向客户端发送 "pong" 消息,表示响应心跳
if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
session.sendMessage(new TextMessage("pong"));
return;
}
// 2.1 解析消息
try {
// 将文本消息的负载解析为 JsonWebSocketMessage 对象
JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
// 如果解析后的消息为空,记录错误日志并返回,不进行后续处理
if (jsonMessage == null) {
log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
return;
}
// 如果解析后的消息类型为空,记录错误日志并返回,不进行后续处理
if (StrUtil.isEmpty(jsonMessage.getType())) {
log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
return;
}
// 2.2 获得对应的 WebSocketMessageListener
// 根据消息类型从 listeners 映射中获取对应的监听器
WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
// 如果没有找到对应的监听器,记录错误日志并返回,不进行后续处理
if (messageListener == null) {
log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
return;
}
// 2.3 处理消息
// 获取监听器泛型参数类型
Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
// 将消息内容解析为对应类型的对象
Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
// 获取当前会话的租户 ID
Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
// 执行租户相关的操作,调用监听器的 onMessage 方法处理消息
TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
} catch (Throwable ex) {
// 如果在处理消息过程中发生异常,记录错误日志
log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
}
}
}
WebSocketMessageListener
之所以能监听消息,是因为:
- 接口契约:实现
WebSocketMessageListener
接口并指定消息类型(getType()
)。 - 框架支持:Spring 框架自动扫描并注册监听器,实现消息的解析和分发。
- 类型匹配:前端发送的消息
type
与后端监听器的getType()
一致,触发回调。
这个过程类似于 HTTP 请求的路由机制,只不过 WebSocket 是长连接,需要持续监听消息。
通常,WebSocket 框架(如 Spring WebSocket)会提供以下核心组件:
- 消息解码器:将二进制数据转换为 Java 对象(如
DemoSendMessage
)。 - 消息路由器:根据消息类型将消息路由到对应的监听器。
- 会话管理器:维护所有 WebSocket 会话(
WebSocketSession
),并提供获取用户信息的工具(如WebSocketFrameworkUtils.getLoginUserId
)。
- 后端的
DemoWebSocketMessageListener
类实现了WebSocketMessageListener
接口的onMessage
方法。 - 当有消息到达时,
onMessage
方法被调用,从WebSocketSession
中获取登录用户 ID(fromUserId
)。 - 根据消息中的
toUserId
判断是单发还是群发:- 如果
toUserId
不为空,则创建DemoReceiveMessage
对象,设置fromUserId
、text
和single
为true
,通过webSocketMessageSender
的sendObject
方法将消息发送给指定用户。 - 如果
toUserId
为空,则创建DemoReceiveMessage
对象,设置fromUserId
、text
和single
为false
,通过webSocketMessageSender
的sendObject
方法将消息发送给所有用户。
- 如果
JsonWebSocketMessageHandler
接收并解析消息根据
type="demo-message-send"
找到DemoWebSocketMessageListener
调用
onMessage
方法:从Session中获取发送者ID(
fromUserId
)检查
message.getToUserId()
不为null,进入单发逻辑
构造响应消息:
new DemoReceiveMessage() .setFromUserId(fromUserId) .setText(message.getText()) .setSingle(true)
通过
webSocketMessageSender
发送给指定用户:webSocketMessageSender.sendObject( UserTypeEnum.ADMIN.getValue(), // 用户类型 message.getToUserId(), // 目标用户ID "demo-message-receive", // 消息类型 toMessage // 消息内容 )
实际示例:
用户A(ID:100)发送"下午开会"给用户B(ID:101)
前端发送:
{"type":"demo-message-send","content":"{\"text\":\"下午开会\",\"toUserId\":101}"}
后端处理后发送给用户B:
{"type":"demo-message-receive","content":"{\"fromUserId\":100,\"text\":\"下午开会\",\"single\":true}"}
1.2一对多发送
前端
用户在输入框输入消息内容(
sendText
)不选择特定用户(或选择"所有人")
点击发送按钮触发
handlerSend
方法构造消息内容JSON:
{ text: "系统维护通知", // 消息内容 toUserId: "" // 空表示群发 }
包装为WebSocket标准格式并发送
后端
同上接收解析流程
onMessage
方法中检查message.getToUserId()
为null,进入群发逻辑构造响应消息:
new DemoReceiveMessage() .setFromUserId(fromUserId) .setText(message.getText()) .setSingle(false)
通过
webSocketMessageSender
发送给所有用户:webSocketMessageSender.sendObject( UserTypeEnum.ADMIN.getValue(), // 用户类型 "demo-message-receive", // 消息类型 toMessage // 消息内容 )
实际示例:
管理员发送"系统即将升级"给所有用户
前端发送:
{"type":"demo-message-send","content":"{\"text\":\"系统即将升级\",\"toUserId\":\"\"}"}
后端处理后广播:
{"type":"demo-message-receive","content":"{\"fromUserId\":1,\"text\":\"系统即将升级\",\"single\":false}"}