线程池分析与设计

发布于:2025-08-09 ⋅ 阅读:(15) ⋅ 点赞:(0)

线程池

基本功能接口

C++11 及以后的标准中,std::packaged_taskstd::future是并发编程中用于任务封装和结果获取的重要组件,它们通常与线程配合使用,实现异步操作。

std::packaged_task

std::packaged_task:封装可调用对象为异步任务,它是一个模板类,用于封装任何可调用对象(包括函数、lambda、函数对象等),并且它还需要与std::future关联使用,当std::packaged_task被执行时,其中封装的任务也会运行,结果会存储在内部,和这个std::packaged_task关联的std::future可以进行调用。

核心功能
  1. 异步任务封装:将任务(在线程池中每个任务起始就是一个函数)打包起来,让它可以异步执行
  2. std::future绑定:因为每一个std::packaged_task()会对应一个std::future,所以在std::packaged_task执行之后,其中的任务也运行了,结果就存储在内部,等待std::future通过get_future()调用.
  3. 执行任务:可以直接通过operator()调用,也可传递给线程执行(std::thread接收std::packaged_task()当参数就行)
简单函数的示例

这里先给出一个简单的小示例,之后会结合线程池进行阐述

#include <future>
#include <thread>
#include <iostream>

int add(int a, int b) {
    return a + b;
}

int main()
{
    // 1、首先需要封装任务-->这里是封装add(),需要一个int做返回值以及两个int参数
    std::packaged_task<int(int, int)> task(add);
    
    // 2、和std::future绑定   异步任务task调用get_future进行绑定
    std::future<int> f = task.get_future();
    
    // 3、在一个线程中执行这个封装好的异步任务
    std::thread t(std::move(task), 10, 20);
    // 这里需要强调一下,封装后的异步任务不可复制,只能进行移动,所以再传给线程做参数时,只能使用std::move()
    
    // 因为这里是异步执行的,所以主线程可以执行其他任务
    
    // 现在获取这个线程中执行的结果
    // future() 具有唯一性,使用get()获取一次之后,就不能获取第二次了,如果结果没有就绪就阻塞等待结果就绪
    int res = f.get();	
    
    // 现在就可以打印获得的这个结果了
    std::cout << "res = " << res << std::endl;
    
    // 在创建线程之后,必须 join或者detach,否则就会出现错误
    t.join();
    return 0;
}



那么接下来是关于std::future获取结果

std::future

用于获取异步操作(线程、任务)执行的结果,可以理解为一种类似于“未来结果的占位符”,因为你启动一个异步线程时,可能无法立即得到结果,但是可以使用std::future对象在未来某个时刻获取结果。

核心功能
  1. 可以通过get()方法获取异步操作得到的结果(返回值),如果在调用get()时,异步操作还未完成,那么就会阻塞当前线程等待有结果产生。
  2. 可以通过valid()判断future是否与一个有效的异步操作关联成功,可以通过wait()阻塞等待结果,也可以通过wait_for()wait_until()等待指定时长之后返回状态。

上方已经给出了示例用法,都是一样的,这里就不给了,待会直接上线程池相关的示例。

任务入队操作

当有新任务到来时,任务会被添加到任务队列中,这个过程中,需要先获取互斥锁,保证任务队列的线程安全,添加任务后,通过条件变量通知等待的线程有新任务到来。我这里将任务划分成了不带返回值的普通任务和带返回值的任务,其中带返回值的任务使用异步封包的方式进行封装,分别如下:

带返回值的异步任务提交到任务队

步骤:

  1. 通过std::bind()std::make_shared()创建一个包装了任务的std::package_task
  2. 获取其对应的std::future用于获取任务执行结果
  3. 在临界区内(加锁)将任务添加到任务队列tasks
  4. 通知一个等待的线程有新任务

以下是线程池提交带有返回值的任务的示例过程

