深入解析Java Socket服务器实现:从基础到优雅停机

发布于:2025-04-21 ⋅ 阅读:(59) ⋅ 点赞:(0)

本文将详细解析一个基于Java Socket实现的服务器程序,涵盖线程池管理、心跳检测、优雅停机等关键特性,并最终提供完整代码实现。

1. 架构概述

这个Socket服务器实现具有以下核心特性:

  1. 基于Java原生Socket API实现
  2. 使用线程池处理客户端连接
  3. 内置心跳检测机制
  4. 支持优雅停机
  5. 限制最大连接数

2. 核心组件解析

2.1 启动入口 - RelayTask

@Component
public class RelayTask implements CommandLineRunner {
    @Autowired
    private SocketService socketService;

    @Override
    public void run(String... args) throws Exception {
        socketService.apply();
    }
}

这是一个Spring Boot应用的启动类,实现了CommandLineRunner接口,在应用启动后自动执行SocketServiceapply()方法启动Socket服务器。

2.2 Socket服务主类 - SocketService

SocketService是整个Socket服务器的核心实现类,采用单例模式(通过Spring的@Service注解实现)。

关键配置参数
private static final int PORT = 5555;
private static final int MAX_CLIENTS = 10;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测

这些参数定义了服务器的基本配置:
• 监听端口:5555
• 最大客户端连接数:10
• 心跳检测间隔:30秒

线程管理
private static final ExecutorService threadPool = 
        Executors.newFixedThreadPool(MAX_CLIENTS);
private static final ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();

使用两个线程池:

  1. 主线程池:固定大小(等于最大客户端数),处理客户端连接
  2. 调度线程池:单线程,用于心跳检测
客户端管理
private static final ConcurrentHashMap<String, ClientHandler> activeClients = 
        new ConcurrentHashMap<>();

使用线程安全的ConcurrentHashMap存储活跃客户端,键为客户端ID,值为对应的处理器实例。

2.3 客户端处理器 - ClientHandler

ClientHandler是处理单个客户端连接的核心内部类,实现了Runnable接口。

关键字段
private final Socket clientSocket;
private final String clientId;
private final PrintWriter out;
private final BufferedReader in;
private volatile long lastActivityTime;
private volatile boolean running = true;

clientSocket: 客户端Socket连接
clientId: 由客户端IP和端口组成的唯一标识
out/in: 输出/输入流
lastActivityTime: 记录最后活动时间(用于心跳检测)
running: 控制处理循环的开关

核心方法
public void run() {
    try {
        String inputLine;
        while (running && (inputLine = in.readLine()) != null) {
            lastActivityTime = System.currentTimeMillis();
            processClientMessage(inputLine);
        }
    } catch (SocketException e) {
        // 处理异常
    } finally {
        // 清理资源
    }
}

这是客户端处理的主循环,不断读取客户端消息并处理,同时更新最后活动时间。

2.4 心跳检测机制

private void startHeartbeatChecker() {
    scheduler.scheduleAtFixedRate(() -> {
        long currentTime = System.currentTimeMillis();
        activeClients.forEach((id, handler) -> {
            if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {
                System.out.printf("客户端[%s]心跳超时,强制断开%n", id);
                handler.disconnect();
                activeClients.remove(id);
            }
        });
    }, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
}

心跳检测器每10秒检查一次所有客户端,如果发现某个客户端超过30秒没有活动,则强制断开连接。

2.5 优雅停机实现

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("\n接收到关闭信号,开始优雅停机...");
    shutdownServer(); // 执行自定义关闭逻辑
}));

通过注册JVM关闭钩子,在服务器进程收到终止信号时执行自定义的关闭逻辑:

  1. 关闭心跳检测线程
  2. 通知所有客户端服务器即将关闭
  3. 断开所有客户端连接
  4. 关闭线程池

3. 完整代码实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;

@Component
public class RelayTask implements CommandLineRunner {
    @Autowired
    private SocketService socketService;

    @Override
    public void run(String... args) throws Exception {
        socketService.apply();
    }
}

@Service
public class SocketService {
    // 服务器配置
    private static final int PORT = 5555;
    private static final int MAX_CLIENTS = 10;
    private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测

