Linux网络编程day7 线程池and UDP

发布于:2025-05-09 ⋅ 阅读:(16) ⋅ 点赞:(0)

线程池

typedef struct{
    void*(*function)(void*);        //函数指针,回调函数
    void*arg;                       //上面函数的参数
}threadpool_task_t;                 //各子线程任务的结构体


/*描述线程池相关信息*/

struct threadpool_t{
    pthread_mutex_t lock;           // 用于锁住本结构体
    pthread_mutex_t thread_counter; //记录忙状态线程个数的锁 -- bust_thr_num

    pthread_cond_t queue_not_full;  //当任务队列满时 , 添加任务的线程阻塞 , 等待此条件变量
    pthread_cond_t queue_not_empty; //任务队列不为空时,通知等待任务的线程

    pthread_t *threads;             //存放线程池中每个线程的Tid . 数组
    pthread_t adjust_tid;           //存管理线程的tid
    threadpool_task_t *task_queue;  //任务队列--数组首地址

    int min_thr_num;                //线程池最小线程数
    int max_thr_num;                //线程池最大线程数
    int live_thr_num;               //当前存活线程个数
    int busy_thr_num;               //忙状态线程个数
    int wait_exit_thr_num;          //要销毁的线程个数

    int queue_front;                //task_queue队头下标
    int queue_rear;                 //task_queue队尾下标
    int queue_size;                 //task_queue队中实际任务数
    int queue_max_size;             //task_queue队列可容纳任务数上限

    int shutdown;                   //标志位,线程池使用状态,true or false
};

线程池模块分析

1、main():创建线程池

                   向线程池中添加任务,借助回调处理任务

                   销毁线程池

int main(void)
{
    //threadpool_t * threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size);
    threadpool_t *thp = threadpool_create(3 , 100 , 100);//创建线程池,最大数量100,最小数量3 ,任务队列最
大容量100.
    printf("pool inited");

    int num[20] , i;  //模拟客户端向服务器发送数据等场景
    for(i = 0; i < 20 ; i++){
        num[i] = i;
        printf("add task %d\n" , i);
        //int threadpool_add(threadpool_t *pool , void*(*function)(void*arg) , void arg);
        threadpool_add(thp , process , (void*)&num[i]); //向线程池中添加任务
    }
    sleep(10);  //等待子线程完成任务
    threadpool_destroy(thp);

    return 0 ;
}

2、pthreadpool_create:创建线程池结构体指针

                                        初始化线程池结构体中N个成员变量

                                        创建N个任务线程

                                        创建1个管理者线程

                                        失败时 , 释放空间

threadpool_t* threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size)
{
    int i ;
    struct threadpool_t *pool = NULL; // 线程池 结构体

    do{
        if((pool = (struct threadpool_t*)malloc(sizeof(struct threadpool_t))) == NULL){
            printf("malloc threadpool fail");
            break;
        }
        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num; //活着的线程数 初值=最小线程数
        pool->wait_exit_thr_num = 0;
        pool->queue_size = 0; //有0个产品
        pool->queue_max_size = queue_max_size;//最大任务队列数
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->shutdown = false; // 不关闭线程池
        /*根据最大线程上线数 , 给工作线程数组开辟空间,清零*/
        pool->threads = (pthread*)malloc(sizeof(pthread_t)*max_thr_num);
        if(pool->threads == NULL){
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads , 0 , sizeof(pthread_t)*max_thr_num);
        /*给任务队列开辟空间 */
        pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t)*queue_max_size);
        if(pool->task_queue == NULL){
            printf("malloc task_queue fail");
            break;
        }
        /*初始化互斥锁、条件变量 , 使用init动态初始化 , 加上进行返回值判断*/
        if(pthread_mutex_init((&pool->lock) , NULL) != 0
                || pthread_mutex_init(&(pool->thread_counter) , NULL) != 0
                || pthread_cond_init(&(pool->queue_not_empty) , NULL) != 0
                || pthread_cond_init(&(pool->queue_not_full) , NULL) != 0)
        {
            printf("init the lock or cond fail");
            break;
        }
        /*启动min_thr_num个work thread*/
        for(i = 0 ; i < min_thr_num ; i++){
            pthread_create(&(pool->threads[i]) , NULL , threadpool_thread , (void*)pool);//pool指向当前线>程池
            printf("stat thread 0x%x...\n" , (unsigned int)pool->threads[i]);
        }
        pthread_create(&(pool->adjust_tid) , NULL , adjust_thread , (void*)pool);//创建管理者线程

        return pool;
    }while(0);
    threadpool_free(pool); // 前面代码调用失败时,释放pool空间
    return NULL;
}

3、threadpool_thread():进入子线程回调函数。

                                         接收参数(void*)arg

                                         加锁--》lock--》整个结构体的锁

                                         判断条件变量--》wait

/* 线程池中各个工作线程 */
void* threadpool_thread(void* threadpool)
{
    struct threadpool_t *pool = (struct threadpool_t*)threadpool;
    threadpool_task_t task;//任务队列对象

    while(true){
        /*刚创建出线程,等待任务队列里面有队列 ,否则阻塞等待任务队列李有任务后再唤醒接收任务*/
        pthread_mutex_lock(&(pool->lock));
        //queue_size = 0说明没有任务,调用wait函数阻塞在条件变量上,若有任务,跳过while
        while((pool->queue_size == 0) && (!pool->shutdown)){
            printf("thread 0x%x is waiting\n" , (unsigned int)pthread_self());
            pthread_cond_wait(&(pool->queue_not_empty) , &(pool->lock));
            //清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程
            if(pool->wait_exit_thr_num > 0 ){
                pool->wait_exit_thr_num--;
                //如果线程池里线程个数大于最小值时可以结束当前线程
                if(pool->live_thr_num > pool->min_thr_num){
                    printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());
                    pool_live_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));
                    pthread_exit(NULL);
                }
            }
            //指定true,要关闭线程池里的每个线程,自行退出-->销毁线程池
            if(pool->shutdown){
                pthread_mutex_unlock(&(pool->lock));
                printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());
                pthread_detach(pthread_self());
                pthread_exit(NULL); // 线程自行结束
            }
            //从任务队列获取任务,出队操作
            task.function = pool->task_queue[pool->queue_front].function;
            task.arg = pool->task_queue[pool->queue_front].arg;

            pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //出队,模拟环形
            pool->queue_size--;
            //通知可以有新的任务添加进来
            pthread_cond_broadcast(&(pool->queue_not_full));
            //任务取出后立即将线程池锁释放
            pthread_mutex_unlock(&(pool->lock));
            //执行任务
            printf("thread 0x%x stat working\n" , (unsigned int)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));    //忙状态线程数变量锁
            pool->busy_thr_num++;                           //忙状态线程数+1
            pthread_mutex_unlock(&((pool->thread_counter));

            (*(task.function))(task.arg);//执行回调函数
            //任务结束处理
            printf("thread 0x%x end working\n" , (unsigned int)pthread_self());
            pthread_mutex_lock(&(pool->thread_counter));
            pool->busy_thr_num--;                           //处理掉任务,忙状态线程数-1
            pthread_mutex_unlock(&(pool->thread_counter));
        }
    pthread_exit(NULL);
}

4、adjust_thread():进入管理者线程回调函数                                 

                                 循环10s执行一次

                                 接收参数(void*)arg

                                 加锁--》lock--》整个结构体的锁

                                 获取管理线程时需要用到的变量:live busy queue task

                                 根据既定算法,使用上述3变量判断是否应该创建、销毁线程池中的指定步长的线程。