template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {
    // 首先定义返回类型
    auto ret_type = std::invoke_result_t<F, Args...>;
    // 状态判断
    if (is_shuntdown_.load() || !is_available_.load()) {
        // 返回一个控制
        return std::future<ret_type>();
    }
    
    // 开始封装异步任务
    auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
    auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));
    std::future<ret_type> res = task.get_future();
    
    {
        // 在临界区加锁,将任务添加到任务队列中
        std::lock_guard<std::mutex> lock(task_mutex_);
        tasks_.emplace([task](){
            (*task)();
        });
    }
    task_cv_.notify_one();
    return res;
}

// 用到的成员变量
std::queue<std::function<void()> tasks_;	// 任务队列
std::atomic<bool> is_shutdown_;			// 线程是否关闭
std::atomic<bool> is_available_;		// 线程池是否还有效

std::mutex task_mutex_;					// 任务锁
std::condition_variable task_cv_;		// 条件变量,用于阻塞任务

不带返回值的普通任务

template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {
    // 终止条件
    if (is_shutdown_.load() || !is_available_.load()) {
        return;
    }
    
    // 封装任务
    auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
    {
        std::lock_guard<std::mutex> lock(task_mutex_);
        tasks.emplace([task](){
            task();		// 调用对应的任务
        });
    }
    
    // 唤醒一个阻塞中的线程
    task_cv_.notify_one();
}

所以可以看出,起始线程池中任务的提交过程整体思路都是一致的,只是有返回值的提交上,添加了std::packaged_taskstd::future来做异步任务的封装而已。

工作线程取出任务执行过程

在工作线程开启之后,需要去任务队列中取出任务然后执行。主要的过程是,获取互斥锁保证资源的互斥访问,然后检查任务队列是否为空,如果为空,就需要通过条件变量阻塞,等待任务添加进来。获取到任务之后就会执行任务,执行完毕马上继续获取任务,除非线程池停止并且任务队列为空。

主要的过程如下:

  1. 由于每次都会取出一个任务task,每个任务都是一个函数std::function<void()>
  2. 无限循环,一直访问任务队列,直到线程池停止,然后任务队列为空
  3. 取出任务队列中的任务,执行

我的取出任务的接口函数

成员变量信息

using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;

// 一个线程信息结构体,包含管理线程的智能指针
struct ThreadInfo {
    ThreadInfo();
    ~ThreadInfo();
    
    ThreadPtr ptr{nullptr};
}

// 每一个线程的信息都是有一个智能指针来管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;

// 线程数组
std::vector<ThreadInfoPtr> work_threads_;

添加线程函数

void ThreadPool::AddThread() {
    // 先从任务队列中取出一个任务
    auto func = [this]() {
        while (true) {
            Task task;
            {
                // 首先获取互斥锁
                std::unique_lock<std::mutex> lock(task_mutex_);
                // 通过条件变量等待条件满足
                task_cv_.wait(lock, [this](){
                    return is_shutdown_.load() || !tasks.empty();
                });
                
                if (is_shutdown_.load() && tasks.empty()) {
                    return;
                }
                
                // 取出任务
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    };
    
    // 将取出来的任务封装到线程中添加到线程池
    ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();
    thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));
    // 添加到线程池中
    work_threads_.emplace_back(std::move(thread_ptr));
}

线程池类设计

线程池类负责创建线程池、销毁线程池以及管理线程队列、任务队列以及添加任务或者取出任务执行等操作。

类定义如下:

class ThreadPool{
public:
    explicit ThreadPool(uint32_t thread_count);
    
    // 禁止拷贝线程池
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    
    ~ThreadPool();
    
    bool Start();	// 启动线程池
    void Stop();	// 停止线程池
    
    // 提交任务,分别有普通任务和带返回值的任务
    template<typename F, typename... Args>
    void SubmitTask(F&& func, Args... args) {
        if (is_shutdown_.load() || !is_available_.load()) {
            return;
        }
        
        auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
        {
            std::unique_lock<std::mutex> lock(task_mutex_);
            // 添加任务
            tasks.emplace([task](){
                task();
            });
        }
        // 唤醒一个等待任务的阻塞线程
        task_cv_.notify_one();
    }
    
