目录
高性能编程:无锁队列相关概念http://t.csdnimg.cn/DYP7A
相关概念请阅读:
高性能编程:无锁队列相关概念
http://t.csdnimg.cn/DYP7A
1. LockedQueue:队列为空时不阻塞消费者线程
LockedQueue是一种基于锁的队列实现,当队列为空时,消费者线程不会被阻塞。它适用于任务耗时长短不一且不需要严格区分生产者和消费者数量的场景。这种设计允许消费者在队列为空时继续执行其他任务或进行轮询,而不是被迫等待新任务的到来。
1.1 为什么选择基于锁的队列
尽管无锁队列在高并发场景下具有显著的性能优势,但基于锁的队列在某些情况下仍然具有以下优势:
- 实现简单:基于锁的队列实现相对直观,易于理解和维护。
- 适用性广:适用于生产者和消费者数量不固定,且任务处理时间不一致的场景。
- 低并发需求:在并发程度不高的应用中,基于锁的开销较低,且不会成为性能瓶颈。
1.2 LockedQueue 的设计原则
LockedQueue主要考虑以下两点:
- 任务的耗时:任务执行时间不一,可能有长有短。
- 生产者和消费者的数量:不区分生产者和消费者的具体数量。
这种设计允许队列在处理任务时更加灵活,消费者线程在队列为空时不会被阻塞,可以进行其他操作或继续轮询,提高了系统的响应性和资源利用率。
1.3 LockedQueue 代码实践
#ifndef MARK_LOCKEDQUEUE_H
#define MARK_LOCKEDQUEUE_H
#include <deque>
#include <mutex>
template <class T, typename StorageType = std::deque<T> >
class LockedQueue
{
//! Lock access to the queue.
std::mutex _lock;
//! Storage backing the queue.
StorageType _queue;
//! Cancellation flag.
volatile bool _canceled;
public:
//! Create a LockedQueue.
LockedQueue()
: _canceled(false)
{
}
//! Destroy a LockedQueue.
virtual ~LockedQueue()
{
}
//! Adds an item to the queue.
void add(const T& item)
{
lock();
_queue.push_back(item);
unlock();
}
//! Adds items back to front of the queue
template<class Iterator>
void readd(Iterator begin, Iterator end)
{
std::lock_guard<std::mutex> lock(_lock);
_queue.insert(_queue.begin(), begin, end);
}
//! Gets the next result in the queue, if any.
bool next(T& result)
{
std::lock_guard<std::mutex> lock(_lock);
if (_queue.empty())
return false;
result = _queue.front();
_queue.pop_front();
return true;
}
template<class Checker>
bool next(T& result, Checker& check)
{
std::lock_guard<std::mutex> lock(_lock);
if (_queue.empty())
return false;
result = _queue.front();
if (!check.Process(result))
return false;
_queue.pop_front();
return true;
}
//! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false.
T& peek(bool autoUnlock = false)
{
lock();
T& result = _queue.front();
if (autoUnlock)
unlock();
return result;
}
//! Cancels the queue.
void cancel()
{
std::lock_guard<std::mutex> lock(_lock);
_canceled = true;
}
//! Checks if the queue is cancelled.
bool cancelled()
{
std::lock_guard<std::mutex> lock(_lock);
return _canceled;
}
//! Locks the queue for access.
void lock()
{
this->_lock.lock();
}
//! Unlocks the queue.
void unlock()
{
this->_lock.unlock();
}
///! Calls pop_front of the queue
void pop_front()
{
std::lock_guard<std::mutex> lock(_lock);
_queue.pop_front();
}
///! Checks if we're empty or not with locks held
bool empty()
{
std::lock_guard<std::mutex> lock(_lock);
return _queue.empty();
}
};
#endif
1.3.1类成员变量
std::mutex _lock
:- 用于保护队列的线程安全访问。任何对队列的修改操作都需要先获取此锁,确保同一时间只有一个线程可以访问队列。
StorageType _queue
:- 队列的底层存储结构,默认使用
std::deque
,但可以通过模板参数自定义其他容器类型,如std::vector
或自定义的环形缓冲区。
- 队列的底层存储结构,默认使用
volatile bool _canceled
:- 用于标记队列是否被取消。
volatile
关键字用于防止编译器对其进行优化,确保多线程环境下的可见性。
- 用于标记队列是否被取消。
1.3.2主要成员函数
1.3.2.1 add函数
void add(const T& item)
{
lock();
_queue.push_back(item);
unlock();
}
- 功能:向队列末尾添加一个元素。
- 实现:
- 获取锁。
- 将元素添加到队列末尾。
- 释放锁。
- 注意:手动锁定和解锁,需确保调用者正确管理锁的生命周期。
1.3.2.2 read函数
template<class Iterator>
void readd(Iterator begin, Iterator end)
{
std::lock_guard<std::mutex> lock(_lock);
_queue.insert(_queue.begin(), begin, end);
}
- 功能:将一组元素重新添加到队列前端。
- 实现:
- 使用
std::lock_guard
自动管理锁的生命周期。 - 将迭代器范围内的元素插入到队列开头。
- 使用
1.3.2.3 next函数
bool next(T& result)
{
std::lock_guard<std::mutex> lock(_lock);
if (_queue.empty())
return false;
result = _queue.front();
_queue.pop_front();
return true;
}
- 功能:尝试获取队列中的下一个元素。如果队列为空,返回
false
。 - 实现:
- 自动获取和释放锁。
- 检查队列是否为空。
- 如果不为空,取出队首元素并移除。
- 返回
true
表示成功获取元素。
template<class Checker>
bool next(T& result, Checker& check)
{
std::lock_guard<std::mutex> lock(_lock);
if (_queue.empty())
return false;
result = _queue.front();
if (!check.Process(result))
return false;
_queue.pop_front();
return true;
}
- 功能:带有检查器的
next
函数,只有在check.Process(result)
返回true
时,才移除队首元素。 - 实现:
- 获取锁。
- 检查队列是否为空。
- 获取队首元素。
- 调用检查器处理元素,若返回
true
,则移除元素并返回true
。 - 否则,不移除元素并返回
false
。
1.3.2.4 peek函数
T& peek(bool autoUnlock = false)
{
lock();
T& result = _queue.front();
if (autoUnlock)
unlock();
return result;
}
功能:查看队列的第一个元素,但不移除它。
实现:
- 获取锁。
- 返回队首元素的引用。
- 根据
autoUnlock
参数决定是否立即释放锁。
注意:调用者需要在不使用
autoUnlock
时手动释放锁,确保不会造成死锁或资源泄漏。
1.3.2.5 Cancle和Canclled函数
void cancel()
{
std::lock_guard<std::mutex> lock(_lock);
_canceled = true;
}
bool cancelled()
{
std::lock_guard<std::mutex> lock(_lock);
return _canceled;
}
- 功能:设置和检查队列的取消状态。
- 实现:
cancel
函数通过锁保护设置取消标志为true
。cancelled
函数通过锁保护读取取消标志。
1.3.2.6 lock和unlock函数
void lock()
{
this->_lock.lock();
}
void unlock()
{
this->_lock.unlock();
}
功能:手动控制队列的锁。
实现:直接调用
std::mutex
的lock
和unlock
方法。注意:手动管理锁需要谨慎,确保每次
lock
都对应一次unlock
,避免死锁。
1.3.2.7 pop_front和empty函数
void pop_front()
{
std::lock_guard<std::mutex> lock(_lock);
_queue.pop_front();
}
bool empty()
{
std::lock_guard<std::mutex> lock(_lock);
return _queue.empty();
}
- 功能:
pop_front
:移除队列的第一个元素。empty
:检查队列是否为空。
- 实现:
- 使用
std::lock_guard
自动管理锁。 - 执行相应的队列操作。
- 使用
1.3.3 如何实现队列为空时不阻塞消费者
在 LockedQueue
中,消费者通过调用 next
函数来尝试获取队列中的下一个元素。如果队列为空,next
函数会立即返回 false
,而不是让消费者线程进入阻塞状态。这种非阻塞的设计适用于以下场景:
任务耗时长短不一:
- 长耗时任务:消费者可能需要较长时间来处理任务。在这种情况下,消费者不会被队列的空闲状态所阻塞,可以继续执行其他操作或检查队列。
- 短耗时任务:消费者可以快速地处理任务,并频繁地检查队列状态,而无需等待。
不区分生产者和消费者数量:
- 无论生产者和消费者的数量如何变化,消费者线程在队列为空时都不会被强制阻塞,而是能够灵活地处理或跳过。
1.3.4 使用示例
假设有一个生产者线程不断向 LockedQueue
添加任务,而多个消费者线程从队列中获取任务并处理:
#include "LockedQueue.h"
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
// 示例任务
struct Task {
int id;
// 其他任务相关数据
};
// 生产者函数
void producer(LockedQueue<Task>& queue) {
for (int i = 0; i < 100; ++i) {
Task task = {i};
queue.add(task);
std::cout << "Produced Task " << task.id << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟生产延迟
}
queue.cancel(); // 生产完成,取消队列
}
// 消费者函数
void consumer(LockedQueue<Task>& queue, int consumer_id) {
while (!queue.cancelled()) {
Task task;
if (queue.next(task)) {
std::cout << "Consumer " << consumer_id << " processing Task " << task.id << std::endl;
// 模拟任务处理时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
// 队列为空,消费者可以执行其他操作或稍作等待
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
std::cout << "Consumer " << consumer_id << " exiting." << std::endl;
}
int main() {
LockedQueue<Task> queue;
// 启动生产者线程
std::thread prod_thread(producer, std::ref(queue));
// 启动多个消费者线程
std::vector<std::thread> consumers;
for (int i = 0; i < 3; ++i) {
consumers.emplace_back(consumer, std::ref(queue), i + 1);
}
// 等待生产者线程完成
prod_thread.join();
// 等待消费者线程完成
for (auto& cons : consumers) {
cons.join();
}
return 0;
}
解释:
生产者线程:
- 向队列中添加 100 个任务,每添加一个任务后等待 50 毫秒。
- 添加完所有任务后,调用
queue.cancel()
标记队列为取消状态。
消费者线程:
- 不断尝试从队列中获取任务。
- 如果成功获取到任务,处理该任务(模拟处理时间为 100 毫秒)。
- 如果队列为空,消费者线程不会被阻塞,而是等待 10 毫秒后再次尝试获取任务。
- 当队列被取消且无任务时,消费者线程退出。
输出示例:
Produced Task 0
Consumer 1 processing Task 0
Produced Task 1
Produced Task 2
Consumer 2 processing Task 1
Produced Task 3
Consumer 3 processing Task 2
...
Consumer 1 exiting.
Consumer 2 exiting.
Consumer 3 exiting.
1.4 LockedQueue 的优势与限制
1.4.1优势
非阻塞消费者:
- 消费者线程在队列为空时不会被阻塞,可以进行其他操作或继续轮询,提高系统的灵活性和响应性。
简单易用:
- 基于
std::mutex
和std::deque
的实现,使得代码易于理解和维护。
- 基于
适应性强:
- 不区分生产者和消费者的数量,适用于生产者和消费者数量动态变化的场景。
任务耗时灵活:
- 适用于处理任务耗时长短不一的情况,消费者线程能够根据队列状态灵活调整行为。
1.4.2限制
性能瓶颈:
- 基于锁的实现在高并发环境下可能成为性能瓶颈,尤其是在生产者和消费者数量较多时,锁竞争会显著增加。
上下文切换开销:
- 频繁获取和释放锁可能导致上下文切换,增加系统开销,影响整体性能。
缺乏无锁队列的高并发优势:
- 相较于无锁队列,基于锁的队列在高并发场景下的性能较低,无法充分利用多核 CPU 的能力。
潜在的死锁风险:
- 手动管理锁(如
peek
函数中的autoUnlock == false
情况)可能导致死锁或资源泄漏,需谨慎设计和使用。
- 手动管理锁(如
1.5总结
LockedQueue 作为一种基于锁的队列实现,通过 std::mutex
确保线程安全,允许多个生产者和消费者在多线程环境中安全地操作队列。当队列为空时,消费者线程不会被阻塞,而是可以继续执行其他任务或进行轮询,这使得它特别适用于处理任务耗时长短不一且生产者和消费者数量不固定的场景。
设计关键点:
- 线程安全:通过
std::mutex
保护队列,确保生产者和消费者的并发访问不会导致数据竞争和不一致。 - 非阻塞消费者:消费者在队列为空时不会被阻塞,通过
next
函数返回状态,允许线程灵活应对队列状态。 - 灵活性:不区分生产者和消费者数量,适用于动态变化的多线程应用场景。
使用建议:
适用场景:
- 任务处理时间不一致,需灵活调整消费者行为。
- 生产者和消费者数量可能动态变化。
- 系统对实现简单性和可维护性有较高要求,且并发度不极高。
性能优化:
- 在高并发场景下,考虑使用无锁队列或其他更高效的并发数据结构,以减少锁竞争和上下文切换的开销。
- 避免在消费者线程中长时间持有锁,确保锁的获取和释放尽可能快速。
参考: