C++网络编程 5.TCP套接字(socket)通信进阶-基于多线程的TCP多客户端通信

发布于:2025-07-18 ⋅ 阅读:(19) ⋅ 点赞:(0)

基于多线程的TCP多客户端通信示例详解

这个示例实现了一个能同时处理多个客户端连接的TCP服务器,以及一个能创建多个连接的客户端程序。核心通过C++11的std::thread库实现多线程并发处理,下面分服务器和客户端两部分详细讲解。
先附上完整代码:

multi_conn_threads_server.cpp
#include <iostream>
#include <sys/socket.h> // 套接字API
#include <netinet/in.h> //
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic> // 原子操作(线程安全标志)
#include <functional> // 函数对象支持

// 处理客户端连接的函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {
    std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;
    
    char buffer[1024];
    while (true) {
        // 接收客户端消息
        memset(buffer, 0, sizeof(buffer));
        int recv_size = read(client_sock, buffer, sizeof(buffer));
        
        if (recv_size <= 0) {
            // 客户端断开连接或出错
            if (recv_size == 0) {
                std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;
            } else {
                std::cerr << "接收数据出错: " << strerror(errno) << std::endl;
            }
            break;
        }
        
        std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;
        
        // 发送响应
        std::string response = "服务器已收到: " + std::string(buffer);
        write(client_sock, response.c_str(), response.size());
    }
    
    // 关闭客户端套接字
    close(client_sock);
}

int main() {
    // 创建监听套接字
    int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sock == -1) {
        std::cerr << "创建套接字失败: " << strerror(errno) << std::endl;
        return 1;
    }
    
    // 设置端口复用,避免程序重启时端口被占用
    int opt = 1;
    if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        std::cerr << "设置套接字选项失败: " << strerror(errno) << std::endl;
        close(listen_sock);
        return 1;
    }
    
    // 绑定IP地址和端口
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有IP
    server_addr.sin_port = htons(9888); // 端口号
    
    if (bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        std::cerr << "绑定失败: " << strerror(errno) << std::endl;
        close(listen_sock);
        return 1;
    }
    
    // 开始监听
    if (listen(listen_sock, 5) == -1) {
        std::cerr << "监听失败: " << strerror(errno) << std::endl;
        close(listen_sock);
        return 1;
    }
    
    std::cout << "服务器启动成功,监听在 0.0.0.0:9888" << std::endl;
    
    std::vector<std::thread> client_threads;
    std::atomic<bool> running(true);
    
    // 主循环接受客户端连接
    while (running) {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
        int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);
        
        if (client_sock == -1) {
            if (running) { // 只有在运行中时才视为错误
                std::cerr << "接受连接失败: " << strerror(errno) << std::endl;
            }
            continue;
        }
        
        // 获取客户端IP和端口
        std::string client_ip = inet_ntoa(client_addr.sin_addr);
        int client_port = ntohs(client_addr.sin_port);
        
        // 创建新线程处理客户端连接
        client_threads.emplace_back(
            handleClient, 
            client_sock, 
            client_ip, 
            client_port
        );
        
        // 分离线程,避免主线程等待
        client_threads.back().detach();
        
        // 限制同时存在的线程数量(可选)
        if (client_threads.size() > 100) {
            client_threads.erase(client_threads.begin());
        }
    }
    
    // 关闭监听套接字
    close(listen_sock);
    std::cout << "服务器已停止" << std::endl;
    
    return 0;
}
multi_conn_threads_client.cpp
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>

// 服务器地址和端口(需与服务器保持一致)
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量