    // 提交带有返回值的任务
    template<typename F, typename... Args>
    auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {
        auto ret_type = std::invoke_result_t<F, Args...>;
        // 检查变量判断是否还能继续
        if (is_shutdown_.load() || !is_available_.load()) {
            return std::future<ret_type>();		// 此时需要返回一个空对象
        }
        
        auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
        // 用packaged_task和shared_ptr封装异步任务
        auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));
        // 与future绑定
        std::future<ret_type> res = task.get_future();
        
        {
            std::unique_lock<std::mutex> lock(task_mutex_);
            tasks_.emplace([task](){
                (*task)();
            });
        }
        // 唤醒等待线程
        task_cv_.notify_one();
        return res;
    }
    
    
private:
    // 增加线程函数
    void AddThread();
    
    // 通过智能指针来管理线程
    using ThreadPtr = std::shared_ptr<std::thread>;
    using Task = std::function<void()>;
    
    struct ThreadInfo{
        ThreadInfo();
        ~ThreadInfo();
        
        ThreadInfo ptr{nullptr};
    }
    
    using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;
    
    std::vector<ThreadInfoPtr> works_threads_;	
    std::queue<Task> tasks_;
    std::mutex task_mutex_;
    std::condition_variable task_cv_;
    
    std::atomic<uint32_t> thread_count_;
    std::atomic<bool> is_shutdown_;
    std::atomic<bool> is_available_;
}

接口实现

构造与析构

我这里的思路是构造函数初始化一些基本的成员变量,比如thread_count_,is_shutdown_,is_available_就够了,在启动线程池时采取初始化,并且创建线程添加到线程池中,所以构造函数如下:

explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}

析构函数和构造函数的思路类似,里面由Stop()这个接口来处理线程池的终止

ThreadPool::~ThreadPool() { Stop();}
Start() 启动线程池和 Stop()终止线程池

Start()负责启动线程池,然后循环创建线程并且添加到容器中

bool ThreadPool::Start() {
    if (!is_available_.load()) {
        is_availeable_.store(true);
        uint32_t thread_count = thread_count_.load();
        
        for (uint32_t i = 0; i < thread_count; i++) {
            AddThread();	// 由这个添加函数完成创建线程并且绑定任务添加到容器中
        }
        return true;
    }
    return false;
}

Stop()代表线程池停止接口,首先需要将所有相关的成员变量置为停止状态下对应的值,然后停止所有进程,回收所有进程,保证所有进程只join()一次

void ThreadPool::Stop() {
    if (!is_shotdown_.load()) {
        return ;
    }
    
    // 将对应的变量置为退出状态
    is_shutdown_.store(true);
    is_available_.store(false);
    
    // 通知所有线程
    task_cv_.notify_all();
    
    // 回收所有线程
    for (auto& thread_info_ptr : work_threads_) {
        if (thread_info_ptr && thread_info_ptr->ptr) {
            std::thread& t = *thread_info_ptr->ptr;
            if (t.joinable()) {
                t.join();
            }
        }
    }
    
    // 清空所有线程容器
    work_threads_.clear();
    {
        // 在线程池关闭的时候,还需要将任务队列中的所有任务pop
        std::lock_guard<std::mutex> lock(task_mutex_);
        while (!tasks_.empty()) {
            tasks_.pop();
        }
    }
}

取出任务绑定线程然后添加到线程函数 AddThread()

AddThread()这个函数主要是从任务队列中取出任务,然后将其绑定到线程,并且添加到容器中.

void ThreadPool::AddThread() {
    // 取出任务
    auto func = [this]() {
        while(true) {
            Task task;
            {
                std::unqiue_lock<std::mutex> lock(task_mutex_);
                task_cv_.wait(lock, [this](){
                   		return is_shutdown_.load() || !tasks.empty(); 
                });
                
                if (is_shutdown_.load() && tasks.empty()) {
                    return;
                }
                
                // 取出任务
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }
    
    // 将其封装为线程
    ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();
    thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));
    work_threads_.emplace_back(std::move(thread_ptr));
}

线程池

基本功能接口

