本文将详细解析一个基于Java Socket实现的服务器程序,涵盖线程池管理、心跳检测、优雅停机等关键特性,并最终提供完整代码实现。
1. 架构概述
这个Socket服务器实现具有以下核心特性:
- 基于Java原生Socket API实现
- 使用线程池处理客户端连接
- 内置心跳检测机制
- 支持优雅停机
- 限制最大连接数
2. 核心组件解析
2.1 启动入口 - RelayTask
@Component
public class RelayTask implements CommandLineRunner {
@Autowired
private SocketService socketService;
@Override
public void run(String... args) throws Exception {
socketService.apply();
}
}
这是一个Spring Boot应用的启动类,实现了CommandLineRunner
接口,在应用启动后自动执行SocketService
的apply()
方法启动Socket服务器。
2.2 Socket服务主类 - SocketService
SocketService
是整个Socket服务器的核心实现类,采用单例模式(通过Spring的@Service
注解实现)。
关键配置参数
private static final int PORT = 5555;
private static final int MAX_CLIENTS = 10;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测
这些参数定义了服务器的基本配置:
• 监听端口:5555
• 最大客户端连接数:10
• 心跳检测间隔:30秒
线程管理
private static final ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_CLIENTS);
private static final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
使用两个线程池:
- 主线程池:固定大小(等于最大客户端数),处理客户端连接
- 调度线程池:单线程,用于心跳检测
客户端管理
private static final ConcurrentHashMap<String, ClientHandler> activeClients =
new ConcurrentHashMap<>();
使用线程安全的ConcurrentHashMap
存储活跃客户端,键为客户端ID,值为对应的处理器实例。
2.3 客户端处理器 - ClientHandler
ClientHandler
是处理单个客户端连接的核心内部类,实现了Runnable
接口。
关键字段
private final Socket clientSocket;
private final String clientId;
private final PrintWriter out;
private final BufferedReader in;
private volatile long lastActivityTime;
private volatile boolean running = true;
• clientSocket
: 客户端Socket连接
• clientId
: 由客户端IP和端口组成的唯一标识
• out
/in
: 输出/输入流
• lastActivityTime
: 记录最后活动时间(用于心跳检测)
• running
: 控制处理循环的开关
核心方法
public void run() {
try {
String inputLine;
while (running && (inputLine = in.readLine()) != null) {
lastActivityTime = System.currentTimeMillis();
processClientMessage(inputLine);
}
} catch (SocketException e) {
// 处理异常
} finally {
// 清理资源
}
}
这是客户端处理的主循环,不断读取客户端消息并处理,同时更新最后活动时间。
2.4 心跳检测机制
private void startHeartbeatChecker() {
scheduler.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
activeClients.forEach((id, handler) -> {
if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {
System.out.printf("客户端[%s]心跳超时,强制断开%n", id);
handler.disconnect();
activeClients.remove(id);
}
});
}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
}
心跳检测器每10秒检查一次所有客户端,如果发现某个客户端超过30秒没有活动,则强制断开连接。
2.5 优雅停机实现
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("\n接收到关闭信号,开始优雅停机...");
shutdownServer(); // 执行自定义关闭逻辑
}));
通过注册JVM关闭钩子,在服务器进程收到终止信号时执行自定义的关闭逻辑:
- 关闭心跳检测线程
- 通知所有客户端服务器即将关闭
- 断开所有客户端连接
- 关闭线程池
3. 完整代码实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;
@Component
public class RelayTask implements CommandLineRunner {
@Autowired
private SocketService socketService;
@Override
public void run(String... args) throws Exception {
socketService.apply();
}
}
@Service
public class SocketService {
// 服务器配置
private static final int PORT = 5555;
private static final int MAX_CLIENTS = 10;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测
// 线程池和客户端集合
private static final ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_CLIENTS);
private static final ConcurrentHashMap<String, ClientHandler> activeClients =
new ConcurrentHashMap<>();
private static final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public void apply() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("\n接收到关闭信号,开始优雅停机...");
shutdownServer(); // 执行自定义关闭逻辑
}));
System.out.printf("Socket服务器启动,监听端口: %d,最大连接数: %d%n", PORT, MAX_CLIENTS);
// 启动心跳检测线程
startHeartbeatChecker();
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
while (true) {
try {
Socket clientSocket = serverSocket.accept();
// 检查是否达到最大连接数
if (activeClients.size() >= MAX_CLIENTS) {
rejectClient(clientSocket);
continue;
}
// 创建客户端处理器
ClientHandler handler = new ClientHandler(clientSocket);
String clientId = handler.getClientId();
// 添加到活跃客户端列表
activeClients.put(clientId, handler);
threadPool.execute(handler);
System.out.printf("客户端[%s]已连接,当前连接数: %d/%d%n",
clientId, activeClients.size(), MAX_CLIENTS);
} catch (IOException e) {
System.err.println("接受客户端连接时出错: " + e.getMessage());
}
}
} catch (IOException e) {
System.err.println("服务器启动失败: " + e.getMessage());
} finally {
shutdownServer();
}
}
// 客户端处理器
private class ClientHandler implements Runnable {
private final Socket clientSocket;
private final String clientId;
private final PrintWriter out;
private final BufferedReader in;
private volatile long lastActivityTime;
private volatile boolean running = true;
public ClientHandler(Socket socket) throws IOException {
this.clientSocket = socket;
this.clientId = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
this.out = new PrintWriter(socket.getOutputStream(), true);
this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
this.lastActivityTime = System.currentTimeMillis();
// 发送欢迎消息
sendMessage("SERVER|欢迎连接到服务器,你的ID: " + clientId);
}
public String getClientId() {
return clientId;
}
public void sendMessage(String message) {
out.println(message);
}
public void disconnect() {
running = false;
try {
clientSocket.close();
} catch (IOException e) {
// 忽略关闭异常
}
}
@Override
public void run() {
try {
String inputLine;
while (running && (inputLine = in.readLine()) != null) {
lastActivityTime = System.currentTimeMillis();
// 处理客户端消息
processClientMessage(inputLine);
}
} catch (SocketException e) {
System.out.printf("客户端[%s]异常断开: %s%n", clientId, e.getMessage());
} catch (IOException e) {
System.err.printf("与客户端[%s]通信出错: %s%n", clientId, e.getMessage());
} finally {
// 客户端断开处理
activeClients.remove(clientId);
System.out.printf("客户端[%s]已断开,剩余连接数: %d%n",
clientId, activeClients.size());
disconnect();
}
}
// 处理消息逻辑
private void processClientMessage(String message) {
System.out.printf("收到客户端[%s]消息: %s%n", clientId, message);
}
}
// ---------- 服务器管理方法 ----------
// 拒绝新连接(达到最大连接数时)
private void rejectClient(Socket clientSocket) throws IOException {
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
out.println("SERVER|ERROR|服务器已达最大连接数,请稍后再试");
clientSocket.close();
System.out.println("已拒绝新连接(达到最大连接数)");
}
// 启动心跳检测线程
private void startHeartbeatChecker() {
scheduler.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
activeClients.forEach((id, handler) -> {
if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {
System.out.printf("客户端[%s]心跳超时,强制断开%n", id);
handler.disconnect();
activeClients.remove(id);
}
});
}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
}
// 关闭服务器
private void shutdownServer() {
System.out.println("服务器关闭中...");
// 关闭心跳检测线程
if (scheduler != null) {
scheduler.shutdownNow();
System.out.println("心跳检测线程已关闭");
}
// 通知所有客户端
activeClients.forEach((id, handler) -> {
handler.sendMessage("SERVER|WARN|服务器即将关闭");
handler.disconnect();
});
// 关闭线程池
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
threadPool.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("服务器已关闭");
}
// 获取当前连接数(可用于监控)
public int getCurrentConnections() {
return activeClients.size();
}
}
4. 扩展建议
这个基础实现可以进一步扩展:
- 协议扩展:实现更复杂的消息协议,如JSON或Protobuf
- 认证机制:添加客户端认证功能
- 消息广播:实现向所有客户端广播消息的功能
- 性能监控:添加连接数、吞吐量等监控指标
- 配置外部化:将端口、最大连接数等参数移到配置文件中
5. 总结
本文详细解析了一个功能完善的Java Socket服务器实现,涵盖了线程池管理、心跳检测、优雅停机等关键特性。这个实现既保持了简洁性,又具备了生产环境所需的核心功能,可以作为开发更复杂网络应用的坚实基础。