// 单个连接的处理函数
void handleConnection(int conn_id) {
    // 1. 创建客户端套接字
    int client_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (client_sock == -1) {
        std::cerr << "连接 " << conn_id << " 创建套接字失败: " << strerror(errno) << std::endl;
        return;
    }

    // 2. 配置服务器地址
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    // 将服务器IP从字符串转换为网络字节序
    if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {
        std::cerr << "连接 " << conn_id << " IP地址转换失败" << std::endl;
        close(client_sock);
        return;
    }

    // 3. 连接服务器
    if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        std::cerr << "连接 " << conn_id << " 连接服务器失败: " << strerror(errno) << std::endl;
        close(client_sock);
        return;
    }
    std::cout << "连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT << std::endl;

    try {
        // 4. 数据交互(每个连接发送3条消息)
        for (int i = 0; i < 3; ++i) {
            // 发送消息(包含连接ID和序号,方便服务器区分)
            std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
            ssize_t send_size = send(client_sock, msg.c_str(), msg.size(), 0);
            if (send_size == -1) {
                std::cerr << "连接 " << conn_id << " 发送消息失败: " << strerror(errno) << std::endl;
                break;
            }
            std::cout << "连接 " << conn_id << " 发送: " << msg << std::endl;

            // 接收服务器响应
            char buffer[1024] = {0};
            ssize_t recv_size = recv(client_sock, buffer, sizeof(buffer)-1, 0);
            if (recv_size <= 0) {
                if (recv_size == 0) {
                    std::cout << "连接 " << conn_id << " 服务器已断开" << std::endl;
                } else {
                    std::cerr << "连接 " << conn_id << " 接收响应失败: " << strerror(errno) << std::endl;
                }
                break;
            }
            std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;

            // 模拟间隔,避免消息发送过快
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    } catch (const std::exception& e) {
        std::cerr << "连接 " << conn_id << " 异常: " << e.what() << std::endl;
    }

    // 5. 关闭连接
    close(client_sock);
    std::cout << "连接 " << conn_id << " 已关闭" << std::endl;
}

int main() {
    std::cout << "多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接" << std::endl;

    // 存储所有连接的线程
    std::vector<std::thread> connection_threads;

    // 创建多个连接(每个连接一个线程)
    for (int i = 0; i < NUM_CONNECTIONS; ++i) {
        connection_threads.emplace_back(handleConnection, i+1); // 连接ID从1开始
        // 稍微延迟,避免同时建立连接导致服务器压力集中
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // 等待所有连接线程完成
    for (auto& t : connection_threads) {
        if (t.joinable()) {
            t.join();
        }
    }

    std::cout << "所有连接已处理完成,客户端退出" << std::endl;
    return 0;
}

一、多线程TCP服务器程序详解

服务器的核心目标是:同时接受并处理多个客户端的连接请求,每个客户端的通信逻辑在独立线程中执行,避免单线程阻塞导致其他客户端无法连接。

1. 核心功能概述

  • 启动服务器并监听指定端口(9888);
  • 主循环持续接受新的客户端连接;
  • 每接收到一个客户端连接,创建独立线程处理该客户端的消息收发;
  • 支持客户端正常断开或异常断开的处理;
  • 避免端口占用问题(通过SO_REUSEADDR选项)。

2. 关键代码解析

2.1 初始化与监听套接字
// 创建监听套接字(TCP类型)
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);

// 设置端口复用(解决服务器重启时端口被TIME_WAIT状态占用的问题)
int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 绑定IP和端口(与基础示例一致)
struct sockaddr_in server_addr;
// ... 填充地址信息(INADDR_ANY绑定所有IP,端口9888)
bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr));

// 开始监听(backlog=5,表示未完成连接队列最大长度)
listen(listen_sock, 5);
  • 端口复用(SO_REUSEADDR):服务器关闭后,端口可能处于TIME_WAIT状态(确保最后一个数据包被接收),此时直接重启服务器会绑定失败。setsockopt设置该选项后,允许端口在TIME_WAIT状态下被重新绑定,方便开发调试。
2.2 主循环:接受客户端连接并创建线程
std::vector<std::thread> client_threads;  // 存储客户端线程
std::atomic<bool> running(true);          // 原子变量:服务器运行标志(线程安全)