C++11 及以后的标准中,std::packaged_taskstd::future是并发编程中用于任务封装和结果获取的重要组件,它们通常与线程配合使用,实现异步操作。

std::packaged_task

std::packaged_task:封装可调用对象为异步任务,它是一个模板类,用于封装任何可调用对象(包括函数、lambda、函数对象等),并且它还需要与std::future关联使用,当std::packaged_task被执行时,其中封装的任务也会运行,结果会存储在内部,和这个std::packaged_task关联的std::future可以进行调用。

核心功能
  1. 异步任务封装:将任务(在线程池中每个任务起始就是一个函数)打包起来,让它可以异步执行
  2. std::future绑定:因为每一个std::packaged_task()会对应一个std::future,所以在std::packaged_task执行之后,其中的任务也运行了,结果就存储在内部,等待std::future通过get_future()调用.
  3. 执行任务:可以直接通过operator()调用,也可传递给线程执行(std::thread接收std::packaged_task()当参数就行)
简单函数的示例

这里先给出一个简单的小示例,之后会结合线程池进行阐述

#include <future>
#include <thread>
#include <iostream>

int add(int a, int b) {
    return a + b;
}

int main()
{
    // 1、首先需要封装任务-->这里是封装add(),需要一个int做返回值以及两个int参数
    std::packaged_task<int(int, int)> task(add);
    
    // 2、和std::future绑定   异步任务task调用get_future进行绑定
    std::future<int> f = task.get_future();
    
    // 3、在一个线程中执行这个封装好的异步任务
    std::thread t(std::move(task), 10, 20);
    // 这里需要强调一下,封装后的异步任务不可复制,只能进行移动,所以再传给线程做参数时,只能使用std::move()
    
    // 因为这里是异步执行的,所以主线程可以执行其他任务
    
    // 现在获取这个线程中执行的结果
    // future() 具有唯一性,使用get()获取一次之后,就不能获取第二次了,如果结果没有就绪就阻塞等待结果就绪
    int res = f.get();	
    
    // 现在就可以打印获得的这个结果了
    std::cout << "res = " << res << std::endl;
    
    // 在创建线程之后,必须 join或者detach,否则就会出现错误
    t.join();
    return 0;
}



那么接下来是关于std::future获取结果

std::future

用于获取异步操作(线程、任务)执行的结果,可以理解为一种类似于“未来结果的占位符”,因为你启动一个异步线程时,可能无法立即得到结果,但是可以使用std::future对象在未来某个时刻获取结果。

核心功能
  1. 可以通过get()方法获取异步操作得到的结果(返回值),如果在调用get()时,异步操作还未完成,那么就会阻塞当前线程等待有结果产生。
  2. 可以通过valid()判断future是否与一个有效的异步操作关联成功,可以通过wait()阻塞等待结果,也可以通过wait_for()wait_until()等待指定时长之后返回状态。

上方已经给出了示例用法,都是一样的,这里就不给了,待会直接上线程池相关的示例。

任务入队操作

当有新任务到来时,任务会被添加到任务队列中,这个过程中,需要先获取互斥锁,保证任务队列的线程安全,添加任务后,通过条件变量通知等待的线程有新任务到来。我这里将任务划分成了不带返回值的普通任务和带返回值的任务,其中带返回值的任务使用异步封包的方式进行封装,分别如下:

带返回值的异步任务提交到任务队

步骤:

  1. 通过std::bind()std::make_shared()创建一个包装了任务的std::package_task
  2. 获取其对应的std::future用于获取任务执行结果
  3. 在临界区内(加锁)将任务添加到任务队列tasks
  4. 通知一个等待的线程有新任务

以下是线程池提交带有返回值的任务的示例过程

template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {
    // 首先定义返回类型
    auto ret_type = std::invoke_result_t<F, Args...>;
    // 状态判断
    if (is_shuntdown_.load() || !is_available_.load()) {
        // 返回一个控制
        return std::future<ret_type>();
    }
    
    // 开始封装异步任务
    auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
    auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));
    std::future<ret_type> res = task.get_future();
    
    {
        // 在临界区加锁,将任务添加到任务队列中
        std::lock_guard<std::mutex> lock(task_mutex_);
        tasks_.emplace([task](){
            (*task)();
        });
    }
    task_cv_.notify_one();
    return res;
}