    // 线程池和客户端集合
    private static final ExecutorService threadPool =
            Executors.newFixedThreadPool(MAX_CLIENTS);
    private static final ConcurrentHashMap<String, ClientHandler> activeClients =
            new ConcurrentHashMap<>();
    private static final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void apply() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("\n接收到关闭信号,开始优雅停机...");
            shutdownServer(); // 执行自定义关闭逻辑
        }));

        System.out.printf("Socket服务器启动,监听端口: %d,最大连接数: %d%n", PORT, MAX_CLIENTS);

        // 启动心跳检测线程
        startHeartbeatChecker();

        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            while (true) {
                try {
                    Socket clientSocket = serverSocket.accept();

                    // 检查是否达到最大连接数
                    if (activeClients.size() >= MAX_CLIENTS) {
                        rejectClient(clientSocket);
                        continue;
                    }

                    // 创建客户端处理器
                    ClientHandler handler = new ClientHandler(clientSocket);
                    String clientId = handler.getClientId();

                    // 添加到活跃客户端列表
                    activeClients.put(clientId, handler);
                    threadPool.execute(handler);

                    System.out.printf("客户端[%s]已连接,当前连接数: %d/%d%n",
                            clientId, activeClients.size(), MAX_CLIENTS);
                } catch (IOException e) {
                    System.err.println("接受客户端连接时出错: " + e.getMessage());
                }
            }
        } catch (IOException e) {
            System.err.println("服务器启动失败: " + e.getMessage());
        } finally {
            shutdownServer();
        }
    }

    // 客户端处理器
    private class ClientHandler implements Runnable {
        private final Socket clientSocket;
        private final String clientId;
        private final PrintWriter out;
        private final BufferedReader in;
        private volatile long lastActivityTime;
        private volatile boolean running = true;

        public ClientHandler(Socket socket) throws IOException {
            this.clientSocket = socket;
            this.clientId = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
            this.out = new PrintWriter(socket.getOutputStream(), true);
            this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            this.lastActivityTime = System.currentTimeMillis();

            // 发送欢迎消息
            sendMessage("SERVER|欢迎连接到服务器,你的ID: " + clientId);
        }

        public String getClientId() {
            return clientId;
        }

        public void sendMessage(String message) {
            out.println(message);
        }

        public void disconnect() {
            running = false;
            try {
                clientSocket.close();
            } catch (IOException e) {
                // 忽略关闭异常
            }
        }

        @Override
        public void run() {
            try {
                String inputLine;
                while (running && (inputLine = in.readLine()) != null) {
                    lastActivityTime = System.currentTimeMillis();

                    // 处理客户端消息
                    processClientMessage(inputLine);
                }
            } catch (SocketException e) {
                System.out.printf("客户端[%s]异常断开: %s%n", clientId, e.getMessage());
            } catch (IOException e) {
                System.err.printf("与客户端[%s]通信出错: %s%n", clientId, e.getMessage());
            } finally {
                // 客户端断开处理
                activeClients.remove(clientId);
                System.out.printf("客户端[%s]已断开,剩余连接数: %d%n",
                        clientId, activeClients.size());
                disconnect();
            }
        }

        // 处理消息逻辑
        private void processClientMessage(String message) {
            System.out.printf("收到客户端[%s]消息: %s%n", clientId, message);
        }
    }

    // ---------- 服务器管理方法 ----------

    // 拒绝新连接(达到最大连接数时)
    private void rejectClient(Socket clientSocket) throws IOException {
        PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
        out.println("SERVER|ERROR|服务器已达最大连接数,请稍后再试");
        clientSocket.close();
        System.out.println("已拒绝新连接(达到最大连接数)");
    }

    // 启动心跳检测线程
    private void startHeartbeatChecker() {
        scheduler.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();

            activeClients.forEach((id, handler) -> {
                if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {
                    System.out.printf("客户端[%s]心跳超时,强制断开%n", id);
                    handler.disconnect();
                    activeClients.remove(id);
                }
            });
        }, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
    }

    // 关闭服务器
    private void shutdownServer() {
        System.out.println("服务器关闭中...");
        // 关闭心跳检测线程
        if (scheduler != null) {
            scheduler.shutdownNow();
            System.out.println("心跳检测线程已关闭");
        }
        // 通知所有客户端
        activeClients.forEach((id, handler) -> {
            handler.sendMessage("SERVER|WARN|服务器即将关闭");
            handler.disconnect();
        });

        // 关闭线程池
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }

        System.out.println("服务器已关闭");
    }

    // 获取当前连接数(可用于监控)
    public int getCurrentConnections() {
        return activeClients.size();
    }
}

4. 扩展建议

这个基础实现可以进一步扩展:

  1. 协议扩展:实现更复杂的消息协议,如JSON或Protobuf
  2. 认证机制:添加客户端认证功能
  3. 消息广播:实现向所有客户端广播消息的功能
  4. 性能监控:添加连接数、吞吐量等监控指标
  5. 配置外部化:将端口、最大连接数等参数移到配置文件中

5. 总结

本文详细解析了一个功能完善的Java Socket服务器实现,涵盖了线程池管理、心跳检测、优雅停机等关键特性。这个实现既保持了简洁性,又具备了生产环境所需的核心功能,可以作为开发更复杂网络应用的坚实基础。


网站公告

今日签到

点亮在社区的每一天
去签到