自制线程池
线程池头文件
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <pthread.h>
#include "queue.h"
typedef void (*EnterFP)(void*);
typedef struct ThreadPool
{
int thread_cnt;//线程数量
pthread_t* tids;//线程id数组
Queue* store;//任务队列
EnterFP enter;//进入线程函数指针
pthread_mutex_t hlock;//队头锁
pthread_mutex_t tlock;//队尾锁
pthread_cond_t empty;//队空条件变量
pthread_cond_t full;//队满条件变量
}ThreadPool;
//初始化线程池
ThreadPool* creat_threadpool(int thread_cnt,int store_cap,EnterFP enter);
//启动线程池
void start_threadpool(ThreadPool* threadpool);
//生产数据
void push_threadpool(ThreadPool* threadpool,void* task);
//消费数据
void* pop_threadpool(ThreadPool* threadpool);
//销毁线程池
void destroy_threadpool(ThreadPool* threadpool);
#endif // THREADPOOL_H
线程池主要函数实现
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include"threadpool.h"
#include"queue.h"
//线程池中的线程入口函数
static void *run(void* arg)
{
ThreadPool* threadpool = (ThreadPool*)arg;
for(;;)
{
//线程就负责消费数据,能返回就意味着从仓库拿到了数据
void *task=pop_threadpool(threadpool);
threadpool->enter(task);
}
}
//初始化线程池
ThreadPool* creat_threadpool(int thread_cnt,int store_cap,EnterFP enter)
{
//申请线程池内存
ThreadPool* threadpool = (ThreadPool*)malloc(sizeof(ThreadPool));
//申请线程id内存
threadpool->tids=malloc(thread_cnt*sizeof(pthread_t));
//初始化线程数量
threadpool->thread_cnt=thread_cnt;
//创建仓库队列
threadpool->store=create_queue(store_cap);
//初始化线程业务逻辑函数
threadpool->enter=enter;
//初始化互斥锁,条件变量
pthread_mutex_init(&threadpool->hlock,NULL);
pthread_mutex_init(&threadpool->tlock,NULL);
pthread_cond_init(&threadpool->full,NULL);
pthread_cond_init(&threadpool->empty,NULL);
return threadpool;
}
//启动线程池
void start_threadpool(ThreadPool* threadpool)
{
for(int i=0;i<threadpool->thread_cnt;i++)
{
pthread_create(threadpool->tids+i,NULL,run,threadpool);
}
}
//生产数据
void push_threadpool(ThreadPool* threadpool,void* task)
{
//队尾加锁
pthread_mutex_lock(&threadpool->tlock);
//如果一直队列满了,就等待
while(full_queue(threadpool->store))
{
//唤醒消费数据的线程
pthread_cond_signal(&threadpool->empty);
//等待生产数据的线程,并解锁队尾
pthread_cond_wait(&threadpool->full,&threadpool->tlock);
}
//生产数据,存入仓库队列
push_queue(threadpool->store,task);
//唤醒消费数据的线程
pthread_cond_signal(&threadpool->empty);
//解锁队尾
pthread_mutex_unlock(&threadpool->tlock);
}
//消费数据
void* pop_threadpool(ThreadPool* threadpool)
{
//队头加锁
pthread_mutex_lock(&threadpool->hlock);
//如果仓库队列空了,就等待
while(empty_queue(threadpool->store))
{
//唤醒生产数据的线程
pthread_cond_signal(&threadpool->full);
//睡眠并解锁队头
pthread_cond_wait(&threadpool->empty,&threadpool->hlock);
}
//消费数据,取出仓库队列
void* task=front_queue(threadpool->store);
pop_queue(threadpool->store);
//唤醒生产数据的线程
pthread_cond_signal(&threadpool->full);
//解锁队头
pthread_mutex_unlock(&threadpool->hlock);
return task;
}
//销毁线程池
void destroy_threadpool(ThreadPool* threadpool)
{
//结束线程池中所有消费者线程
for(int i=0;i<threadpool->thread_cnt;i++)
{
pthread_cancel(threadpool->tids[i]);
}
//销毁互斥锁,条件变量
pthread_mutex_destroy(&threadpool->hlock);
pthread_mutex_destroy(&threadpool->tlock);
pthread_cond_destroy(&threadpool->full);
pthread_cond_destroy(&threadpool->empty);
//销毁仓库队列
destroy_queue(threadpool->store);
//销毁线程池
free(threadpool->tids);
free(threadpool);
}
其中使用的队列工具
头文件部分
#ifndef QUEUE_H
#define QUEUE_H
#include <stdbool.h>
typedef struct Queue
{
void **arr;
int cap;
int front;
int rear;
} Queue;
//创建队列
Queue *create_queue(int cap);
//销毁队列
void destroy_queue(Queue *q);
//入队
void push_queue(Queue *q, void *data);
//出队
void *pop_queue(Queue *q);
//队列是否为空
bool empty_queue(Queue *q);
//队列是否已满
int full_queue(Queue *q);
//取队头元素
void *front_queue(Queue *q);
//取队尾元素
void *rear_queue(Queue *q);
#endif /* QUEUE_H */
具体实现
#include "queue.h"
#include<stdio.h>
#include<stdlib.h>
#include<stdbool.h>
//创建队列
Queue *create_queue(int cap)
{
Queue *queue=malloc(sizeof(Queue));
queue->arr=malloc(cap*sizeof(void*)+1);
queue->cap=cap;
queue->front=0;
queue->rear=0;
return queue;
}
//销毁队列
void destroy_queue(Queue *q)
{
free(q->arr);
free(q);
}
//入队
void push_queue(Queue *q, void *data)
{
q->arr[q->rear++]=data;
q->rear%=q->cap;
}
//出队
void *pop_queue(Queue *q)
{
q->front=(q->front+1)%q->cap;
}
//队列是否为空
bool empty_queue(Queue *q)
{
return (q->front==q->rear);
}
//队列是否已满
int full_queue(Queue *q)
{
return ((q->rear+1)%q->cap==q->front);
}
//取队头元素
void *front_queue(Queue *q)
{
return q->arr[q->front];
}
//取队尾元素
void *rear_queue(Queue *q)
{
return q->arr[(q->rear-1+q->cap)%q->cap];
}
封装线程池
gcc -fpic -c queue.c threadpool.c -->产生queue.o 和 threadpool.o文件
gcc -shared -fpic queue.o threadpool.o -o libthreadpool.so -->生成.so的动态库文件
sudo cp libthreadpool.so /usr/lib -->将动态库文件复制一份到指定目录文件下
sudo cp threadpool.h /usr/include/ -->将头文件复制一份到系统include文件中,下次使用时直接使用#include<threadpool.h>