【c++】线程池的原理及实现

发布于:2024-05-10 ⋅ 阅读:(27) ⋅ 点赞:(0)
在这里插入图片描述


📄前言

不知道各位是否有试过点进限时抽奖网站、抢票网站呢?你是否好奇过一个网站、游戏是如何实现数十万、百万用户一起进行访问呢? 其实这类软件的背后的服务器总是离不开一个叫做线程池的设计,而这就是本文将讲解的内容,学习如何设计线程池,是每个后端程序猿的必修课之一。

线程池的原理

概念

当我们在一台多核CPU机器上使用多线程,无疑能够提高程序的性能,但不是每台机器的CPU都是“线程撕裂者”,我们无法给每个任务都分配一条线程,过度分配线程只会让程序性能下降。为此,我们需要给线程与需要执行的任务进行集中管理,减少线程创建和销毁所造成的开销,而这就是线程池。

线程池也是池化技术的一种,即将资源提前分配好,集中管理,当需要使用的时候在去使用,从而提高程序的效率。

  • 线程池的优点:

    1. 提高程序可维护性:线程池作为一个模块,降低了代码的耦合性,使得程序的可维护性得到提高
    2. 减少系统开销:避免频繁地创建和销毁资源,减少了系统的开销,尤其是在高负载情况下更能体现出其效率。
    3. 提高性能:通过并发执行任务,从而提高程序的性能。

工作原理

线程池最基础的三个结构组成就是任务队列工作线程线程管理。其中线程池中的任务队列负责将用户的任务打包成合适的格式,等待工作线程来取走工作线程负责任务的并发执行,而线程管理则用于管理线程的创建、消亡。

其实,线程池也可以看作是一种消费者生产者模型

  • 生产者:用户负责生成任务并将它们交给任务队列。可以将这看作为发送一个资源请求。
  • 消费者:线程池负责消费者的角色,用于处理将任务队列中的任务(资源)。

请添加图片描述

介绍完了线程池的工作原理,那就简单的介绍下工作流程吧。

工作流程:

+----------------+       +---------------------+       +-------------------+       +-------------------+
| 线程池初始化     |       | 添加任务到任务队列    |       | 线程从队列中取任务  |       | 执行任务并返回线程池  |
| 创建线程         |  -->  | 所有任务入队列        |  -->  | 自动取出并执行任务   |  -->  | 线程等待下一任务       |
+----------------+       +---------------------+       +-------------------+       +-------------------+

线程池的实现

要实现一个不那么塑料的线程池,就得要对C++的包装器、异步函数工具、完美转发有所了解,如果你对它们不太了解,可以去观看我的上一篇文章

你知道如何快速实现一个简单、高效的线程池吗?那就是上github搜索线程池,然后找到一个最多人收藏的(✓)。本篇文章讲解的线程池是基于github上大佬的线程池 所修改的。如果要提高编程实力,请务必去多上github观看源码。

线程池的基础结构

这个线程池只实现了线程池所必备的三个功能,**任务队列、工作线程、线程管理。**这个线程池的实现不过百行,对于初学者来说比较友好,没有必要把焦点分散到其他地方,只需要集中于了解线程池本身的实现原理。

class ThreadPool {
public:
    static ThreadPool* getInstance(size_t n = 0);   //单例模式实现

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) //将任务推入队列
    -> std::future<typename std::invoke_result_t<F, Args...>>;	//c++17 invoke_result_t用于获取推导的函数返回值

    template<class F>   // 无参函数版本
    auto enqueue(F&& f)
    -> std::future<typename std::invoke_result_t<F>>;

    ~ThreadPool();

    ThreadPool(const ThreadPool&) = delete;	//防止拷贝
    ThreadPool& operator=(const ThreadPool&) = delete;
private:
    ThreadPool(size_t);