while (running) {
    // 接受客户端连接(阻塞等待新连接)
    struct sockaddr_in client_addr;
    socklen_t client_addr_len = sizeof(client_addr);
    int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);

    if (client_sock == -1) {
        if (running) std::cerr << "接受连接失败" << std::endl;
        continue;
    }

    // 获取客户端IP和端口(转换为可读性字符串)
    std::string client_ip = inet_ntoa(client_addr.sin_addr);  // 二进制IP→点分十进制
    int client_port = ntohs(client_addr.sin_port);            // 网络字节序→主机字节序

    // 创建线程处理该客户端,传入通信套接字、IP、端口
    client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);
    // 分离线程:线程独立运行,结束后自动释放资源(无需主线程join)
    client_threads.back().detach();

    // 可选:限制线程容器大小,避免内存占用过高
    if (client_threads.size() > 100) {
        client_threads.erase(client_threads.begin());
    }
}
  • 线程分离(detach()):服务器不需要等待每个客户端线程结束(否则主线程会被阻塞),通过detach()让线程在后台独立运行,完成后自动回收资源。
  • 原子变量running:用于线程安全地控制服务器是否继续运行(后续可通过信号处理设置running=false实现优雅关闭)。
2.3 客户端处理函数(handleClient)

每个客户端连接的核心逻辑,在独立线程中执行:

void handleClient(int client_sock, const std::string& client_ip, int client_port) {
    std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;
    
    char buffer[1024];
    while (true) {
        // 接收客户端消息
        memset(buffer, 0, sizeof(buffer));
        int recv_size = read(client_sock, buffer, sizeof(buffer));
        
        if (recv_size <= 0) {
            // 客户端断开(recv_size=0)或出错(recv_size=-1)
            std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;
            break;
        }
        
        // 打印消息并回复
        std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;
        std::string response = "服务器已收到: " + std::string(buffer);
        write(client_sock, response.c_str(), response.size());
    }
    
    close(client_sock);  // 关闭通信套接字
}
  • 循环接收消息:线程进入循环,持续从客户端读取消息,直到客户端断开连接(recv_size=0)或出错。
  • 资源释放:客户端断开后,关闭通信套接字,线程自动结束。

3. 服务器核心技术点

3.1 多线程并发处理
  • 每个客户端连接对应一个独立线程,线程间通过client_sock区分,互不干扰,实现“同时处理多个客户端”。
  • 对比单线程服务器:单线程需处理完一个客户端才能接受下一个,多线程服务器在主线程接受连接后,立即将处理逻辑交给子线程,主线程继续接受新连接,显著提升并发能力。
3.2 线程管理与安全
  • 线程分离(detach()):避免主线程调用join()等待子线程,否则主线程会被阻塞,无法接受新连接。但detach()后主线程无法控制子线程,需确保子线程能正常结束。
  • 原子变量std::atomic<bool> running保证多线程环境下对running的读写操作是原子的(无数据竞争),后续可通过设置running=false优雅关闭服务器(需配合信号处理)。
3.3 潜在问题与优化方向
  • 线程数量无限制:频繁创建线程会消耗系统资源(内存、CPU),高并发下可能导致服务器性能下降。实际开发中应使用线程池(预先创建固定数量线程,循环处理连接)。
  • 输出线程不安全:多个线程同时调用std::cout可能导致输出错乱(cout不是线程安全的),需通过std::mutex加锁保护。
  • 未处理粘包:若客户端发送大数据或连续消息,服务器可能无法区分消息边界(需用“长度前缀法”优化)。

二、多连接TCP客户端程序详解

客户端的核心目标是:模拟多个客户端同时连接服务器,每个连接在独立线程中与服务器交互,验证服务器的多客户端处理能力。

1. 核心功能概述

  • 创建指定数量(NUM_CONNECTIONS=5)的客户端连接;
  • 每个连接在独立线程中执行:连接服务器→发送多条消息→接收响应→关闭连接;
  • 主线程等待所有连接线程完成后退出。

2. 关键代码解析

2.1 多连接线程创建
const int NUM_CONNECTIONS = 5;  // 同时创建5个连接
std::vector<std::thread> connection_threads;

// 为每个连接创建线程
for (int i = 0; i < NUM_CONNECTIONS; ++i) {
    connection_threads.emplace_back(handleConnection, i+1);  // 传入连接ID(1~5)
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 错开连接时间
}

// 等待所有线程完成
for (auto& t : connection_threads) {
    if (t.joinable()) t.join();
}
  • std::vector<std::thread>存储所有连接线程,主线程通过join()等待所有线程执行完毕,确保所有连接都完成交互。
2.2 单个连接处理函数(handleConnection)

每个线程执行的核心逻辑,模拟客户端与服务器的交互:

void handleConnection(int conn_id) {
    // 1. 创建客户端套接字(与基础客户端一致)
    int client_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (client_sock == -1) { /* 错误处理 */ }

    // 2. 配置服务器地址并连接
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr);  // IP转换(现代用法)
    
    if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        /* 错误处理 */
    }

    // 3. 数据交互:发送3条消息并接收响应
    for (int i = 0; i < 3; ++i) {
        std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
        send(client_sock, msg.c_str(), msg.size(), 0);  // 发送消息
        
        char buffer[1024] = {0};
        recv(client_sock, buffer, sizeof(buffer)-1, 0);  // 接收响应
        std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;
        
        std::this_thread::sleep_for(std::chrono::milliseconds(500));  // 模拟延迟
    }

    // 4. 关闭连接
    close(client_sock);
}

3. 客户端核心技术点

3.1 多连接模拟

通过创建多个线程,每个线程模拟一个客户端,验证服务器能否同时处理多个连接。线程间通过conn_id区分,方便观察输出。

3.2 连接控制
  • 连接创建时添加小延迟(sleep_for(100ms)),避免所有连接同时发起,减轻服务器瞬间压力(模拟真实场景中客户端陆续连接)。
  • 主线程通过join()等待所有连接线程完成,确保程序正常退出。

三、程序运行流程与预期输出

1. 运行步骤

  1. 编译服务器和客户端:
    g++ server.cpp -o server -lpthread  # 链接线程库
    g++ client.cpp -o client -lpthread
    
  2. 启动服务器:
    ./server  # 输出:服务器启动成功,监听在 0.0.0.0:9888
    
  3. 启动客户端:
    ./client  # 输出:多连接TCP客户端启动,将创建5个连接
    

2. 预期输出

服务器输出
服务器启动成功,监听在 0.0.0.0:9888
新客户端连接: 127.0.0.1:54321
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 1
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 2
新客户端连接: 127.0.0.1:54322
从 127.0.0.1:54322 接收: 客户端连接 2 的消息 1
...(其他客户端连接和消息)
客户端 127.0.0.1:54321 断开连接
...(所有客户端处理完毕后断开)
客户端输出
多连接TCP客户端启动,将创建 5 个连接
连接 1 成功连接到服务器 127.0.0.1:9888
连接 1 发送: 客户端连接 1 的消息 1
连接 1 收到响应: 服务器已收到: 客户端连接 1 的消息 1
连接 2 成功连接到服务器 127.0.0.1:9888
...(其他连接的发送和响应)
连接 1 已关闭
...(所有连接关闭后)
所有连接已处理完成,客户端退出

四、总结与扩展

这个示例通过多线程实现了TCP服务器的多客户端并发处理,核心亮点是:

  • 服务器用线程分离实现“接受连接”与“处理消息”的并发;
  • 客户端用多线程模拟真实多用户场景;
  • 包含实用技术(端口复用、原子变量、线程管理)。

扩展优化方向

  1. 线程池替代多线程:避免频繁创建销毁线程,用固定数量线程循环处理连接,提升性能。
  2. 添加粘包处理:服务器和客户端用“长度前缀法”解析消息边界。
  3. 优雅关闭机制:通过信号(如SIGINT)捕获Ctrl+C,设置running=false,关闭监听套接字并等待线程结束。
  4. 线程安全输出:用std::mutex保护std::cout,避免多线程输出错乱。

