基于多线程的TCP多客户端通信示例详解
这个示例实现了一个能同时处理多个客户端连接的TCP服务器,以及一个能创建多个连接的客户端程序。核心通过C++11的std::thread
库实现多线程并发处理,下面分服务器和客户端两部分详细讲解。
先附上完整代码:
multi_conn_threads_server.cpp
#include <iostream>
#include <sys/socket.h> // 套接字API
#include <netinet/in.h> //
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic> // 原子操作(线程安全标志)
#include <functional> // 函数对象支持
// 处理客户端连接的函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {
std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;
char buffer[1024];
while (true) {
// 接收客户端消息
memset(buffer, 0, sizeof(buffer));
int recv_size = read(client_sock, buffer, sizeof(buffer));
if (recv_size <= 0) {
// 客户端断开连接或出错
if (recv_size == 0) {
std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;
} else {
std::cerr << "接收数据出错: " << strerror(errno) << std::endl;
}
break;
}
std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;
// 发送响应
std::string response = "服务器已收到: " + std::string(buffer);
write(client_sock, response.c_str(), response.size());
}
// 关闭客户端套接字
close(client_sock);
}
int main() {
// 创建监听套接字
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock == -1) {
std::cerr << "创建套接字失败: " << strerror(errno) << std::endl;
return 1;
}
// 设置端口复用,避免程序重启时端口被占用
int opt = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
std::cerr << "设置套接字选项失败: " << strerror(errno) << std::endl;
close(listen_sock);
return 1;
}
// 绑定IP地址和端口
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有IP
server_addr.sin_port = htons(9888); // 端口号
if (bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
std::cerr << "绑定失败: " << strerror(errno) << std::endl;
close(listen_sock);
return 1;
}
// 开始监听
if (listen(listen_sock, 5) == -1) {
std::cerr << "监听失败: " << strerror(errno) << std::endl;
close(listen_sock);
return 1;
}
std::cout << "服务器启动成功,监听在 0.0.0.0:9888" << std::endl;
std::vector<std::thread> client_threads;
std::atomic<bool> running(true);
// 主循环接受客户端连接
while (running) {
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);
if (client_sock == -1) {
if (running) { // 只有在运行中时才视为错误
std::cerr << "接受连接失败: " << strerror(errno) << std::endl;
}
continue;
}
// 获取客户端IP和端口
std::string client_ip = inet_ntoa(client_addr.sin_addr);
int client_port = ntohs(client_addr.sin_port);
// 创建新线程处理客户端连接
client_threads.emplace_back(
handleClient,
client_sock,
client_ip,
client_port
);
// 分离线程,避免主线程等待
client_threads.back().detach();
// 限制同时存在的线程数量(可选)
if (client_threads.size() > 100) {
client_threads.erase(client_threads.begin());
}
}
// 关闭监听套接字
close(listen_sock);
std::cout << "服务器已停止" << std::endl;
return 0;
}
multi_conn_threads_client.cpp
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
// 服务器地址和端口(需与服务器保持一致)
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量
// 单个连接的处理函数
void handleConnection(int conn_id) {
// 1. 创建客户端套接字
int client_sock = socket(AF_INET, SOCK_STREAM, 0);
if (client_sock == -1) {
std::cerr << "连接 " << conn_id << " 创建套接字失败: " << strerror(errno) << std::endl;
return;
}
// 2. 配置服务器地址
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
// 将服务器IP从字符串转换为网络字节序
if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {
std::cerr << "连接 " << conn_id << " IP地址转换失败" << std::endl;
close(client_sock);
return;
}
// 3. 连接服务器
if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
std::cerr << "连接 " << conn_id << " 连接服务器失败: " << strerror(errno) << std::endl;
close(client_sock);
return;
}
std::cout << "连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT << std::endl;
try {
// 4. 数据交互(每个连接发送3条消息)
for (int i = 0; i < 3; ++i) {
// 发送消息(包含连接ID和序号,方便服务器区分)
std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
ssize_t send_size = send(client_sock, msg.c_str(), msg.size(), 0);
if (send_size == -1) {
std::cerr << "连接 " << conn_id << " 发送消息失败: " << strerror(errno) << std::endl;
break;
}
std::cout << "连接 " << conn_id << " 发送: " << msg << std::endl;
// 接收服务器响应
char buffer[1024] = {0};
ssize_t recv_size = recv(client_sock, buffer, sizeof(buffer)-1, 0);
if (recv_size <= 0) {
if (recv_size == 0) {
std::cout << "连接 " << conn_id << " 服务器已断开" << std::endl;
} else {
std::cerr << "连接 " << conn_id << " 接收响应失败: " << strerror(errno) << std::endl;
}
break;
}
std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;
// 模拟间隔,避免消息发送过快
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
} catch (const std::exception& e) {
std::cerr << "连接 " << conn_id << " 异常: " << e.what() << std::endl;
}
// 5. 关闭连接
close(client_sock);
std::cout << "连接 " << conn_id << " 已关闭" << std::endl;
}
int main() {
std::cout << "多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接" << std::endl;
// 存储所有连接的线程
std::vector<std::thread> connection_threads;
// 创建多个连接(每个连接一个线程)
for (int i = 0; i < NUM_CONNECTIONS; ++i) {
connection_threads.emplace_back(handleConnection, i+1); // 连接ID从1开始
// 稍微延迟,避免同时建立连接导致服务器压力集中
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 等待所有连接线程完成
for (auto& t : connection_threads) {
if (t.joinable()) {
t.join();
}
}
std::cout << "所有连接已处理完成,客户端退出" << std::endl;
return 0;
}
一、多线程TCP服务器程序详解
服务器的核心目标是:同时接受并处理多个客户端的连接请求,每个客户端的通信逻辑在独立线程中执行,避免单线程阻塞导致其他客户端无法连接。
1. 核心功能概述
- 启动服务器并监听指定端口(9888);
- 主循环持续接受新的客户端连接;
- 每接收到一个客户端连接,创建独立线程处理该客户端的消息收发;
- 支持客户端正常断开或异常断开的处理;
- 避免端口占用问题(通过
SO_REUSEADDR
选项)。
2. 关键代码解析
2.1 初始化与监听套接字
// 创建监听套接字(TCP类型)
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
// 设置端口复用(解决服务器重启时端口被TIME_WAIT状态占用的问题)
int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 绑定IP和端口(与基础示例一致)
struct sockaddr_in server_addr;
// ... 填充地址信息(INADDR_ANY绑定所有IP,端口9888)
bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr));
// 开始监听(backlog=5,表示未完成连接队列最大长度)
listen(listen_sock, 5);
- 端口复用(SO_REUSEADDR):服务器关闭后,端口可能处于
TIME_WAIT
状态(确保最后一个数据包被接收),此时直接重启服务器会绑定失败。setsockopt
设置该选项后,允许端口在TIME_WAIT
状态下被重新绑定,方便开发调试。
2.2 主循环:接受客户端连接并创建线程
std::vector<std::thread> client_threads; // 存储客户端线程
std::atomic<bool> running(true); // 原子变量:服务器运行标志(线程安全)
while (running) {
// 接受客户端连接(阻塞等待新连接)
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);
if (client_sock == -1) {
if (running) std::cerr << "接受连接失败" << std::endl;
continue;
}
// 获取客户端IP和端口(转换为可读性字符串)
std::string client_ip = inet_ntoa(client_addr.sin_addr); // 二进制IP→点分十进制
int client_port = ntohs(client_addr.sin_port); // 网络字节序→主机字节序
// 创建线程处理该客户端,传入通信套接字、IP、端口
client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);
// 分离线程:线程独立运行,结束后自动释放资源(无需主线程join)
client_threads.back().detach();
// 可选:限制线程容器大小,避免内存占用过高
if (client_threads.size() > 100) {
client_threads.erase(client_threads.begin());
}
}
- 线程分离(detach()):服务器不需要等待每个客户端线程结束(否则主线程会被阻塞),通过
detach()
让线程在后台独立运行,完成后自动回收资源。 - 原子变量
running
:用于线程安全地控制服务器是否继续运行(后续可通过信号处理设置running=false
实现优雅关闭)。
2.3 客户端处理函数(handleClient)
每个客户端连接的核心逻辑,在独立线程中执行:
void handleClient(int client_sock, const std::string& client_ip, int client_port) {
std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;
char buffer[1024];
while (true) {
// 接收客户端消息
memset(buffer, 0, sizeof(buffer));
int recv_size = read(client_sock, buffer, sizeof(buffer));
if (recv_size <= 0) {
// 客户端断开(recv_size=0)或出错(recv_size=-1)
std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;
break;
}
// 打印消息并回复
std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;
std::string response = "服务器已收到: " + std::string(buffer);
write(client_sock, response.c_str(), response.size());
}
close(client_sock); // 关闭通信套接字
}
- 循环接收消息:线程进入循环,持续从客户端读取消息,直到客户端断开连接(
recv_size=0
)或出错。 - 资源释放:客户端断开后,关闭通信套接字,线程自动结束。
3. 服务器核心技术点
3.1 多线程并发处理
- 每个客户端连接对应一个独立线程,线程间通过
client_sock
区分,互不干扰,实现“同时处理多个客户端”。 - 对比单线程服务器:单线程需处理完一个客户端才能接受下一个,多线程服务器在主线程接受连接后,立即将处理逻辑交给子线程,主线程继续接受新连接,显著提升并发能力。
3.2 线程管理与安全
- 线程分离(detach()):避免主线程调用
join()
等待子线程,否则主线程会被阻塞,无法接受新连接。但detach()
后主线程无法控制子线程,需确保子线程能正常结束。 - 原子变量:
std::atomic<bool> running
保证多线程环境下对running
的读写操作是原子的(无数据竞争),后续可通过设置running=false
优雅关闭服务器(需配合信号处理)。
3.3 潜在问题与优化方向
- 线程数量无限制:频繁创建线程会消耗系统资源(内存、CPU),高并发下可能导致服务器性能下降。实际开发中应使用线程池(预先创建固定数量线程,循环处理连接)。
- 输出线程不安全:多个线程同时调用
std::cout
可能导致输出错乱(cout
不是线程安全的),需通过std::mutex
加锁保护。 - 未处理粘包:若客户端发送大数据或连续消息,服务器可能无法区分消息边界(需用“长度前缀法”优化)。
二、多连接TCP客户端程序详解
客户端的核心目标是:模拟多个客户端同时连接服务器,每个连接在独立线程中与服务器交互,验证服务器的多客户端处理能力。
1. 核心功能概述
- 创建指定数量(
NUM_CONNECTIONS=5
)的客户端连接; - 每个连接在独立线程中执行:连接服务器→发送多条消息→接收响应→关闭连接;
- 主线程等待所有连接线程完成后退出。
2. 关键代码解析
2.1 多连接线程创建
const int NUM_CONNECTIONS = 5; // 同时创建5个连接
std::vector<std::thread> connection_threads;
// 为每个连接创建线程
for (int i = 0; i < NUM_CONNECTIONS; ++i) {
connection_threads.emplace_back(handleConnection, i+1); // 传入连接ID(1~5)
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 错开连接时间
}
// 等待所有线程完成
for (auto& t : connection_threads) {
if (t.joinable()) t.join();
}
- 用
std::vector<std::thread>
存储所有连接线程,主线程通过join()
等待所有线程执行完毕,确保所有连接都完成交互。
2.2 单个连接处理函数(handleConnection)
每个线程执行的核心逻辑,模拟客户端与服务器的交互:
void handleConnection(int conn_id) {
// 1. 创建客户端套接字(与基础客户端一致)
int client_sock = socket(AF_INET, SOCK_STREAM, 0);
if (client_sock == -1) { /* 错误处理 */ }
// 2. 配置服务器地址并连接
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr); // IP转换(现代用法)
if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
/* 错误处理 */
}
// 3. 数据交互:发送3条消息并接收响应
for (int i = 0; i < 3; ++i) {
std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
send(client_sock, msg.c_str(), msg.size(), 0); // 发送消息
char buffer[1024] = {0};
recv(client_sock, buffer, sizeof(buffer)-1, 0); // 接收响应
std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟延迟
}
// 4. 关闭连接
close(client_sock);
}
3. 客户端核心技术点
3.1 多连接模拟
通过创建多个线程,每个线程模拟一个客户端,验证服务器能否同时处理多个连接。线程间通过conn_id
区分,方便观察输出。
3.2 连接控制
- 连接创建时添加小延迟(
sleep_for(100ms)
),避免所有连接同时发起,减轻服务器瞬间压力(模拟真实场景中客户端陆续连接)。 - 主线程通过
join()
等待所有连接线程完成,确保程序正常退出。
三、程序运行流程与预期输出
1. 运行步骤
- 编译服务器和客户端:
g++ server.cpp -o server -lpthread # 链接线程库 g++ client.cpp -o client -lpthread
- 启动服务器:
./server # 输出:服务器启动成功,监听在 0.0.0.0:9888
- 启动客户端:
./client # 输出:多连接TCP客户端启动,将创建5个连接
2. 预期输出
服务器输出
服务器启动成功,监听在 0.0.0.0:9888
新客户端连接: 127.0.0.1:54321
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 1
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 2
新客户端连接: 127.0.0.1:54322
从 127.0.0.1:54322 接收: 客户端连接 2 的消息 1
...(其他客户端连接和消息)
客户端 127.0.0.1:54321 断开连接
...(所有客户端处理完毕后断开)
客户端输出
多连接TCP客户端启动,将创建 5 个连接
连接 1 成功连接到服务器 127.0.0.1:9888
连接 1 发送: 客户端连接 1 的消息 1
连接 1 收到响应: 服务器已收到: 客户端连接 1 的消息 1
连接 2 成功连接到服务器 127.0.0.1:9888
...(其他连接的发送和响应)
连接 1 已关闭
...(所有连接关闭后)
所有连接已处理完成,客户端退出
四、总结与扩展
这个示例通过多线程实现了TCP服务器的多客户端并发处理,核心亮点是:
- 服务器用线程分离实现“接受连接”与“处理消息”的并发;
- 客户端用多线程模拟真实多用户场景;
- 包含实用技术(端口复用、原子变量、线程管理)。
扩展优化方向
- 线程池替代多线程:避免频繁创建销毁线程,用固定数量线程循环处理连接,提升性能。
- 添加粘包处理:服务器和客户端用“长度前缀法”解析消息边界。
- 优雅关闭机制:通过信号(如
SIGINT
)捕获Ctrl+C
,设置running=false
,关闭监听套接字并等待线程结束。 - 线程安全输出:用
std::mutex
保护std::cout
,避免多线程输出错乱。
掌握这些内容后,可进一步学习IO复用(select/poll/epoll
)等更高效的并发模型,应对高并发场景。
补充:优化后的多线程TCP客户端代码及完整讲解
针对原代码的不足(线程管理低效、输出错乱、无粘包处理、无优雅关闭等),以下是优化后的代码及详细讲解,重点优化了线程池管理、线程安全、粘包处理、优雅关闭四大核心问题。
一、优化后的服务器代码(server.cpp)
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <errno.h>
// 线程安全的输出工具
std::mutex g_cout_mutex;
#define LOG(msg) do { \
std::lock_guard<std::mutex> lock(g_cout_mutex); \
std::cout << msg << std::endl; \
} while(0)
// 全局运行标志(原子变量确保线程安全)
std::atomic<bool> g_running(true);
// 监听套接字(全局以便信号处理函数关闭)
int g_listen_sock = -1;
// 线程池类:预先创建固定数量线程,循环处理任务
class ThreadPool {
private:
std::vector<std::thread> workers; // 工作线程
std::queue<std::function<void()>> tasks;// 任务队列
std::mutex queue_mutex; // 保护任务队列的互斥锁
std::condition_variable condition; // 唤醒线程的条件变量
bool stop; // 线程池停止标志
public:
// 构造函数:创建n个工作线程
ThreadPool(size_t n) : stop(false) {
for (size_t i = 0; i < n; ++i) {
workers.emplace_back([this] {
while (!stop) {
std::function<void()> task;
// 从队列取任务(加锁)
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待任务或停止信号
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务
task();
}
});
}
}
// 析构函数:停止所有线程
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all(); // 唤醒所有等待的线程
for (std::thread& worker : workers) {
if (worker.joinable()) {
worker.join(); // 等待线程结束
}
}
}
// 添加任务到队列
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(std::move(task));
}
condition.notify_one(); // 唤醒一个等待的线程
}
};
// 粘包处理:发送数据(先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {
// 计算数据长度(转换为网络字节序)
uint32_t data_len = htonl(data.size());
// 先发送长度
if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {
LOG("发送长度失败: " << strerror(errno));
return false;
}
// 再发送数据
if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {
LOG("发送数据失败: " << strerror(errno));
return false;
}
return true;
}
// 粘包处理:接收数据(先接收4字节长度,再接收对应长度的数据)
std::string recvData(int sock) {
// 先接收长度
uint32_t data_len;
ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);
if (recv_len <= 0) {
if (recv_len < 0) LOG("接收长度失败: " << strerror(errno));
return ""; // 连接断开或错误
}
data_len = ntohl(data_len); // 转换为主机字节序
// 接收数据
std::string data;
data.resize(data_len);
size_t total_recv = 0;
while (total_recv < data_len) {
recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);
if (recv_len <= 0) {
if (recv_len < 0) LOG("接收数据失败: " << strerror(errno));
return ""; // 连接断开或错误
}
total_recv += recv_len;
}
return data;
}
// 处理客户端连接的任务函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {
LOG("新客户端连接: " << client_ip << ":" << client_port);
while (g_running) {
// 接收客户端消息(带粘包处理)
std::string recv_msg = recvData(client_sock);
if (recv_msg.empty()) {
LOG("客户端 " << client_ip << ":" << client_port << " 断开连接");
break;
}
LOG("从 " << client_ip << ":" << client_port << " 接收: " << recv_msg);
// 发送响应(带粘包处理)
std::string response = "服务器已收到: " + recv_msg;
if (!sendData(client_sock, response)) {
break;
}
}
// 关闭客户端套接字
close(client_sock);
LOG("客户端 " << client_ip << ":" << client_port << " 连接已关闭");
}
// 信号处理函数:捕获Ctrl+C,设置优雅关闭标志
void signalHandler(int signum) {
if (signum == SIGINT) {
LOG("\n收到停止信号,正在优雅关闭服务器...");
g_running = false;
// 关闭监听套接字,唤醒accept阻塞
if (g_listen_sock != -1) {
close(g_listen_sock);
}
}
}
int main() {
// 注册信号处理函数(捕获Ctrl+C)
signal(SIGINT, signalHandler);
// 创建监听套接字
g_listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (g_listen_sock == -1) {
LOG("创建套接字失败: " << strerror(errno));
return 1;
}
// 设置端口复用
int opt = 1;
if (setsockopt(g_listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
LOG("设置端口复用失败: " << strerror(errno));
close(g_listen_sock);
return 1;
}
// 绑定IP和端口
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(9888);
if (bind(g_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
LOG("绑定失败: " << strerror(errno));
close(g_listen_sock);
return 1;
}
// 开始监听
if (listen(g_listen_sock, 5) == -1) {
LOG("监听失败: " << strerror(errno));
close(g_listen_sock);
return 1;
}
LOG("服务器启动成功,监听在 0.0.0.0:9888(线程池大小: 4)");
// 创建线程池(4个工作线程)
ThreadPool thread_pool(4);
// 主循环接受客户端连接
while (g_running) {
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
// accept可能被信号中断,需要处理EINTR错误
int client_sock = accept(g_listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);
if (client_sock == -1) {
if (g_running && errno != EINTR) { // EINTR是正常中断(信号导致)
LOG("接受连接失败: " << strerror(errno));
}
continue;
}
// 获取客户端IP和端口
std::string client_ip = inet_ntoa(client_addr.sin_addr);
int client_port = ntohs(client_addr.sin_port);
// 将客户端处理任务添加到线程池
thread_pool.enqueue([client_sock, client_ip, client_port]() {
handleClient(client_sock, client_ip, client_port);
});
}
// 关闭监听套接字
close(g_listen_sock);
LOG("服务器已停止监听");
// 线程池析构时会等待所有任务完成
LOG("服务器已优雅关闭");
return 0;
}
二、优化后的客户端代码(client.cpp)
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <cstdint>
// 线程安全的输出工具(避免多线程输出错乱)
std::mutex g_cout_mutex;
#define LOG(msg) do { \
std::lock_guard<std::mutex> lock(g_cout_mutex); \
std::cout << msg << std::endl; \
} while(0)
// 服务器配置
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量
// 粘包处理:发送数据(与服务器一致,先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {
uint32_t data_len = htonl(data.size()); // 长度转为网络字节序
// 先发送长度
if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {
LOG("连接发送长度失败: " << strerror(errno));
return false;
}
// 再发送数据
if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {
LOG("连接发送数据失败: " << strerror(errno));
return false;
}
return true;
}
// 粘包处理:接收数据(与服务器一致,先接收长度,再接收数据)
std::string recvData(int sock) {
uint32_t data_len;
ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);
if (recv_len <= 0) {
if (recv_len < 0) LOG("连接接收长度失败: " << strerror(errno));
return ""; // 连接断开或错误
}
data_len = ntohl(data_len); // 转为主机字节序
std::string data;
data.resize(data_len);
size_t total_recv = 0;
// 循环接收完整数据(处理数据分片)
while (total_recv < data_len) {
recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);
if (recv_len <= 0) {
if (recv_len < 0) LOG("连接接收数据失败: " << strerror(errno));
return "";
}
total_recv += recv_len;
}
return data;
}
// 单个连接的处理函数
void handleConnection(int conn_id) {
// 创建客户端套接字
int client_sock = socket(AF_INET, SOCK_STREAM, 0);
if (client_sock == -1) {
LOG("连接 " << conn_id << " 创建套接字失败: " << strerror(errno));
return;
}
// 配置服务器地址
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
// 转换服务器IP为网络字节序(现代用法,支持IPv6扩展)
if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {
LOG("连接 " << conn_id << " IP地址转换失败");
close(client_sock);
return;
}
// 连接服务器
if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
LOG("连接 " << conn_id << " 连接服务器失败: " << strerror(errno));
close(client_sock);
return;
}
LOG("连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT);
try {
// 数据交互:每个连接发送3条消息
for (int i = 0; i < 3; ++i) {
std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);
if (!sendData(client_sock, msg)) {
break;
}
LOG("连接 " << conn_id << " 发送: " << msg);
// 接收服务器响应
std::string response = recvData(client_sock);
if (response.empty()) {
LOG("连接 " << conn_id << " 接收响应失败或服务器断开");
break;
}
LOG("连接 " << conn_id << " 收到响应: " << response);
// 模拟业务处理延迟
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
} catch (const std::exception& e) {
LOG("连接 " << conn_id << " 异常: " << e.what());
}
// 关闭连接
close(client_sock);
LOG("连接 " << conn_id << " 已关闭");
}
int main() {
LOG("多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接");
std::vector<std::thread> connection_threads;
// 创建多个连接(每个连接一个线程)
for (int i = 0; i < NUM_CONNECTIONS; ++i) {
connection_threads.emplace_back(handleConnection, i + 1); // 连接ID从1开始
// 错开连接创建时间,避免服务器瞬间压力过大
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 等待所有连接线程完成
for (auto& t : connection_threads) {
if (t.joinable()) {
t.join();
}
}
LOG("所有连接已处理完成,客户端退出");
return 0;
}
三、核心优化点详解(服务器+客户端)
1. 线程管理优化:从“动态创建线程”到“线程池”
原代码问题:
原服务器每接一个连接就动态创建线程,线程创建/销毁开销大,高并发下系统资源易耗尽(线程栈内存、CPU调度压力)。
优化方案:
服务器引入线程池(ThreadPool
类),预先创建固定数量线程(示例中4个),通过任务队列循环处理客户端连接,避免频繁创建线程:
// 线程池核心逻辑
class ThreadPool {
private:
std::vector<std::thread> workers; // 固定数量的工作线程
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 保护任务队列的互斥锁
std::condition_variable condition; // 唤醒线程的条件变量
bool stop; // 停止标志
public:
ThreadPool(size_t n) : stop(false) {
// 预先创建n个工作线程,循环取任务执行
for (size_t i = 0; i < n; ++i) {
workers.emplace_back([this] {
while (!stop) {
std::function<void()> task;
// 加锁取任务
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task(); // 执行任务(客户端处理逻辑)
}
});
}
}
// 添加任务到队列
void enqueue(std::function<void()> task) { /* ... */ }
};
优势:
- 线程复用:工作线程长期存在,循环处理任务,减少线程创建/销毁开销;
- 资源控制:通过线程池大小(如4个)限制并发线程数,避免资源耗尽。
2. 线程安全优化:解决多线程输出错乱
原代码问题:
多个线程同时调用std::cout
,导致输出内容错乱(cout
非线程安全,多个线程输出可能交叉)。
优化方案:
使用std::mutex
为输出加锁,定义线程安全的LOG
宏:
std::mutex g_cout_mutex; // 全局输出互斥锁
#define LOG(msg) do { \
std::lock_guard<std::mutex> lock(g_cout_mutex); // 自动加锁/解锁 \
std::cout << msg << std::endl; \
} while(0)
效果:
所有线程的输出操作通过互斥锁串行执行,确保日志清晰可读,无交叉错乱。
3. 粘包处理:解决TCP数据边界问题
原代码问题:
TCP是流式传输,若客户端连续发送小消息,服务器可能将多个消息合并为一个“大数据块”,无法区分边界(粘包)。
优化方案:
采用“长度前缀法”明确消息边界,发送数据时先传4字节长度(网络字节序),再传实际数据:
// 服务器/客户端发送数据(统一逻辑)
bool sendData(int sock, const std::string& data) {
uint32_t data_len = htonl(data.size()); // 长度转为网络字节序
send(sock, &data_len, sizeof(data_len), 0); // 先发送长度
send(sock, data.c_str(), data.size(), 0); // 再发送数据
return true;
}
// 服务器/客户端接收数据(统一逻辑)
std::string recvData(int sock) {
uint32_t data_len;
recv(sock, &data_len, sizeof(data_len), 0); // 先接收长度
data_len = ntohl(data_len); // 转为主机字节序
std::string data(data_len, '\0');
recv(sock, &data[0], data_len, 0); // 接收对应长度的数据
return data;
}
效果:
无论消息大小或发送频率如何,接收方都能通过“先读长度,再读数据”准确解析每个消息,彻底解决粘包问题。
4. 优雅关闭:确保资源正确释放
原代码问题:
原服务器若通过Ctrl+C
强制终止,可能导致线程未完成、套接字未关闭,资源泄漏。
优化方案:
- 信号处理:捕获
SIGINT
信号(Ctrl+C
),设置g_running=false
标志; - 唤醒阻塞:关闭监听套接字,唤醒
accept
阻塞; - 线程池等待:线程池析构时等待所有工作线程完成当前任务:
// 信号处理函数
void signalHandler(int signum) {
if (signum == SIGINT) {
LOG("\n收到停止信号,正在优雅关闭服务器...");
g_running = false; // 设置全局停止标志
close(g_listen_sock); // 唤醒accept阻塞
}
}
// main函数中注册信号处理
signal(SIGINT, signalHandler);
// 线程池析构时自动等待所有线程完成
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true; // 通知线程停止
}
condition.notify_all(); // 唤醒所有工作线程
for (auto& worker : workers) worker.join(); // 等待线程结束
}
效果:
服务器收到停止信号后,会完成当前客户端的消息处理,关闭所有套接字,释放线程资源,避免泄漏。
5. 错误处理与鲁棒性优化
原代码问题:
对系统调用错误(如accept
被信号中断、send/recv
失败)处理简单,可能导致程序异常退出。
优化方案:
- 处理
EINTR
错误:accept
等系统调用可能被信号中断(返回-1
,errno=EINTR
),这是正常情况,应忽略而非报错; - 完整的发送/接收循环:
recv
可能只接收部分数据(尤其大数据),通过循环确保接收完整; - 资源释放兜底:所有套接字在
close
前检查有效性,线程任务结束后确保关闭通信套接字。
三、运行效果与总结
运行步骤:
- 编译(需链接线程库):
g++ server.cpp -o server -lpthread g++ client.cpp -o client -lpthread
- 启动服务器,再启动客户端,可观察到:
- 服务器日志清晰,无输出错乱;
- 多个客户端连接同时被处理,无阻塞;
- 消息收发准确,无粘包;
Ctrl+C
停止服务器时,会优雅关闭,无资源泄漏。
核心优化价值:
- 性能:线程池减少线程开销,支持更高并发;
- 可靠性:粘包处理确保数据正确解析,错误处理避免异常退出;
- 可维护性:模块化设计(线程池、粘包工具),逻辑清晰;
- 安全性:线程安全输出、优雅关闭机制确保资源正确管理。
这些优化使代码从“演示级”提升为“生产可用级”,可作为多线程TCP服务器的基础框架扩展。