void* adjust_thread(void* threadpool)
{
    int i ;
    struct threadpool_t *pool = (struct threadpool_t*)threadpool;
    while(!pool->shutdown){

        sleep(DEFAULT_TIME); //定时对线程池管理

        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;
        int live_thr_num  = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->pthread_counter));
        //创建新线程,任务数大于最小线程池个数,且存活线程数少于最大线程数
        if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            //一次增加DEFAULT_THREAD个线程
            for(i = 0 ; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool_max_thr_num ; i++)
            {
                pthread_create(&(pool->thread[i]) , NULL , threadpool_thread , (void*)pool);
                add++;
                pool->live_thr_num++;
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        if((busy_thr_num *2) < live_thr_num && live_thr_num > pool->min_thr_num){

            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;
            pthread_mutex_unlock(&(pool->lock));

            for(i = 0 ; i < DEFAULT_THREAD_VARY ; i++){
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }

    }
    return NULL;
}

5、threadpool_add:模拟产生任务 num[20]

                                  设置回调函数,处理任务sleep(1)代表处理完成

                                  初始化任务队列结构体成员 回调函数和arg

                                  利用环形队列机制实现添加任务,借助队尾指针

                                  唤醒阻塞在条件变量上的线程

//线程池中的线程,模拟处理业务
void* process(void*arg)
{
    printf("thread 0x%x working on task %d\n" , (unsigned int)pthread_self() , (int)arg);
    sleep(1);
    printf("task %d is end\n" , (int)arg);
    return NULL
}
int threadpool_add(struct threadpool_t *pool , (void*)(**function)(void*arg) , (void*)arg)
{
    pthread_mutex_lock(&(pool->lock));
    //为真 , 队列已满 , 调用wait阻塞
    while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){
        pthread_cond_wait(&(pool->queue_not_full) , &(pool->lock));
    }
    if(pool->shutdown){
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0 ;
    }
    //清空工作线程 调用的回调函数 的参数
    if(pool->task_queue[pool->queue_rear].arg != NULL){
        pool->task_queue[pool->queue_rear].arg = NULL;
    }
    //添加任务到任务队列
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;//队尾指针移动,模拟环形
    pool->queue_size++;//向任务队列中添加一个任务

    //添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));

    return 0 ;
}

6、从3中wait之后执行,处理任务:获取任务处理回调函数及参数

                        利用环形队列机制实现处理任务,借助队头指针

                        唤醒阻塞在条件变量上的server

                        修改忙线程数量++

                        执行处理任务线程

                        修改忙线程数量--

7、创建和销毁线程:管理者线程根据上述三个参数判断是否创建、销毁

                                   满足创建条件pthread_create()回调任务线程函数

                                   满足销毁条件wait_exit_thr_num赋值,signal给阻塞在条件变量上的线程发送假条件满足信号,跳转至wait阻塞,阻塞线程会被假信号唤醒,使用pthread_exit。

int threadpool_destroy(threadpool_t *pool)
{
    int i;
    if(pool == NULL)
        return -1;
    pool->shutdown = true;
    pthread_join(pool->adjust_tid , NULL);
    for(i = 0 ; i < pool->live_thr_num ; i++){
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    for(i = 0; i < pool->live_thr_num ; i++){
        pthread_join(pool->threads[i] , NULL);
    }
    threadpool_free(pool);
    return 0;
}
int threadpool_free(threadpool_t *pool)
{
    if(pool == NULL)
        return -1;
    if(pool->task_queue)
        free(pool->tast_queue);
    if(pool->threads){
        free(pool->threads);
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_full));
        pthread_cond_destroy(&(pool->queue_not_empty));
    }
    free(pool);
    pool = NULL;
    return 0;
}

UDP服务器

TCP通信和UDP通信的优缺点

TCP

面向连接的,可靠数据包传输。对于不稳定的网络层,采取完全弥补的通信方式,丢包重传。

优点:稳定 数据流量稳定、速度稳定、顺序

缺点:传输速度慢、效率低,资源开销大。

使用场景:数据完整要求性较高,不追求效率

                  大数据传输、文件传输。

UDP

无连接的,不可靠的数据报传递。对于不稳定的网络层,采取完全不弥补的通信方式,默认还原网络状况。