掌握这些内容后,可进一步学习IO复用(select/poll/epoll)等更高效的并发模型,应对高并发场景。

补充:优化后的多线程TCP客户端代码及完整讲解

针对原代码的不足(线程管理低效、输出错乱、无粘包处理、无优雅关闭等),以下是优化后的代码及详细讲解,重点优化了线程池管理、线程安全、粘包处理、优雅关闭四大核心问题。

一、优化后的服务器代码(server.cpp)

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <errno.h>

// 线程安全的输出工具
std::mutex g_cout_mutex;
#define LOG(msg) do { \
    std::lock_guard<std::mutex> lock(g_cout_mutex); \
    std::cout << msg << std::endl; \
} while(0)

// 全局运行标志(原子变量确保线程安全)
std::atomic<bool> g_running(true);
// 监听套接字(全局以便信号处理函数关闭)
int g_listen_sock = -1;

// 线程池类:预先创建固定数量线程,循环处理任务
class ThreadPool {
private:
    std::vector<std::thread> workers;       // 工作线程
    std::queue<std::function<void()>> tasks;// 任务队列
    std::mutex queue_mutex;                 // 保护任务队列的互斥锁
    std::condition_variable condition;      // 唤醒线程的条件变量
    bool stop;                              // 线程池停止标志

public:
    // 构造函数:创建n个工作线程
    ThreadPool(size_t n) : stop(false) {
        for (size_t i = 0; i < n; ++i) {
            workers.emplace_back([this] {
                while (!stop) {
                    std::function<void()> task;
                    // 从队列取任务(加锁)
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // 等待任务或停止信号
                        this->condition.wait(lock, [this] { 
                            return this->stop || !this->tasks.empty(); 
                        });
                        if (this->stop && this->tasks.empty()) return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // 执行任务
                    task();
                }
            });
        }
    }

    // 析构函数:停止所有线程
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();  // 唤醒所有等待的线程
        for (std::thread& worker : workers) {
            if (worker.joinable()) {
                worker.join();  // 等待线程结束
            }
        }
    }

    // 添加任务到队列
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace(std::move(task));
        }
        condition.notify_one();  // 唤醒一个等待的线程
    }
};

// 粘包处理:发送数据(先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {
    // 计算数据长度(转换为网络字节序)
    uint32_t data_len = htonl(data.size());
    // 先发送长度
    if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {
        LOG("发送长度失败: " << strerror(errno));
        return false;
    }
    // 再发送数据
    if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {
        LOG("发送数据失败: " << strerror(errno));
        return false;
    }
    return true;
}

// 粘包处理:接收数据(先接收4字节长度,再接收对应长度的数据)
std::string recvData(int sock) {
    // 先接收长度
    uint32_t data_len;
    ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);
    if (recv_len <= 0) {
        if (recv_len < 0) LOG("接收长度失败: " << strerror(errno));
        return "";  // 连接断开或错误
    }
    data_len = ntohl(data_len);  // 转换为主机字节序

    // 接收数据
    std::string data;
    data.resize(data_len);
    size_t total_recv = 0;
    while (total_recv < data_len) {
        recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);
        if (recv_len <= 0) {
            if (recv_len < 0) LOG("接收数据失败: " << strerror(errno));
            return "";  // 连接断开或错误
        }
        total_recv += recv_len;
    }
    return data;
}

// 处理客户端连接的任务函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {
    LOG("新客户端连接: " << client_ip << ":" << client_port);

    while (g_running) {
        // 接收客户端消息(带粘包处理)
        std::string recv_msg = recvData(client_sock);
        if (recv_msg.empty()) {
            LOG("客户端 " << client_ip << ":" << client_port << " 断开连接");
            break;
        }

        LOG("从 " << client_ip << ":" << client_port << " 接收: " << recv_msg);

        // 发送响应(带粘包处理)
        std::string response = "服务器已收到: " + recv_msg;
        if (!sendData(client_sock, response)) {
            break;
        }
    }

    // 关闭客户端套接字
    close(client_sock);
    LOG("客户端 " << client_ip << ":" << client_port << " 连接已关闭");
}

