目录
使用 std::packaged_task 和 std::future 配合
使用 std::promise 和 std::future 配合
std::future
介绍
std::future 是 C++11 标准库中的一个模板类,它表示一个异步操作的结果。当我们在多线程编程中使用异步任务时,std::future 可以帮助我们在需要的时候获取任务的执行结果。std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。
应用场景
- 异步任务: 当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future 可以用来表示这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并行处理,从而提高程序的执行效率。
- 并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执行其他操作。通过使用 std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续操作。
- 结果获取:std::future 提供了一种安全的方式来获取异步任务的结果。我们可以使用 std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用 get()函数时,我们可以确保已经获取到了所需的结果。
用法示例
使用 std::async 关联异步任务
std::async 是一种将任务与 std::future 关联的简单方法。它创建并运行一个异步任务,并返回一个与该任务结果关联的 std::future 对象。默认情况下,std::async 是否启动一个新线程,或者在等待 future 时,任务是否同步运行都取决于你给的 参数。这个参数为 std::launch 类型:
- ○ std::launch::deferred 表明该函数会被延迟调用,直到在 future 上调用 get()或者 wait()才会开始执行任务。
- std::launch::async 表明函数会在自己创建的线程上运行。
- std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略。
deferred策略:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
int add(int num1, int num2)
{
std::cout << "加法:" << num1+num2 << std::endl;
return num1+num2;
}
int main()
{
std::cout << "------------------------1---------------------------" << std::endl;
// std::launch::deferred: 调用get才会执行加法函数
std::future<int> result = std::async(std::launch::deferred, add, 11, 22);
// std::future<int> result = std::async(std::launch::async, add, 11, 22);
sleep(1);
std::cout << "------------------------2---------------------------" << std::endl;
int sum = result.get();
std::cout << sum << std::endl;
return 0;
}
运行结果:
async策略:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
int add(int num1, int num2)
{
std::cout << "加法:" << num1+num2 << std::endl;
return num1+num2;
}
int main()
{
std::cout << "------------------------1---------------------------" << std::endl;
// std::launch::deferred: 调用get才会执行加法函数
// std::future<int> result = std::async(std::launch::deferred, add, 11, 22);
std::future<int> result = std::async(std::launch::async, add, 11, 22);
sleep(1);
std::cout << "------------------------2---------------------------" << std::endl;
int sum = result.get();
std::cout << sum << std::endl;
return 0;
}
运行结果:
使用 std::packaged_task 和 std::future 配合
std::packaged_task 就是将任务和 std::future 绑定在一起的模板,是一种对任务的封装。我们可以通过 std::packaged_task 对象获取任务相关联的 std::future 对象,通过调用 get_future()方法获得。std::packaged_task 的模板参数是函数签名。
可以把 std::future 和 std::async 看成是分开的, 而 std::packaged_task 则是一个整体。
演示demo:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>
int add(int num1, int num2)
{
sleep(1);
return num1+num2;
}
int main()
{
std::packaged_task<int(int, int)> pt(add);
// pt 可以看做是一个可调用对象, 但不能作为一个完全的函数来使用
std::future<int> fu = pt.get_future();
pt(11, 22);
int result = fu.get();
std::cout << result << std::endl;
return 0;
}
运行结果:
异步执行 std::packaged_task 任务:
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
#include <memory>
int add(int num1, int num2)
{
sleep(1);
return num1+num2;
}
int main()
{
// pt虽然重载了()运算符,但pt并不是一个函数,所以导致它作为线程的入口函数时,语法上看没有问题,但是实际编译的时候会报错
// 而 packaged_task 禁止了拷贝构造,
// 且因为每个 packaged_task 所封装的函数签名都有可能不同,因此也无法当作参数一样传递
// 传引用不可取,毕竟任务在多线程下执行存在局部变量声明周期的问题,因此不能传引用
// 因此想要将一个 packaged_task 进行异步调用,
// 简单方法就只能是 new packaged_task,封装函数传地址进行解引用调用
// 而类型不同的问题,在使用的时候可以使用类型推导来解决
// 将pt定义为一个智能指针
auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(add);
std::future<int> fu = ptask->get_future();
std::thread th([ptask](){
(*ptask)(11, 22);
});
int result = fu.get();
std::cout << result << std::endl;
th.join();
return 0;
}
运行结果:
使用 std::promise 和 std::future 配合
std::promise 提供了一种设置值的方式,它可以在设置之后通过相关联的 std::future 对象进行读取。换种说法就是之前说过 std::future 可以读取一个异步函数的返回值了, 但是要等待就绪,而 std::promise 就提供一种 方式手动让 std::future 就绪。
#include <iostream>
#include <thread>
#include <future>
#include <unistd.h>
void add(int num1, int num2, std::promise<int> &prom)
{
sleep(1);
prom.set_value(num1+num2);
return;
}
int main()
{
std::promise<int> prom;
std::future<int> fu = prom.get_future();
std::thread th(add, 11, 22, std::ref(prom));
int result = fu.get();
std::cout << result << std::endl;
th.join();
return 0;
}
运行结果:
c++11 线程池实现
基于线程池执行任务的时候,入口函数内部执行逻辑是固定的,因此选择std::packaged_task 加上std::future 的组合来实现。
线程池的工作思想:
用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的
工作线程来执行函数完成任务
实现:
- 管理的成员
- 任务池:用 vector 维护的一个函数任务池子▪ 互斥锁 & 条件变量: 实现同步互斥
- 一定数量的工作线程:用于不断从任务池取出任务执行任务▪ 结束运行标志:以便于控制线程池的结束。
- 管理的操作:
- 入队任务:入队一个函数和参数
- 停止运行:终止线程池
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <memory>
#include <vector>
#include <atomic>
class ThreadPool
{
public:
using Func = std::function<void(void)>;
ThreadPool(int thread_num = 1):_stop(false)
{
for (int i = 0; i < thread_num; i++)
{
_threads.emplace_back(&ThreadPool::entry, this);
}
}
~ThreadPool()
{
stop();
}
template <class F, class ...Args>
auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
{
using return_type = decltype(func(args...));
// 由于不确定参数,就先绑定进函数
auto Func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
// 利用智能指针管理一个new出来的packaged_task对象
auto task = std::make_shared<std::packaged_task<return_type()>>(Func);
std::future<return_type> fu = task->get_future();
{
// 推入任务池
std::unique_lock<std::mutex> lock(_mutex);
_tasks.emplace_back([task](){
(*task)();
});
_cv.notify_one();
}
return fu;
}
void stop()
{
if(_stop) return;
_stop = true;
_cv.notify_all();
for (auto &thread : _threads)
{
thread.join();
}
}
private:
void entry()
{
while(!_stop)
{
std::vector<Func> tmp_tasks;
{
// 加锁
std::unique_lock<std::mutex> lock(_mutex);
// 等待任务池不为空或线程池停止
_cv.wait(lock, [this](){
return !_tasks.empty() || _stop;
});
// 取出任务
tmp_tasks.swap(_tasks);
}
// 执行任务
for (auto &task : tmp_tasks)
{
task();
}
}
}
private:
std::mutex _mutex;
std::condition_variable _cv;
std::vector<std::thread> _threads;
std::vector<Func> _tasks;
std::atomic<bool> _stop;
};