C++ - 仿 RabbitMQ 实现消息队列--C++11 异步操作实现线程池

发布于:2025-07-18 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录

std::future

介绍

应用场景

 用法示例

使用 std::async 关联异步任务

 使用 std::packaged_task 和 std::future 配合

 使用 std::promise 和 std::future 配合

c++11 线程池实现


std::future

介绍

         std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。  

应用场景

  1. 异步任务: 当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率。
  2. 并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作。
  3. 结果获取:std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果。

 用法示例

使用 std::async 关联异步任务

        std::async 是一种将任务与 std::future 关联的简单方法。它创建并运行一个异步任务,并返回一个与该任务结果关联的 std::future 对象。默认情况下,std::async 是否启动一个新线程,或者在等待 future 时,任务是否同步运行都取决于你给的 参数。这个参数为 std::launch 类型:

  1. ○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或者 wait()才会开始执行任务。
  2. std::launch::async 表明函数会在自己创建的线程上运行。
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略。

deferred策略:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>

int add(int num1, int num2)
{
    std::cout << "加法:" << num1+num2 << std::endl;
    return num1+num2;
}

int main()
{
    std::cout << "------------------------1---------------------------" << std::endl;
    // std::launch::deferred: 调用get才会执行加法函数
    std::future<int> result = std::async(std::launch::deferred, add, 11, 22);
    // std::future<int> result = std::async(std::launch::async, add, 11, 22);
    sleep(1);
    std::cout << "------------------------2---------------------------" << std::endl;
    int sum = result.get(); 
    std::cout << sum << std::endl;
    return 0;
}

运行结果:

 async策略:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>

int add(int num1, int num2)
{
    std::cout << "加法:" << num1+num2 << std::endl;
    return num1+num2;
}

int main()
{
    std::cout << "------------------------1---------------------------" << std::endl;
    // std::launch::deferred: 调用get才会执行加法函数
    // std::future<int> result = std::async(std::launch::deferred, add, 11, 22);
    std::future<int> result = std::async(std::launch::async, add, 11, 22);
    sleep(1);
    std::cout << "------------------------2---------------------------" << std::endl;
    int sum = result.get(); 
    std::cout << sum << std::endl;
    return 0;
}

运行结果:

 使用 std::packaged_task 和 std::future 配合

        std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。

        可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。

演示demo:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>

int add(int num1, int num2)
{
    sleep(1);
    return num1+num2;
}

int main()
{
    std::packaged_task<int(int, int)> pt(add);
    // pt 可以看做是一个可调用对象, 但不能作为一个完全的函数来使用
    std::future<int> fu = pt.get_future();
    pt(11, 22);
    int result = fu.get();
    std::cout << result << std::endl;
    return 0;
}

运行结果:

异步执行 std::packaged_task 任务:

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>

int add(int num1, int num2)
{
    sleep(1);
    return num1+num2;
}

int main()
{
    // pt虽然重载了()运算符,但pt并不是一个函数,所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错
    // 而 packaged_task 禁止了拷贝构造,
    // 且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法当作参数一样传递
    // 传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用
    // 因此想要将一个 packaged_task 进行异步调用,
    // 简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用
    // 而类型不同的问题,在使用的时候可以使用类型推导来解决
    
    // 将pt定义为一个智能指针
    auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(add);
    std::future<int> fu = ptask->get_future();
    std::thread th([ptask](){
        (*ptask)(11, 22);
    });
    
    int result = fu.get();
    std::cout << result << std::endl;

    th.join();
    return 0;
}

运行结果:

 使用 std::promise 和 std::future 配合

        std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪。

#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>

void add(int num1, int num2, std::promise<int> &prom)
{
    sleep(1);
    prom.set_value(num1+num2);
    return;
}


int main()
{
    std::promise<int> prom;
    std::future<int> fu = prom.get_future();

    std::thread th(add, 11, 22, std::ref(prom));

    int result = fu.get();
    std::cout << result << std::endl;
    
    th.join();
    return 0;
}

运行结果:

c++11 线程池实现

        基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择std::packaged_task 加上std::future 的组合来实现。

线程池的工作思想:
        用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的
工作线程来执行函数完成任务

实现:

  • 管理的成员
  1. 任务池:用 vector 维护的一个函数任务池子▪ 互斥锁 & 条件变量: 实现同步互斥
  2. 一定数量的工作线程:用于不断从任务池取出任务执行任务▪ 结束运行标志:以便于控制线程池的结束。
  • 管理的操作:
  1. 入队任务:入队一个函数和参数
  2. 停止运行:终止线程池
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <vector>
#include <atomic>

class ThreadPool
{
public:
    using Func = std::function<void(void)>;

    ThreadPool(int thread_num = 1):_stop(false)
    {
        for (int i = 0; i < thread_num; i++)
        {
            _threads.emplace_back(&ThreadPool::entry, this);
        }
    }

    ~ThreadPool()
    {
        stop();
    }

    template <class F, class ...Args>
    auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
    {
        using return_type = decltype(func(args...));
        // 由于不确定参数,就先绑定进函数
        auto Func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
        // 利用智能指针管理一个new出来的packaged_task对象
        auto task = std::make_shared<std::packaged_task<return_type()>>(Func);

        std::future<return_type> fu = task->get_future();
        {
            // 推入任务池
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.emplace_back([task](){
                (*task)();
            });
            _cv.notify_one();
        }

        return fu;
    }

    void stop() 
    {
        if(_stop) return;
        _stop = true;
        _cv.notify_all();
        for (auto &thread : _threads)
        {
            thread.join();
        }
    }

private:
    void entry()
    {
        while(!_stop)
        {
            std::vector<Func> tmp_tasks;
            {
                // 加锁
                std::unique_lock<std::mutex> lock(_mutex);
                // 等待任务池不为空或线程池停止
                _cv.wait(lock, [this](){
                    return !_tasks.empty() || _stop;
                });
                // 取出任务
                tmp_tasks.swap(_tasks);
            }
            // 执行任务
            for (auto &task : tmp_tasks)
            {
                task();
            }
        }
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<std::thread> _threads;
    std::vector<Func> _tasks;
    std::atomic<bool> _stop;
};


网站公告

今日签到

点亮在社区的每一天
去签到