java基础完结.最后补充一下 WebSocket
原生使用TCP实现Websocket
Websocket全双工通讯的协议 借助于Http协议进行连接,当客户端连接到服务端的时候会向服务端发送一个类似下面的HTTP报文:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
这是一个HTTP的get请求报文,注意该报文中有一个upgrade首部,它的作用是告诉服务端需要将通信协议切换到websocket。
如果服务端支持websocket协议,那么它就会将自己的通信协议切换到websocket,同时发给客户端类似于以下的一个响应报文头:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
返回的状态码为101,表示同意客户端协议转换请求,并将它转换为websocket协议。以上过程都是利用HTTP通信完成的,称之为websocket协议握手。
实现步骤
阶段一、客户端通过 HTTP 协议发送包含特殊头部的请求,触发协议升级:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
- Upgrade: websocket明确请求升级协议。
- Sec-WebSocket-Key:客户端生成的随机字符串,用于安全验证。
- Sec-WebSocket-Version:指定协议版本(RFC 6455 规定为 13)。
阶段二、服务器端进行响应确认,返回 101 Switching Protocols 响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
- Sec-WebSocket-Accept:服务器将客户端的 Sec-WebSocket-Key 与固定字符串拼接后,计算 SHA-1 哈希并进行 Base64 编码,生成验证令牌。
阶段三、此时 TCP 连接从 HTTP 升级为 WebSocket 协议,后续数据可通过二进制帧进行传输。
阶段四、数据传输,WebSocket是一种全双工通信协议,客户端与服务端可同时发送/接收数据,无需等待对方请求,数据帧是以二进制格式进行传输的。
如下图所示:
- FIN (1 bit):标记是否为消息的最后一个分片。
- Opcode (4 bits):定义数据类型(如文本 0x1、二进制 0x2、关闭连接 0x8、Ping 0x9、Pong 0xA)。
- Mask (1 bit):客户端发送的数据需掩码处理(防止缓存污染攻击),服务端发送的数据无需掩码。
- Payload Length (7 or 7+16 or 7+64 bits):帧内容的长度,支持最大 2^64-1 字节。
- Masking-key(32 bits),掩码密钥,由上面的标志位 MASK 决定的,如果使用掩码就是 4 个字节的随机数,否则就不存在。
- payload data 字段:这里存放的就是真正要传输的数据
阶段五、连接关闭,客户端或服务器端都可以发起关闭。
示例代码
前端代码:
<!DOCTYPE html>
<html>
<body>
<input type="text" id="messageInput" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
<div id="messages"></div>
<script>
// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8080');
// 连接打开时触发
socket.addEventListener('open', () => {
logMessage('连接已建立');
});
// 接收消息时触发
socket.addEventListener('message', (event) => {
logMessage('收到消息: ' + event.data);
});
// 连接关闭时触发
socket.addEventListener('close', () => {
logMessage('连接已关闭');
});
// 错误处理
socket.addEventListener('error', (error) => {
logMessage('连接错误: ' + error.message);
});
// 发送消息
function sendMessage() {
const message = document.getElementById('messageInput').value;
socket.send(message);
logMessage('发送消息: ' + message);
}
// 日志输出
function logMessage(message) {
const messagesDiv = document.getElementById('messages');
const p = document.createElement('p');
p.textContent = message;
messagesDiv.appendChild(p);
}
</script>
</body>
</html>
java代码:
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WebSocketServer {
private static final int PORT = 8080;
private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private final ExecutorService threadPool = Executors.newCachedThreadPool();
private ServerSocket serverSocket;
private boolean running = false;
public static void main(String[] args) {
WebSocketServer server = new WebSocketServer();
server.start();
}
public void start() {
try {
serverSocket = new ServerSocket(PORT);
running = true;
System.out.println("WebSocket服务器已启动,监听端口: " + PORT);
while (running) {
Socket clientSocket = serverSocket.accept();
threadPool.execute(() -> handleClient(clientSocket));
}
} catch (IOException e) {
if (running) {
System.err.println("服务器启动失败: " + e.getMessage());
}
}
}
public void stop() {
running = false;
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
threadPool.shutdown();
System.out.println("服务器已停止");
} catch (IOException e) {
System.err.println("关闭服务器时出错: " + e.getMessage());
}
}
private void handleClient(Socket clientSocket) {
try (InputStream in = clientSocket.getInputStream();
OutputStream out = clientSocket.getOutputStream()) {
// 读取HTTP握手请求
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
StringBuilder request = new StringBuilder();
String line;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
request.append(line).append("\r\n");
}
System.out.println("收到握手请求:\n" + request);
// 提取WebSocket密钥
String key = extractWebSocketKey(request.toString());
if (key == null) {
System.out.println("不是WebSocket握手请求");
return;
}
// 生成响应密钥
String responseKey = generateResponseKey(key);
// 发送HTTP升级响应
String response = "HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + responseKey + "\r\n\r\n";
out.write(response.getBytes(StandardCharsets.UTF_8));
out.flush();
System.out.println("发送握手响应");
// 开始WebSocket通信
communicateWebSocket(clientSocket, in, out);
} catch (IOException e) {
System.err.println("处理客户端时出错: " + e.getMessage());
} finally {
try {
clientSocket.close();
} catch (IOException e) {
System.err.println("关闭客户端连接时出错: " + e.getMessage());
}
}
}
private String extractWebSocketKey(String request) {
String[] lines = request.split("\r\n");
for (String line : lines) {
if (line.startsWith("Sec-WebSocket-Key:")) {
return line.substring("Sec-WebSocket-Key:".length()).trim();
}
}
return null;
}
private String generateResponseKey(String key) {
try {
String concatenated = key + WEBSOCKET_KEY_MAGIC;
MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
byte[] hash = sha1.digest(concatenated.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-1算法不可用", e);
}
}
private void communicateWebSocket(Socket clientSocket, InputStream in, OutputStream out) throws IOException {
while (clientSocket.isConnected()) {
// 读取WebSocket帧
byte[] header = new byte[2];
if (in.read(header) != 2) {
break;
}
boolean fin = (header[0] & 0x80) != 0;
int opcode = header[0] & 0x0F;
boolean masked = (header[1] & 0x80) != 0;
long payloadLength = header[1] & 0x7F;
if (payloadLength == 126) {
byte[] extended = new byte[2];
if (in.read(extended) != 2) {
break;
}
payloadLength = ((extended[0] & 0xFF) << 8) | (extended[1] & 0xFF);
} else if (payloadLength == 127) {
byte[] extended = new byte[8];
if (in.read(extended) != 8) {
break;
}
payloadLength = 0;
for (int i = 0; i < 8; i++) {
payloadLength = (payloadLength << 8) | (extended[i] & 0xFF);
}
}
byte[] maskingKey = new byte[4];
if (masked && in.read(maskingKey) != 4) {
break;
}
byte[] payload = new byte[(int) payloadLength];
if (in.read(payload) != payloadLength) {
break;
}
// 解掩码
if (masked) {
for (int i = 0; i < payloadLength; i++) {
payload[i] = (byte) (payload[i] ^ maskingKey[i % 4]);
}
}
// 处理控制帧
if (opcode == 8) { // 关闭帧
System.out.println("收到关闭帧");
sendCloseFrame(out);
break;
} else if (opcode == 9) { // ping帧
System.out.println("收到ping帧");
sendPong(out, payload);
} else if (opcode == 1) { // 文本帧
String message = new String(payload, StandardCharsets.UTF_8);
System.out.println("收到消息: " + message);
sendMessage(out, "服务器收到: " + message);
}
}
}
private void sendMessage(OutputStream out, String message) throws IOException {
byte[] payload = message.getBytes(StandardCharsets.UTF_8);
int payloadLength = payload.length;
// 构建WebSocket帧
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// 第一个字节: FIN=1, opcode=1(文本帧)
baos.write(0x81);
// 第二个字节: 负载长度
if (payloadLength <= 125) {
baos.write(payloadLength);
} else if (payloadLength <= 65535) {
baos.write(126);
baos.write((payloadLength >> 8) & 0xFF);
baos.write(payloadLength & 0xFF);
} else {
baos.write(127);
for (int i = 7; i >= 0; i--) {
baos.write((int) ((payloadLength >> (i * 8)) & 0xFF));
}
}
// 写入负载数据
baos.write(payload);
// 发送帧
out.write(baos.toByteArray());
out.flush();
}
private void sendPong(OutputStream out, byte[] payload) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// 第一个字节: FIN=1, opcode=10(乒乓帧)
baos.write(0x8A);
// 第二个字节: 负载长度
if (payload.length <= 125) {
baos.write(payload.length);
} else {
throw new IOException("Ping负载太长,不支持");
}
// 写入负载数据
baos.write(payload);
// 发送帧
out.write(baos.toByteArray());
out.flush();
}
private void sendCloseFrame(OutputStream out) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// 第一个字节: FIN=1, opcode=8(关闭帧)
baos.write(0x88);
// 第二个字节: 负载长度为 0
baos.write(0x00);
out.write(baos.toByteArray());
out.flush();
}
}
Tomcat 实现 WebSocket
Tomcat 实现 WebSocket 主要基于 Java WebSocket API(JSR 356),并采用了高效的非阻塞 I/O 模型来处理大量并发连接,而非传统的线程池模型。
1. Tomcat 的 WebSocket 实现原理
1.1 基于 NIO 的非阻塞 I/O 模型
Tomcat 自 7.0 版本后引入了 NIO 连接器(org.apache.coyote.http11.Http11NioProtocol
),并在 8.0 + 版本中默认使用 NIO2(Http11Nio2Protocol
)。这些连接器使用单线程管理多个连接,通过事件驱动的方式处理 I/O 操作,避免了传统线程池的局限性。
- Selector 模式:Tomcat 使用
java.nio.channels.Selector
监听多个SocketChannel
的读写事件,一个线程可以管理数千个连接。 - 异步处理:WebSocket 连接建立后,数据读写通过异步方式进行,不会阻塞线程。
1.2 WebSocket 握手与协议升级
当客户端发起 WebSocket 握手请求(HTTP 1.1 + Upgrade: websocket
头)时,Tomcat 会:
- 解析 HTTP 请求,验证
Sec-WebSocket-Key
等头信息。 - 生成
Sec-WebSocket-Accept
响应头,完成协议升级。 - 将连接从 HTTP 处理器移交到 WebSocket 处理器(如
WsFrameServer
)。
1.3 生命周期管理
Tomcat 为每个 WebSocket 连接创建一个Session
对象,并通过Endpoint
接口管理生命周期:
onOpen()
:连接建立时触发。onMessage()
:收到消息时触发。onClose()
:连接关闭时触发。onError()
:发生错误时触发。
Tomcat 的 WebSocket 实现通过非阻塞 I/O + 少量线程(如 1 个 Acceptor 线程 + N 个 Selector 线程)即可管理大量连接,显著提升吞吐量。
2. Tomcat 的线程模型
Tomcat 的 WebSocket 处理涉及三类线程:
- Acceptor 线程:接收新连接,将其注册到 Selector。
- Selector 线程(I/O 线程):监听所有连接的 I/O 事件,触发回调。
- Worker 线程(可选):处理耗时操作(如业务逻辑),避免阻塞 Selector 线程。
示例配置(server.xml)
<Connector port="8080" protocol="org.apache.coyote.http11.Http11Nio2Protocol"
maxThreads="200" <!-- Worker线程数 -->
acceptCount="100" <!-- 最大等待连接数 -->
selectorThreadCount="2" <!-- Selector线程数 -->
maxConnections="8192" <!-- 最大并发连接数 -->
/>
3. 性能优化建议
- 调整 Selector 线程数:根据 CPU 核心数设置,通常为
2~4
。 - 增大
maxConnections
:默认值较低(如 8192),可根据内存调整。 - 异步处理业务逻辑:避免在 Selector 线程中执行耗时操作。
- 使用 WebSocket 注解:通过
@ServerEndpoint
简化开发,Tomcat 会自动优化。
示例:异步处理消息
@ServerEndpoint("/ws")
public class MyWebSocket {
@OnMessage
public void onMessage(Session session, String message) {
// 提交到线程池处理业务逻辑,避免阻塞Selector线程
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> {
// 处理消息(如数据库操作、调用外部服务)
session.getAsyncRemote().sendText("处理完成");
});
}
}
4. 与 Netty 的对比
- Tomcat:基于 Servlet 容器,适合与现有 Web 应用集成,开箱即用。
- Netty:纯 NIO 框架,性能更高(约 10%~20%),但需要手动配置。
对于大多数应用,Tomcat 的 WebSocket 实现已足够高效.Tomcat 通过非阻塞 I/O + 事件驱动模型实现 WebSocket,避免了传统线程池的瓶颈,可轻松支持数万并发连接;对于高性能场景(如游戏服务器),可考虑 Netty。