自制线程池

发布于:2024-10-12 ⋅ 阅读:(132) ⋅ 点赞:(0)

自制线程池

线程池头文件

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <pthread.h>
#include "queue.h"

typedef void (*EnterFP)(void*);

typedef struct ThreadPool
{
    int thread_cnt;//线程数量
    pthread_t* tids;//线程id数组
    Queue* store;//任务队列
    EnterFP enter;//进入线程函数指针
    pthread_mutex_t hlock;//队头锁
    pthread_mutex_t tlock;//队尾锁
    pthread_cond_t empty;//队空条件变量
    pthread_cond_t full;//队满条件变量
}ThreadPool;

//初始化线程池
ThreadPool* creat_threadpool(int thread_cnt,int store_cap,EnterFP enter);

//启动线程池
void start_threadpool(ThreadPool* threadpool);
//生产数据
void push_threadpool(ThreadPool* threadpool,void* task);
//消费数据
void* pop_threadpool(ThreadPool* threadpool);
//销毁线程池
void destroy_threadpool(ThreadPool* threadpool);

#endif // THREADPOOL_H

线程池主要函数实现

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include"threadpool.h"
#include"queue.h"

//线程池中的线程入口函数
static void *run(void* arg)
{

    ThreadPool* threadpool = (ThreadPool*)arg;
    for(;;)
    {
        //线程就负责消费数据,能返回就意味着从仓库拿到了数据
        void *task=pop_threadpool(threadpool);
        threadpool->enter(task);
    }
}


//初始化线程池
ThreadPool* creat_threadpool(int thread_cnt,int store_cap,EnterFP enter)
{ 
    //申请线程池内存
    ThreadPool* threadpool = (ThreadPool*)malloc(sizeof(ThreadPool));
    //申请线程id内存
    threadpool->tids=malloc(thread_cnt*sizeof(pthread_t));
    //初始化线程数量
    threadpool->thread_cnt=thread_cnt;
    //创建仓库队列
    threadpool->store=create_queue(store_cap);
    //初始化线程业务逻辑函数
    threadpool->enter=enter;
    //初始化互斥锁,条件变量
    pthread_mutex_init(&threadpool->hlock,NULL);
    pthread_mutex_init(&threadpool->tlock,NULL);
    pthread_cond_init(&threadpool->full,NULL);
    pthread_cond_init(&threadpool->empty,NULL);
    return threadpool;
}
//启动线程池
void start_threadpool(ThreadPool* threadpool)
{ 
    for(int i=0;i<threadpool->thread_cnt;i++)
    {  
        pthread_create(threadpool->tids+i,NULL,run,threadpool);
    }
}
//生产数据
void push_threadpool(ThreadPool* threadpool,void* task)
{ 
    //队尾加锁
    pthread_mutex_lock(&threadpool->tlock);
    //如果一直队列满了,就等待
    while(full_queue(threadpool->store))
    {
        //唤醒消费数据的线程
        pthread_cond_signal(&threadpool->empty);
        //等待生产数据的线程,并解锁队尾
        pthread_cond_wait(&threadpool->full,&threadpool->tlock);
    }
    //生产数据,存入仓库队列
    push_queue(threadpool->store,task);
    //唤醒消费数据的线程
    pthread_cond_signal(&threadpool->empty);
    //解锁队尾
    pthread_mutex_unlock(&threadpool->tlock);
}
//消费数据
void* pop_threadpool(ThreadPool* threadpool)
{ 
    //队头加锁
    pthread_mutex_lock(&threadpool->hlock);
    //如果仓库队列空了,就等待
    while(empty_queue(threadpool->store))
    {
        //唤醒生产数据的线程
		pthread_cond_signal(&threadpool->full);
		//睡眠并解锁队头
		pthread_cond_wait(&threadpool->empty,&threadpool->hlock);
    }
	//消费数据,取出仓库队列
    void* task=front_queue(threadpool->store);
	 pop_queue(threadpool->store);
    //唤醒生产数据的线程
	pthread_cond_signal(&threadpool->full);

    //解锁队头
	pthread_mutex_unlock(&threadpool->hlock);
    return task;
}
//销毁线程池
void destroy_threadpool(ThreadPool* threadpool)
{ 
    //结束线程池中所有消费者线程
    for(int i=0;i<threadpool->thread_cnt;i++)
    {  
        pthread_cancel(threadpool->tids[i]);
    }
    //销毁互斥锁,条件变量
    pthread_mutex_destroy(&threadpool->hlock);
    pthread_mutex_destroy(&threadpool->tlock);
    pthread_cond_destroy(&threadpool->full);
    pthread_cond_destroy(&threadpool->empty);
    //销毁仓库队列
    destroy_queue(threadpool->store);
    //销毁线程池
    free(threadpool->tids);
    free(threadpool);
}

其中使用的队列工具

头文件部分
#ifndef QUEUE_H
#define QUEUE_H
#include <stdbool.h>
typedef struct Queue
{
    void **arr;
    int cap;
    int front;
    int rear;
} Queue;

//创建队列
Queue *create_queue(int cap);

//销毁队列
void destroy_queue(Queue *q);

//入队
void push_queue(Queue *q, void *data);

//出队
void *pop_queue(Queue *q);

//队列是否为空
bool empty_queue(Queue *q);

//队列是否已满
int full_queue(Queue *q);

//取队头元素
void *front_queue(Queue *q);

//取队尾元素
void *rear_queue(Queue *q);
#endif /* QUEUE_H */
具体实现
#include "queue.h"
#include<stdio.h>
#include<stdlib.h>
#include<stdbool.h>
//创建队列
Queue *create_queue(int cap)
{
    Queue *queue=malloc(sizeof(Queue));
    queue->arr=malloc(cap*sizeof(void*)+1);
    queue->cap=cap;
    queue->front=0;
    queue->rear=0;
    return queue;
}

//销毁队列
void destroy_queue(Queue *q)
{
    free(q->arr);
    free(q);
}

//入队
void push_queue(Queue *q, void *data)
{  
    q->arr[q->rear++]=data;
    q->rear%=q->cap;
}

//出队
void *pop_queue(Queue *q)
{  
    q->front=(q->front+1)%q->cap;
}

//队列是否为空
bool empty_queue(Queue *q)
{  
    return (q->front==q->rear);
}

//队列是否已满
int full_queue(Queue *q)
{  
    return ((q->rear+1)%q->cap==q->front);
}

//取队头元素
void *front_queue(Queue *q)
{  
    return q->arr[q->front];
}

//取队尾元素
void *rear_queue(Queue *q)
{  
    return q->arr[(q->rear-1+q->cap)%q->cap];
}

封装线程池

gcc -fpic -c queue.c threadpool.c -->产生queue.o 和 threadpool.o文件

gcc -shared -fpic queue.o threadpool.o -o libthreadpool.so -->生成.so的动态库文件

sudo cp libthreadpool.so /usr/lib -->将动态库文件复制一份到指定目录文件下
sudo cp threadpool.h /usr/include/ -->将头文件复制一份到系统include文件中,下次使用时直接使用#include<threadpool.h>


网站公告

今日签到

点亮在社区的每一天
去签到