目标
实现一个服务器程序,能够同时和多个客户端建立连接。
使用线程池,使用可以复用的线程,避免创建和销毁线程时的资源浪费,优化连接速度。
线程池实现
原理
线程池会提前创建出几个工作线程,
需要执行的任务,会被放入线程池的任务队列中
这几个工作线程会等待条件唤醒(pthread_cond),一旦任务队列中存在任务,那么就会唤醒,同时执行任务队列中元素的函数指针。
另外根据工作负载的不同,线程池会动态的添加会减小线程数量以应对不同情况(为了简化,本篇文章不涉及动态添加和销毁线程)。
实现
- 数据结构
typedef struct ThreadPool{
int max;
int min;
Task* TaskQueue;
int queueFront;
int queueRear;
int queueSize;
int queueCap;
pthread_t* ThreadID;
pthread_t manager_ThreadID;
pthread_mutex_t threadpool_mutex;
pthread_cond_t notFull;
pthread_cond_t notEmpty;
}ThreadPool;
- 任务队列
任务队列实际上是一个环形队列,队列的每个元素包括要执行的函数指针和传入函数的参数指针。
void queueAdd(ThreadPool* threadpool,Task* task){
pthread_mutex_lock(&threadpool->threadpool_mutex);
if(threadpool->queueSize == threadpool->queueCap){
pthread_cond_wait(&threadpool->notFull,&threadpool->threadpool_mutex);
}
threadpool->TaskQueue[threadpool->queueRear].function = task->function;
threadpool->TaskQueue[threadpool->queueRear].arg = task->arg;
threadpool->queueRear = (threadpool->queueRear + 1)%threadpool->queueCap;
threadpool->queueSize++;
pthread_mutex_unlock(&threadpool->threadpool_mutex);
pthread_cond_signal(&threadpool->notEmpty);
}
- 工作线程
void* worker(void* arg){
ThreadPool* threadpool = (ThreadPool*)arg;
while(1){
Task task;
pthread_mutex_lock(&threadpool->threadpool_mutex);
while(threadpool->queueSize == 0){
pthread_cond_wait(&threadpool->notEmpty,&threadpool->threadpool_mutex);
}
task.function = threadpool->TaskQueue[threadpool->queueFront].function;
task.arg = threadpool->TaskQueue[threadpool->queueFront].arg;
threadpool->queueFront = (threadpool->queueFront + 1)%threadpool->queueCap;
threadpool->queueSize--;
pthread_mutex_unlock(&threadpool->threadpool_mutex);
task.function(task.arg);
pthread_cond_signal(&threadpool->notFull);
}
return NULL;
}
socket服务端实现
流程:
- 创建套接字
- 配置属性
- 绑定端口和ip
- 监听端口
- 等待连接
- read/write
初始化
void server_init(Socket_Info* info){
info->lfd=socket(AF_INET, SOCK_STREAM, 0);
if(info->lfd == -1)
{
perror("socket");
exit(0);
}
info->sock = (struct sockaddr*)malloc(sizeof(struct sockaddr_in));
info->sock->sin_family = AF_INET;
info->sock->sin_port = htons(10003); // 大端端坣
info->sock->sin_addr.s_addr = INADDR_ANY;
int ret = bind(info->lfd, info->sock, sizeof(struct sockaddr_in));
if(ret == -1)
{
perror("bind");
exit(0);
}
// 3. 设置监坬
ret = listen(info->lfd, LISTEN_NUM);
if(ret == -1)
{
perror("listen");
exit(0);
}
}
连接和处理
while(1){
SubSocketInfo* subinfo;
for(int i=0; i<MAX_CONNECT; ++i){
if(infos[i].cfd == -1){
subinfo = &infos[i];
}
}
int cfd = accept(socket->lfd, (struct sockaddr*)&subinfo->addr, &clilen);
subinfo->cfd = cfd;
if(cfd == -1)
{
perror("accept");
break;
}
Task task;
task.arg = subinfo;
task.function = server_progress;
queueAdd(pool,&task);
}
void* server_progress(void* arg){
SubSocketInfo* infos_ptr = (SubSocketInfo*)arg;
char ip[24] = {0};
printf("thread id: %ld 客户端的IP地址: %s, 端口: %d\n",
pthread_self(),
inet_ntop(AF_INET, &infos_ptr->addr.sin_addr.s_addr, ip, sizeof(ip)),
ntohs(infos_ptr->addr.sin_port));
while(1){
char buf[1024];
memset(buf, 0, sizeof(buf));
int len = read(infos_ptr->cfd, buf, sizeof(buf));
if(len > 0)
{
printf("thread id: %ld 客户端say: %s\n",pthread_self(), buf);
write(infos_ptr->cfd, buf, len);
}
else if(len == 0)
{
printf("客户端断开了连接...\n");
break;
}
else
{
perror("read");
break;
}
}
close(infos_ptr->cfd);
infos_ptr->cfd = -1;
}
配合实现
每次建立连接后,将服务端的处理函数和参数添加进线程池的任务队列中即可完成最简易的基于线程池的并发socket服务端函数。