// 用到的成员变量
std::queue<std::function<void()> tasks_;	// 任务队列
std::atomic<bool> is_shutdown_;			// 线程是否关闭
std::atomic<bool> is_available_;		// 线程池是否还有效

std::mutex task_mutex_;					// 任务锁
std::condition_variable task_cv_;		// 条件变量,用于阻塞任务

不带返回值的普通任务

template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {
    // 终止条件
    if (is_shutdown_.load() || !is_available_.load()) {
        return;
    }
    
    // 封装任务
    auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
    {
        std::lock_guard<std::mutex> lock(task_mutex_);
        tasks.emplace([task](){
            task();		// 调用对应的任务
        });
    }
    
    // 唤醒一个阻塞中的线程
    task_cv_.notify_one();
}

所以可以看出,起始线程池中任务的提交过程整体思路都是一致的,只是有返回值的提交上,添加了std::packaged_taskstd::future来做异步任务的封装而已。

工作线程取出任务执行过程

在工作线程开启之后,需要去任务队列中取出任务然后执行。主要的过程是,获取互斥锁保证资源的互斥访问,然后检查任务队列是否为空,如果为空,就需要通过条件变量阻塞,等待任务添加进来。获取到任务之后就会执行任务,执行完毕马上继续获取任务,除非线程池停止并且任务队列为空。

主要的过程如下:

  1. 由于每次都会取出一个任务task,每个任务都是一个函数std::function<void()>
  2. 无限循环,一直访问任务队列,直到线程池停止,然后任务队列为空
  3. 取出任务队列中的任务,执行

我的取出任务的接口函数

成员变量信息

using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;

// 一个线程信息结构体,包含管理线程的智能指针
struct ThreadInfo {
    ThreadInfo();
    ~ThreadInfo();
    
    ThreadPtr ptr{nullptr};
}

// 每一个线程的信息都是有一个智能指针来管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;

// 线程数组
std::vector<ThreadInfoPtr> work_threads_;

添加线程函数

void ThreadPool::AddThread() {
    // 先从任务队列中取出一个任务
    auto func = [this]() {
        while (true) {
            Task task;
            {
                // 首先获取互斥锁
                std::unique_lock<std::mutex> lock(task_mutex_);
                // 通过条件变量等待条件满足
                task_cv_.wait(lock, [this](){
                    return is_shutdown_.load() || !tasks.empty();
                });
                
                if (is_shutdown_.load() && tasks.empty()) {
                    return;
                }
                
                // 取出任务
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    };
    
    // 将取出来的任务封装到线程中添加到线程池
    ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();
    thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));
    // 添加到线程池中
    work_threads_.emplace_back(std::move(thread_ptr));
}

线程池类设计

线程池类负责创建线程池、销毁线程池以及管理线程队列、任务队列以及添加任务或者取出任务执行等操作。

类定义如下:

class ThreadPool{
public:
    explicit ThreadPool(uint32_t thread_count);
    
    // 禁止拷贝线程池
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    
    ~ThreadPool();
    
    bool Start();	// 启动线程池
    void Stop();	// 停止线程池
    
    // 提交任务,分别有普通任务和带返回值的任务
    template<typename F, typename... Args>
    void SubmitTask(F&& func, Args... args) {
        if (is_shutdown_.load() || !is_available_.load()) {
            return;
        }
        
        auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
        {
            std::unique_lock<std::mutex> lock(task_mutex_);
            // 添加任务
            tasks.emplace([task](){
                task();
            });
        }
        // 唤醒一个等待任务的阻塞线程
        task_cv_.notify_one();
    }
    