// 信号处理函数:捕获Ctrl+C,设置优雅关闭标志
void signalHandler(int signum) {
    if (signum == SIGINT) {
        LOG("\n收到停止信号,正在优雅关闭服务器...");
        g_running = false;
        // 关闭监听套接字,唤醒accept阻塞
        if (g_listen_sock != -1) {
            close(g_listen_sock);
        }
    }
}

int main() {
    // 注册信号处理函数(捕获Ctrl+C)
    signal(SIGINT, signalHandler);

    // 创建监听套接字
    g_listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (g_listen_sock == -1) {
        LOG("创建套接字失败: " << strerror(errno));
        return 1;
    }

    // 设置端口复用
    int opt = 1;
    if (setsockopt(g_listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        LOG("设置端口复用失败: " << strerror(errno));
        close(g_listen_sock);
        return 1;
    }

    // 绑定IP和端口
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(9888);

    if (bind(g_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        LOG("绑定失败: " << strerror(errno));
        close(g_listen_sock);
        return 1;
    }

    // 开始监听
    if (listen(g_listen_sock, 5) == -1) {
        LOG("监听失败: " << strerror(errno));
        close(g_listen_sock);
        return 1;
    }

    LOG("服务器启动成功,监听在 0.0.0.0:9888(线程池大小: 4)");

    // 创建线程池(4个工作线程)
    ThreadPool thread_pool(4);

    // 主循环接受客户端连接
    while (g_running) {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
        // accept可能被信号中断,需要处理EINTR错误
        int client_sock = accept(g_listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);
        if (client_sock == -1) {
            if (g_running && errno != EINTR) {  // EINTR是正常中断(信号导致)
                LOG("接受连接失败: " << strerror(errno));
            }
            continue;
        }

        // 获取客户端IP和端口
        std::string client_ip = inet_ntoa(client_addr.sin_addr);
        int client_port = ntohs(client_addr.sin_port);

        // 将客户端处理任务添加到线程池
        thread_pool.enqueue([client_sock, client_ip, client_port]() {
            handleClient(client_sock, client_ip, client_port);
        });
    }

    // 关闭监听套接字
    close(g_listen_sock);
    LOG("服务器已停止监听");

    // 线程池析构时会等待所有任务完成
    LOG("服务器已优雅关闭");

    return 0;
}

二、优化后的客户端代码(client.cpp)

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <cstdint>

// 线程安全的输出工具(避免多线程输出错乱)
std::mutex g_cout_mutex;
#define LOG(msg) do { \
    std::lock_guard<std::mutex> lock(g_cout_mutex); \
    std::cout << msg << std::endl; \
} while(0)

// 服务器配置
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5;  // 同时建立的连接数量

// 粘包处理:发送数据(与服务器一致,先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {
    uint32_t data_len = htonl(data.size());  // 长度转为网络字节序
    // 先发送长度
    if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {
        LOG("连接发送长度失败: " << strerror(errno));
        return false;
    }
    // 再发送数据
    if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {
        LOG("连接发送数据失败: " << strerror(errno));
        return false;
    }
    return true;
}

// 粘包处理:接收数据(与服务器一致,先接收长度,再接收数据)
std::string recvData(int sock) {
    uint32_t data_len;
    ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);
    if (recv_len <= 0) {
        if (recv_len < 0) LOG("连接接收长度失败: " << strerror(errno));
        return "";  // 连接断开或错误
    }
    data_len = ntohl(data_len);  // 转为主机字节序

    std::string data;
    data.resize(data_len);
    size_t total_recv = 0;
    // 循环接收完整数据(处理数据分片)
    while (total_recv < data_len) {
        recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);
        if (recv_len <= 0) {
            if (recv_len < 0) LOG("连接接收数据失败: " << strerror(errno));
            return "";
        }
        total_recv += recv_len;
    }
    return data;
}