优点:传输速度快,效率高,资源开销小。

缺点:不稳定 数据流量、速度不稳定,顺序不稳定

使用场景:对时效性要求较高场合。稳定性其次。

                  游戏、视频会议、视频电话。     

----腾讯、华为、阿里 -- 应用层添加数据校验协议,弥补UDP的不足

UDP实现的C/S模型

无三次握手建立连接,故没有accept()、connect()

recv()/send()只能用于TCP通信

server

server:
lfd = socket(AF_INET , SOCK_DGRAM , 0);   SOCK_DGRAM--->报式协议
bind();
listen(); ----可有可无
while(1){  
    //不使用read函数
    recvfrom() //涵盖accept函数中的传出地址结构
    sendto();
}
close();

client

cfd = socket(AF_INET , SOCK_DGRAM , 0);

sendto("服务器地址结构" , 地址结构大小)

recvfrom()
写屏幕
close()

recvfrom函数

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
socket:lfd
buf:缓冲区地址
len:缓冲区大小
flags:0
src_addr:传出参数,传出对端地址结构
src_addr:传入传出

返回值:成功接收数据字节数
失败-1 errno   0对端关闭

sendto函数                 

ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
socket:套接字
buf:存储数据的缓冲区
len:数据长度
flags:0
dest_addr:传入参数,目标地址结构
src_addr:地址结构长度

返回值:成功写出数据字节数
失败-1 errno   

udp server端模型

#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9004
void sys_err(char*s)
{
    perror(s);
    exit(1);
}

int main(int argc , char*argv[])
{
    int sockfd , i , n;
    char buf[BUFSIZ] , str[INET_ADDRSTRLEN];
    struct sockaddr_in serv_addr , clit_addr;
    socklen_t clitlen;
    bzero(&serv_addr , sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERV_PORT);
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    sockfd  = socket(AF_INET , SOCK_DGRAM , 0);
    if(sockfd == -1)
        sys_err("socket error");
    bind(sockfd , (struct sockaddr*)&serv_addr , sizeof(serv_addr));
    printf("Accepting connections-----");
    while(1){
        clitlen = sizeof(clit_addr);
        n = recvfrom(sockfd , buf , strlen(buf) , 0 , (struct sockaddr*)clit_addr
        , &clitlen);
        if(n == -1)
            sys_err("recvfrom error");
        printf("received from %s at port%d\n" , 
        inet_ntop(AF_INET , &clit_addr.sin_addr , str , sizeof(str)),
        ntohs(clit_addr.sin_port));
        for(i = 0 ; i < n ; i++)
            buf[i] = toupper(buf[i]);
        n = sendto(sockfd , buf , n , 0 , (struct sockaddr*)&clit_addr , sizeof(clit_addr));
        if(n == -1)
            sys_err("sendto error");
    }
    close(sockfd);
    return 0 ;
}

udp client端模型

#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9004

void sys_err(char*s)
{
    perror(s);
    exit(1);
}

int main(int argc , char*argv[])
{
    int sockfd , i , n;
    char buf[BUFSIZ] ;
    struct sockaddr_in serv_addr;
    
    bzero(&serv_addr , sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET , "xx.xx.x.xxx" , &serv_addr.sin_addr.s_addr);
    sockfd  = socket(AF_INET , SOCK_DGRAM , 0);
    if(sockfd == -1)
        sys_err("socket error");
    while((fgets = (buf , BUFSIZ , stdin)) != NULL){
        n = sendto(sockfd , buf , strlen(buf) , 0 ,(struct sockaddr*)&serv_addr , sizeof(serv_addr));
        if(n == -1)
            sys_err("sendto error");
        n = recvfrom(sockfd , buf , BUFSIZ , 0 , NULL , 0);
        if(n == -1)
            sys_err("recvfrom error"); 
        write(STDOUT_FILENO , buf , n); 
    }

    close(sockfd);
    return 0 ;
}


网站公告

今日签到

点亮在社区的每一天
去签到