    // 提交带有返回值的任务
    template<typename F, typename... Args>
    auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {
        auto ret_type = std::invoke_result_t<F, Args...>;
        // 检查变量判断是否还能继续
        if (is_shutdown_.load() || !is_available_.load()) {
            return std::future<ret_type>();		// 此时需要返回一个空对象
        }
        
        auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));
        // 用packaged_task和shared_ptr封装异步任务
        auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));
        // 与future绑定
        std::future<ret_type> res = task.get_future();
        
        {
            std::unique_lock<std::mutex> lock(task_mutex_);
            tasks_.emplace([task](){
                (*task)();
            });
        }
        // 唤醒等待线程
        task_cv_.notify_one();
        return res;
    }
    
    
private:
    // 增加线程函数
    void AddThread();
    
    // 通过智能指针来管理线程
    using ThreadPtr = std::shared_ptr<std::thread>;
    using Task = std::function<void()>;
    
    struct ThreadInfo{
        ThreadInfo();
        ~ThreadInfo();
        
        ThreadInfo ptr{nullptr};
    }
    
    using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;
    
    std::vector<ThreadInfoPtr> works_threads_;	
    std::queue<Task> tasks_;
    std::mutex task_mutex_;
    std::condition_variable task_cv_;
    
    std::atomic<uint32_t> thread_count_;
    std::atomic<bool> is_shutdown_;
    std::atomic<bool> is_available_;
}

接口实现

构造与析构

我这里的思路是构造函数初始化一些基本的成员变量,比如thread_count_,is_shutdown_,is_available_就够了,在启动线程池时采取初始化,并且创建线程添加到线程池中,所以构造函数如下:

explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}

析构函数和构造函数的思路类似,里面由Stop()这个接口来处理线程池的终止

ThreadPool::~ThreadPool() { Stop();}
Start() 启动线程池和 Stop()终止线程池

Start()负责启动线程池,然后循环创建线程并且添加到容器中

bool ThreadPool::Start() {
    if (!is_available_.load()) {
        is_availeable_.store(true);
        uint32_t thread_count = thread_count_.load();
        
        for (uint32_t i = 0; i < thread_count; i++) {
            AddThread();	// 由这个添加函数完成创建线程并且绑定任务添加到容器中
        }
        return true;
    }
    return false;
}

Stop()代表线程池停止接口,首先需要将所有相关的成员变量置为停止状态下对应的值,然后停止所有进程,回收所有进程,保证所有进程只join()一次

void ThreadPool::Stop() {
    if (!is_shotdown_.load()) {
        return ;
    }
    
    // 将对应的变量置为退出状态
    is_shutdown_.store(true);
    is_available_.store(false);
    
    // 通知所有线程
    task_cv_.notify_all();
    
    // 回收所有线程
    for (auto& thread_info_ptr : work_threads_) {
        if (thread_info_ptr && thread_info_ptr->ptr) {
            std::thread& t = *thread_info_ptr->ptr;
            if (t.joinable()) {
                t.join();
            }
        }
    }
    
    // 清空所有线程容器
    work_threads_.clear();
    {
        // 在线程池关闭的时候,还需要将任务队列中的所有任务pop
        std::lock_guard<std::mutex> lock(task_mutex_);
        while (!tasks_.empty()) {
            tasks_.pop();
        }
    }
}

取出任务绑定线程然后添加到线程函数 AddThread()

AddThread()这个函数主要是从任务队列中取出任务,然后将其绑定到线程,并且添加到容器中.

void ThreadPool::AddThread() {
    // 取出任务
    auto func = [this]() {
        while(true) {
            Task task;
            {
                std::unqiue_lock<std::mutex> lock(task_mutex_);
                task_cv_.wait(lock, [this](){
                   		return is_shutdown_.load() || !tasks.empty(); 
                });
                
                if (is_shutdown_.load() && tasks.empty()) {
                    return;
                }
                
                // 取出任务
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }
    
    // 将其封装为线程
    ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();
    thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));
    work_threads_.emplace_back(std::move(thread_ptr));
}

最后,希望自己继续加油,学无止境,还请各位大佬海涵,如有错误请直接指出,我一定会及时修改。如果侵权,请联系我删除~


网站公告

今日签到

点亮在社区的每一天
去签到