// 单个连接的处理函数
void handleConnection(int conn_id) {
    // 创建客户端套接字
    int client_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (client_sock == -1) {
        LOG("连接 " << conn_id << " 创建套接字失败: " << strerror(errno));
        return;
    }

    // 配置服务器地址
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    // 转换服务器IP为网络字节序(现代用法,支持IPv6扩展)
    if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {
        LOG("连接 " << conn_id << " IP地址转换失败");
        close(client_sock);
        return;
    }

    // 连接服务器
    if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        LOG("连接 " << conn_id << " 连接服务器失败: " << strerror(errno));
        close(client_sock);
        return;
    }
    LOG("连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT);

    try {
        // 数据交互:每个连接发送3条消息
        for (int i = 0; i < 3; ++i) {
            std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
            if (!sendData(client_sock, msg)) {
                break;
            }
            LOG("连接 " << conn_id << " 发送: " << msg);

            // 接收服务器响应
            std::string response = recvData(client_sock);
            if (response.empty()) {
                LOG("连接 " << conn_id << " 接收响应失败或服务器断开");
                break;
            }
            LOG("连接 " << conn_id << " 收到响应: " << response);

            // 模拟业务处理延迟
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    } catch (const std::exception& e) {
        LOG("连接 " << conn_id << " 异常: " << e.what());
    }

    // 关闭连接
    close(client_sock);
    LOG("连接 " << conn_id << " 已关闭");
}

int main() {
    LOG("多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接");

    std::vector<std::thread> connection_threads;

    // 创建多个连接(每个连接一个线程)
    for (int i = 0; i < NUM_CONNECTIONS; ++i) {
        connection_threads.emplace_back(handleConnection, i + 1);  // 连接ID从1开始
        // 错开连接创建时间,避免服务器瞬间压力过大
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    // 等待所有连接线程完成
    for (auto& t : connection_threads) {
        if (t.joinable()) {
            t.join();
        }
    }

    LOG("所有连接已处理完成,客户端退出");
    return 0;
}

三、核心优化点详解(服务器+客户端)

1. 线程管理优化:从“动态创建线程”到“线程池”

原代码问题:

原服务器每接一个连接就动态创建线程,线程创建/销毁开销大,高并发下系统资源易耗尽(线程栈内存、CPU调度压力)。

优化方案:

服务器引入线程池ThreadPool类),预先创建固定数量线程(示例中4个),通过任务队列循环处理客户端连接,避免频繁创建线程:

// 线程池核心逻辑
class ThreadPool {
private:
    std::vector<std::thread> workers;  // 固定数量的工作线程
    std::queue<std::function<void()>> tasks;  // 任务队列
    std::mutex queue_mutex;  // 保护任务队列的互斥锁
    std::condition_variable condition;  // 唤醒线程的条件变量
    bool stop;  // 停止标志
public:
    ThreadPool(size_t n) : stop(false) {
        // 预先创建n个工作线程,循环取任务执行
        for (size_t i = 0; i < n; ++i) {
            workers.emplace_back([this] {
                while (!stop) {
                    std::function<void()> task;
                    // 加锁取任务
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();  // 执行任务(客户端处理逻辑)
                }
            });
        }
    }
    // 添加任务到队列
    void enqueue(std::function<void()> task) { /* ... */ }
};
优势:
  • 线程复用:工作线程长期存在,循环处理任务,减少线程创建/销毁开销;
  • 资源控制:通过线程池大小(如4个)限制并发线程数,避免资源耗尽。

2. 线程安全优化:解决多线程输出错乱

原代码问题:

多个线程同时调用std::cout,导致输出内容错乱(cout非线程安全,多个线程输出可能交叉)。

优化方案:

使用std::mutex为输出加锁,定义线程安全的LOG宏:

std::mutex g_cout_mutex;  // 全局输出互斥锁
#define LOG(msg) do { \
    std::lock_guard<std::mutex> lock(g_cout_mutex);  // 自动加锁/解锁 \
    std::cout << msg << std::endl; \
} while(0)
效果:

所有线程的输出操作通过互斥锁串行执行,确保日志清晰可读,无交叉错乱。

3. 粘包处理:解决TCP数据边界问题

原代码问题:

TCP是流式传输,若客户端连续发送小消息,服务器可能将多个消息合并为一个“大数据块”,无法区分边界(粘包)。

优化方案:

采用“长度前缀法”明确消息边界,发送数据时先传4字节长度(网络字节序),再传实际数据:

// 服务器/客户端发送数据(统一逻辑)
bool sendData(int sock, const std::string& data) {
    uint32_t data_len = htonl(data.size());  // 长度转为网络字节序
    send(sock, &data_len, sizeof(data_len), 0);  // 先发送长度
    send(sock, data.c_str(), data.size(), 0);    // 再发送数据
    return true;
}

// 服务器/客户端接收数据(统一逻辑)
std::string recvData(int sock) {
    uint32_t data_len;
    recv(sock, &data_len, sizeof(data_len), 0);  // 先接收长度
    data_len = ntohl(data_len);  // 转为主机字节序
    std::string data(data_len, '\0');
    recv(sock, &data[0], data_len, 0);  // 接收对应长度的数据
    return data;
}
效果:

无论消息大小或发送频率如何,接收方都能通过“先读长度,再读数据”准确解析每个消息,彻底解决粘包问题。

4. 优雅关闭:确保资源正确释放

原代码问题:

原服务器若通过Ctrl+C强制终止,可能导致线程未完成、套接字未关闭,资源泄漏。

优化方案:
  1. 信号处理:捕获SIGINT信号(Ctrl+C),设置g_running=false标志;
  2. 唤醒阻塞:关闭监听套接字,唤醒accept阻塞;
  3. 线程池等待:线程池析构时等待所有工作线程完成当前任务:
// 信号处理函数
void signalHandler(int signum) {
    if (signum == SIGINT) {
        LOG("\n收到停止信号,正在优雅关闭服务器...");
        g_running = false;  // 设置全局停止标志
        close(g_listen_sock);  // 唤醒accept阻塞
    }
}

// main函数中注册信号处理
signal(SIGINT, signalHandler);

// 线程池析构时自动等待所有线程完成
~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;  // 通知线程停止
    }
    condition.notify_all();  // 唤醒所有工作线程
    for (auto& worker : workers) worker.join();  // 等待线程结束
}
效果:

服务器收到停止信号后,会完成当前客户端的消息处理,关闭所有套接字,释放线程资源,避免泄漏。

5. 错误处理与鲁棒性优化

原代码问题:

对系统调用错误(如accept被信号中断、send/recv失败)处理简单,可能导致程序异常退出。

优化方案:
  1. 处理EINTR错误accept等系统调用可能被信号中断(返回-1errno=EINTR),这是正常情况,应忽略而非报错;
  2. 完整的发送/接收循环recv可能只接收部分数据(尤其大数据),通过循环确保接收完整;
  3. 资源释放兜底:所有套接字在close前检查有效性,线程任务结束后确保关闭通信套接字。

三、运行效果与总结

运行步骤:

  1. 编译(需链接线程库):
    g++ server.cpp -o server -lpthread
    g++ client.cpp -o client -lpthread
    
  2. 启动服务器,再启动客户端,可观察到:
    • 服务器日志清晰,无输出错乱;
    • 多个客户端连接同时被处理,无阻塞;
    • 消息收发准确,无粘包;
    • Ctrl+C停止服务器时,会优雅关闭,无资源泄漏。

核心优化价值:

  • 性能:线程池减少线程开销,支持更高并发;
  • 可靠性:粘包处理确保数据正确解析,错误处理避免异常退出;
  • 可维护性:模块化设计(线程池、粘包工具),逻辑清晰;
  • 安全性:线程安全输出、优雅关闭机制确保资源正确管理。

这些优化使代码从“演示级”提升为“生产可用级”,可作为多线程TCP服务器的基础框架扩展。


网站公告

今日签到

点亮在社区的每一天
去签到