在 Qt 多线程编程中,线程池是一种高效管理和复用线程的技术,能够减少线程创建和销毁的开销,提高应用性能和响应性。本文将深入探讨 Qt 线程池的设计与实现,包括核心组件、任务调度、线程管理和性能优化等方面。
一、线程池基本原理
1. 线程池核心组件
- 工作线程:预先创建的一组线程,等待执行任务
- 任务队列:存储待执行的任务,线程从队列中获取任务执行
- 管理器:负责线程的创建、销毁和任务的分配调度
2. 线程池工作流程
- 初始化时创建一定数量的工作线程
- 外部提交任务到任务队列
- 工作线程从任务队列获取任务并执行
- 任务执行完毕后,线程返回线程池等待下一个任务
- 根据系统负载和任务特性,动态调整线程数量
二、简单线程池实现
1. 任务接口定义
class Runnable {
public:
virtual ~Runnable() {}
virtual void run() = 0;
};
2. 线程池实现
class ThreadPool {
public:
explicit ThreadPool(int threadCount = QThread::idealThreadCount())
: m_stop(false) {
// 创建工作线程
for (int i = 0; i < threadCount; ++i) {
QThread *thread = new QThread(this);
thread->start();
m_threads.append(thread);
// 创建工作者并移动到线程
Worker *worker = new Worker(&m_taskQueue, &m_mutex, &m_condition, &m_stop);
worker->moveToThread(thread);
// 连接信号槽
connect(worker, &Worker::taskFinished, this, &ThreadPool::taskFinished);
m_workers.append(worker);
// 启动工作者
QMetaObject::invokeMethod(worker, "run", Qt::QueuedConnection);
}
}
~ThreadPool() {
// 停止线程池
{
QMutexLocker locker(&m_mutex);
m_stop = true;
m_condition.wakeAll();
}
// 等待所有线程完成
foreach (QThread *thread, m_threads) {
thread->quit();
thread->wait();
}
}
void enqueueTask(Runnable *task) {
QMutexLocker locker(&m_mutex);
m_taskQueue.enqueue(task);
m_condition.wakeOne();
}
signals:
void taskFinished(Runnable *task);
private:
QList<QThread*> m_threads;
QList<Worker*> m_workers;
QQueue<Runnable*> m_taskQueue;
QMutex m_mutex;
QWaitCondition m_condition;
bool m_stop;
};
3. 工作者实现
class Worker : public QObject {
Q_OBJECT
public:
explicit Worker(QQueue<Runnable*> *taskQueue, QMutex *mutex,
QWaitCondition *condition, bool *stop, QObject *parent = nullptr)
: QObject(parent), m_taskQueue(taskQueue), m_mutex(mutex),
m_condition(condition), m_stop(stop) {}
public slots:
void run() {
while (true) {
Runnable *task = nullptr;
{
QMutexLocker locker(m_mutex);
// 等待任务或停止信号
while (m_taskQueue->isEmpty() && !*m_stop) {
m_condition->wait(m_mutex);
}
// 检查是否需要停止
if (*m_stop && m_taskQueue->isEmpty()) {
return;
}
// 获取任务
task = m_taskQueue->dequeue();
}
// 执行任务
if (task) {
task->run();
emit taskFinished(task);
}
}
}
signals:
void taskFinished(Runnable *task);
private:
QQueue<Runnable*> *m_taskQueue;
QMutex *m_mutex;
QWaitCondition *m_condition;
bool *m_stop;
};
三、高级线程池实现
1. 带优先级的任务队列
template <typename T>
class PriorityQueue {
public:
void enqueue(const T &task, int priority = 0) {
QMutexLocker locker(&m_mutex);
// 按优先级插入任务
for (auto it = m_tasks.begin(); it != m_tasks.end(); ++it) {
if (priority > it->second) {
m_tasks.insert(it, qMakePair(task, priority));
return;
}
}
// 优先级最低,插入到末尾
m_tasks.append(qMakePair(task, priority));
m_condition.wakeOne();
}
T dequeue() {
QMutexLocker locker(&m_mutex);
// 等待任务
while (m_tasks.isEmpty()) {
m_condition.wait(&m_mutex);
}
// 获取最高优先级任务
T task = m_tasks.takeFirst().first();
return task;
}
bool isEmpty() const {
QMutexLocker locker(&m_mutex);
return m_tasks.isEmpty();
}
private:
QList<QPair<T, int>> m_tasks;
mutable QMutex m_mutex;
QWaitCondition m_condition;
};
2. 动态调整线程数量的线程池
class DynamicThreadPool {
public:
explicit DynamicThreadPool(int minThreads = 4, int maxThreads = 16)
: m_minThreads(minThreads), m_maxThreads(maxThreads),
m_activeThreads(0), m_stop(false) {
// 创建最小数量的线程
for (int i = 0; i < minThreads; ++i) {
createWorker();
}
}
~DynamicThreadPool() {
// 停止线程池
{
QMutexLocker locker(&m_mutex);
m_stop = true;
m_condition.wakeAll();
}
// 等待所有线程完成
foreach (QThread *thread, m_threads) {
thread->quit();
thread->wait();
}
}
void enqueueTask(Runnable *task) {
QMutexLocker locker(&m_mutex);
// 如果任务队列太长且线程数未达到最大值,创建新线程
if (m_taskQueue.size() > m_activeThreads && m_threads.size() < m_maxThreads) {
createWorker();
}
m_taskQueue.enqueue(task);
m_condition.wakeOne();
}
private:
void createWorker() {
QThread *thread = new QThread(this);
thread->start();
m_threads.append(thread);
Worker *worker = new Worker(&m_taskQueue, &m_mutex, &m_condition,
&m_stop, &m_activeThreads);
worker->moveToThread(thread);
connect(worker, &Worker::taskStarted, this, &DynamicThreadPool::taskStarted);
connect(worker, &Worker::taskFinished, this, &DynamicThreadPool::taskFinished);
connect(worker, &Worker::idleTimeout, this, &DynamicThreadPool::workerIdleTimeout);
m_workers.append(worker);
QMetaObject::invokeMethod(worker, "run", Qt::QueuedConnection);
}
void workerIdleTimeout(Worker *worker) {
QMutexLocker locker(&m_mutex);
// 如果线程数超过最小值,删除空闲线程
if (m_threads.size() > m_minThreads) {
int index = m_workers.indexOf(worker);
if (index >= 0) {
QThread *thread = m_threads.takeAt(index);
m_workers.takeAt(index)->deleteLater();
thread->quit();
thread->wait();
thread->deleteLater();
}
}
}
private:
QList<QThread*> m_threads;
QList<Worker*> m_workers;
QQueue<Runnable*> m_taskQueue;
QMutex m_mutex;
QWaitCondition m_condition;
int m_minThreads;
int m_maxThreads;
int m_activeThreads;
bool m_stop;
};
四、使用 QtConcurrent 的线程池
1. QtConcurrent 基础用法
#include <QtConcurrent>
// 定义一个函数
int compute(int value) {
// 模拟耗时计算
QThread::sleep(1);
return value * value;
}
// 使用 QtConcurrent 运行函数
void useQtConcurrent() {
// 在全局线程池中运行任务
QFuture<int> future = QtConcurrent::run(compute, 42);
// 可以继续执行其他代码...
// 获取结果
int result = future.result();
qDebug() << "Result:" << result;
}
// 并行处理容器中的元素
void processContainer() {
QList<int> inputList = {1, 2, 3, 4, 5};
// 并行处理每个元素
QFuture<void> future = QtConcurrent::map(inputList, [](int &value) {
value = value * value;
});
// 等待所有任务完成
future.waitForFinished();
qDebug() << "Processed list:" << inputList;
}
2. 使用 QFutureWatcher 监控任务
void useFutureWatcher() {
// 创建并启动任务
QFuture<int> future = QtConcurrent::run([]() {
// 模拟耗时操作
QThread::sleep(3);
return 42;
});
// 创建监听器
QFutureWatcher<int> *watcher = new QFutureWatcher<int>(this);
// 连接信号槽
connect(watcher, &QFutureWatcher<int>::finished, this, [this, watcher]() {
int result = watcher->result();
qDebug() << "Task finished with result:" << result;
watcher->deleteLater();
});
// 设置监听器
watcher->setFuture(future);
}
五、线程池性能优化
1. 任务队列优化
// 使用无锁队列提高性能
template <typename T>
class LockFreeQueue {
public:
void enqueue(const T &value) {
Node *newNode = new Node(value);
Node *oldTail = m_tail.load();
while (!m_tail.compare_exchange_weak(oldTail, newNode)) {
// CAS 失败,重试
}
oldTail->next = newNode;
}
bool dequeue(T &value) {
Node *oldHead = m_head.load();
if (oldHead == m_tail.load()) {
return false; // 队列为空
}
Node *newHead = oldHead->next;
if (m_head.compare_exchange_weak(oldHead, newHead)) {
value = oldHead->data;
delete oldHead;
return true;
}
return false; // CAS 失败,重试
}
private:
struct Node {
T data;
Node *next;
explicit Node(const T &value) : data(value), next(nullptr) {}
};
std::atomic<Node*> m_head;
std::atomic<Node*> m_tail;
};
2. 线程亲和性设置
// 设置线程亲和性
void setThreadAffinity(QThread *thread, int cpuCore) {
#ifdef Q_OS_LINUX
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuCore, &cpuset);
pthread_setaffinity_np(thread->threadId(), sizeof(cpu_set_t), &cpuset);
#endif
}
// 在线程启动时设置亲和性
void Worker::run() {
// 设置线程亲和性
setThreadAffinity(QThread::currentThread(), m_cpuCore);
// 线程执行代码
// ...
}
六、线程池最佳实践
1. 任务分割策略
// 将大任务分割为多个小任务
void splitTask() {
const int taskCount = 100;
const int batchSize = 10;
for (int i = 0; i < taskCount; i += batchSize) {
threadPool->enqueueTask(new BatchTask(i, batchSize));
}
}
// 批处理任务实现
class BatchTask : public Runnable {
public:
BatchTask(int startIndex, int count)
: m_startIndex(startIndex), m_count(count) {}
void run() override {
for (int i = 0; i < m_count; ++i) {
// 处理单个任务
processItem(m_startIndex + i);
}
}
private:
int m_startIndex;
int m_count;
};
2. 线程池大小配置
// 根据 CPU 核心数配置线程池大小
int determineThreadPoolSize() {
int coreCount = QThread::idealThreadCount();
// I/O 密集型任务可以设置比 CPU 核心数大的线程数
if (isIoIntensive()) {
return coreCount * 2;
}
// CPU 密集型任务通常设置为 CPU 核心数
return coreCount;
}
七、总结
线程池是 Qt 多线程编程中的重要技术,能够有效管理线程资源,提高应用性能。本文介绍了线程池的基本原理、实现方法和优化策略,主要内容包括:
- 线程池核心组件:工作线程、任务队列和管理器
- 线程池实现方式:从简单实现到动态调整线程数量的高级实现
- QtConcurrent 线程池:Qt 提供的高级线程池接口
- 性能优化技术:无锁队列、线程亲和性设置等
- 最佳实践:任务分割、合理配置线程池大小等
在实际开发中,应根据应用的特点和需求选择合适的线程池实现方式,并通过性能测试不断优化线程池配置,以达到最佳的性能和资源利用率。