实现代码
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t threads) : stop(false) {
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
使用示例
#include <iostream>
#include <chrono>
int main() {
ThreadPool pool(4);
// 提交多个任务到线程池
std::vector<std::future<int>> results;
for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
// 获取结果
for(auto && result: results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
解析
ThreadPool 类定义
class ThreadPool {
public:
ThreadPool(size_t threads); // 构造函数,指定线程数量
~ThreadPool(); // 析构函数
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
private:
std::vector<std::thread> workers; // 工作线程集合
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 任务队列互斥锁
std::condition_variable condition; // 条件变量
bool stop; // 停止标志
};
构造函数解析
ThreadPool(size_t threads) : stop(false) {
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
// 线程工作函数
for(;;) {
std::function<void()> task;
{
// 获取队列锁
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待条件满足:停止或任务队列非空
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
// 如果停止且任务为空,线程退出
if(this->stop && this->tasks.empty())
return;
// 获取任务
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务(在锁外执行,避免锁持有时间过长)
task();
}
});
}
}
stop(false)
- 初始化停止标志为falseworkers.emplace_back
- 创建并启动工作线程for(;;)
- 线程无限循环,等待任务std::unique_lock<std::mutex>
- 获取队列锁
条件:condition.wait
- 等待条件变量通知,防止忙等待stop || !tasks.empty()
(停止或有任务)检查是否应该退出线程
从队列获取任务并移出队列
在锁外执行任务
enqueue 方法解析
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
// 获取返回类型
using return_type = typename std::result_of<F(Args...)>::type;
// 创建packaged_task包装可调用对象
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 获取future以便获取结果
std::future<return_type> res = task->get_future();
{
// 获取队列锁
std::unique_lock<std::mutex> lock(queue_mutex);
// 如果线程池已停止,抛出异常
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 将任务添加到队列
tasks.emplace([task](){ (*task)(); });
}
// 通知一个等待线程有新任务
condition.notify_one();
return res;
}
模板参数:
F
- 可调用对象类型Args
- 参数类型包
返回类型推导:
std::future<typename std::result_of<F(Args...)>::type>
std::packaged_task
- 包装可调用对象,可以获取futurestd::bind
- 绑定参数std::forward
- 完美转发参数task->get_future()
- 获取与任务关联的future锁保护下的队列操作
condition.notify_one()
- 通知一个等待线程
析构函数解析
~ThreadPool() {
{
// 获取锁并设置停止标志
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
// 通知所有线程
condition.notify_all();
// 等待所有线程完成
for(std::thread &worker: workers)
worker.join();
}
设置停止标志
stop = true
condition.notify_all()
- 唤醒所有等待线程worker.join()
- 等待所有线程结束
动态调整线程数量
void resize(size_t new_size) {
if (new_size < workers.size()) {
// 减少线程数量
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
workers.clear();
stop = false;
for (size_t i = 0; i < new_size; ++i) {
workers.emplace_back([this] {
// 线程工作函数
});
}
} else if (new_size > workers.size()) {
// 增加线程数量
for (size_t i = workers.size(); i < new_size; ++i) {
workers.emplace_back([this] {
// 线程工作函数
});
}
}
}
任务优先级
#include <queue>
// 修改任务队列定义
struct Task {
std::function<void()> func;
int priority;
bool operator<(const Task& other) const {
return priority < other.priority; // 优先级高的先执行
}
};
std::priority_queue<Task> tasks;
// 修改enqueue方法
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
// ... 其他代码不变
tasks.emplace(Task{[task](){ (*task)(); }, priority});
// ...
}