private:
    std::once_flag _flag;   //用于初始化函数

    std::vector< std::thread > _workers;    // 工作线程
    std::queue< std::function<void()> > _tasks; // 任务队列
    std::mutex _queue_mutex;    // 用于保护任务队列的互斥锁
    std::condition_variable _condition; // 用于同步操作
    std::atomic<bool> _stop;
};

任务队列的实现

任务队列的难点在与如何理解如何包装任务的,以及智能指针的理解。但抛开C++中这些繁杂的部分,其实也就是往队列中发送任务。

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) //将任务推入队列
-> std::future<std::invoke_result_t<F, Args...>>	//返回一个期待,用于获取函数运行结果/异常结果
{
    using result_type = std::invoke_result_t<F, Args...>;

    auto task = std::make_shared< std::packaged_task<result_type()> > (
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );  // 包装任务成为异步任务。使用shared_ptr来动态管理存亡。

    std::future<result_type> res = task->get_future();  //期待绑定task
    {
        std::lock_guard<std::mutex> lock(_queue_mutex); // 保护任务队列,
        if(_stop)   throw std::runtime_error("在停止的线程池中加入任务");
        _tasks.emplace([task] { (*task)(); });  //再次将任务包装成 function<void()>。
    }
    _condition.notify_one();    //通知线程有任务可以消费了
    return res;
}

// 尝试自己去实现无参函数版本

工作线程的实现

工作线程的实现相比任务队列而言较为简单,只要确保往任务队列中取数据是线程安全,以及正确地结束线程即可。

static ThreadPool* getInstance(size_t n = 0)   //单例模式实现
{   //c++11 的call_once,可用于保证线程安全地初始化单例对象,从而杜绝双重检查语句。
    std::call_once(_flag, [n] { _instance = new ThreadPool(n); });
    return _instance;
}

ThreadPool(size_t n)
{
    for(int i = 0; i < n; ++i)
    {
        _workers.emplace_back
        (
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(_queue_mutex);    //保护任务队列
                        //当队列为空时,等待新任务
                        _condition.wait(lock, [this]{ return _stop || !_tasks.empty(); });
                        if(_stop && _tasks.empty()) return; // 线程池停止,且没有任务,则退出
                        task = std::move(_tasks.front()); // 从队列获取任务
                        _tasks.pop();
                    }
                    task(); // 并发执行任务
                }
            }
        );
    }
}

~ThreadPool()
{
    _stop.store(true);	
    for(auto& it : _workers)	//回收线程资源
        it.join(); 	
}

std::once_flag ThreadPool::_flag;	//静态成员要在类外初始化
ThreadPool* ThreadPool::_instance = nullptr;

线程池的应用与拓展

线程池的设计就至此结束了,让我们来简单使用下做好的线程池吧。

int test(int num1, int num2)
{
    return num1 + num2;
}

int main() {
    httplib::Server server;
    ThreadPool* pool = ThreadPool::getInstance(5);
    std::vector<std::future<int>> results(10);

    for (int i = 0; i < 10; i++) {
        results[i] = pool->enqueue(test, i, i + 1);
    }

    for(auto& it : results)
        std::cout << it.get() << std::endl;

    return 0;
}

线程池的拓展

这个线程池只是一个非常基础的线程池实现,实际的线程池会比这复杂得多,如下为线程池常见的拓展方式:

  1. 实现动态调整线程数:针对不同的场景线程池能够自动释放、创建线程。
  2. 优先级任务队列:为了确保紧急任务能够快速执行,可以定义一个优先级任务队列。
  3. 线程池任务异常处理:确保单个任务异常不会影响整个程序。

📓总结

掌握如何编写一个高效、安全的线程池对一个后端程序员来讲就像是西方不能没有耶路撒冷,毕竟如果要开发一个高并发需求的程序,例如抢票网站,服务器速度更不上、或者出现了线程安全,到时候用户就要寄律师函了。

📜博客主页:主页

📫我的专栏:C++

📱我的github:github


网站公告

今日签到

点亮在社区的每一天
去签到