C++开发基础之:队列用法与生产者消费者模型实战和可直接复用的线程安全的队列

发布于:2025-08-20 ⋅ 阅读:(12) ⋅ 点赞:(0)

在这里插入图片描述

前言

在多线程编程中,生产者-消费者模型 是最常见的并发模式之一。生产者线程负责不断将数据放入队列,而消费者线程则从队列中取出数据进行处理。
C++ 标准库为我们提供了多种队列相关容器和工具,可以轻松实现这一模型。本文实践一步步掌握 C++ 队列的使用。

一、C++ 中常见的队列容器

在这里插入图片描述

1. std::queue

  • 底层默认基于 std::deque 实现

  • 先进先出(FIFO)结构

  • 常用方法:

    • push():入队
    • pop():出队(不返回值,只移除)
    • front():取队首元素
    • back():取队尾元素
    • empty():判空
    • size():元素数量

示例代码:

#include <iostream>
#include <queue>

int main() {
    std::queue<int> q;

    q.push(10);
    q.push(20);
    q.push(30);

    std::cout << "队首: " << q.front() << std::endl; // 10
    std::cout << "队尾: " << q.back()  << std::endl; // 30

    q.pop(); // 移除 10
    std::cout << "新的队首: " << q.front() << std::endl; // 20
}

执行结果
在这里插入图片描述

2. std::deque(双端队列)

  • 既可以从队首插入/删除,也可以从队尾插入/删除
  • std::queue 的底层就是基于 std::deque

示例:

#include <deque>
#include <iostream>

int main() {
    std::deque<int> dq;

    dq.push_back(1);  // 尾部插入
    dq.push_front(2); // 头部插入

    std::cout << dq.front() << " " << dq.back() << std::endl; // 2 1
}

执行结果
在这里插入图片描述

3. std::priority_queue(优先队列)

  • 默认大顶堆(最大值优先)
  • 适用于需要任务调度、优先级处理的场景

示例:

#include <queue>
#include <iostream>
#include <vector>

int main() {
    std::priority_queue<int> pq;

    pq.push(5);
    pq.push(1);
    pq.push(10);

    std::cout << "最大元素: " << pq.top() << std::endl; // 10
}

执行结果
在这里插入图片描述
对的 👍 你这段话是非常关键的知识点,可以在博客里单独拉出来,做成一个 小节重点提示

我建议可以这样组织,读者会更容易理解:


二、为什么 std::queue 不是线程安全的?

C++ 标准库中的 std::queue 只是一个容器适配器,它本身不包含任何并发控制机制。
单线程环境中直接使用没有问题,但在多线程场景下,如果生产者线程在 push 的同时消费者线程在 pop,就会导致数据竞争 (data race),甚至造成程序崩溃。

如何解决?

我们需要引入额外的同步原语:

  • std::mutex(互斥锁)

    • 用于保证对队列的操作是原子性的
    • 在同一时刻,只有一个线程可以修改队列
  • std::condition_variable(条件变量)

    • 用于在队列为空时阻塞消费者线程,避免忙等 (busy waiting)
    • 当生产者放入数据时,调用 notify_one()notify_all() 来唤醒等待的消费者
      在这里插入图片描述

三、使用队列实现生产者消费者模型

下面我们实现一个 典型的生产者消费者模型,其中:

  • 生产者线程不断往队列中放入数据
  • 消费者线程不断从队列中取出数据

示例代码

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;
bool done = false; // 结束标志

// 生产者
void producer(int id) {
    for (int i = 0; i < 5; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        std::unique_lock<std::mutex> lock(mtx);
        q.push(i + id * 100);
        std::cout << "生产者 " << id << " 生产数据: " << (i + id * 100) << std::endl;
        cv.notify_one(); // 通知消费者
    }

    // 通知结束
    std::unique_lock<std::mutex> lock(mtx);
    done = true;
    cv.notify_all();
}

// 消费者
void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !q.empty() || done; });

        if (!q.empty()) {
            int value = q.front();
            q.pop();
            lock.unlock();

            std::cout << "消费者 " << id << " 消费数据: " << value << std::endl;
        } else if (done) {
            break;
        }
    }
}

int main() {
    std::thread p1(producer, 1);
    std::thread p2(producer, 2);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);

    p1.join();
    p2.join();
    c1.join();
    c2.join();

    std::cout << "生产者消费者模型结束。" << std::endl;
    return 0;
}

运行结果

在这里插入图片描述

四、可直接复用的线程安全队列实现

std::mutexstd::condition_variable 封装到一个类里,避免他们每次都要写锁和条件变量逻辑。编写一个可复用的ThreadSafeQueue<T> 模板实现:

模板实现

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    // 入队
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(std::move(value));
        }
        cv_.notify_one(); // 通知一个等待线程
    }

    // 出队(阻塞)
    T wait_and_pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !queue_.empty(); });

        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    // 出队(非阻塞,返回是否成功)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty())
            return false;

        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.empty();
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.size();
    }

private:
    mutable std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> queue_;
};

调用代码

#include <iostream>
#include <thread>
#include "ThreadSafeQueue.h"  
ThreadSafeQueue<int> tsq;
void producer() {
    for (int i = 0; i < 10; i++) {
        tsq.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "生产: " << i << "\r\n";
    }
}
void consumer() {
    for (int i = 0; i < 10; i++) {
        int value = tsq.wait_and_pop();
        std::cout << "消费: " << value << "\r\n";
    }
}
int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}

运行结果

在这里插入图片描述
这样,在实际项目里就可以直接用 ThreadSafeQueue<T> 代替裸 std::queue<T>,代码会简洁很多。

五、总结

  • std::queue:最常用的队列容器,适合 FIFO 场景
  • std::deque:双端队列,支持从头尾高效操作
  • std::priority_queue:适合任务调度与优先级处理
  • 多线程场景下必须加锁,并配合 std::condition_variable 使用,才能实现高效的生产者-消费者模型

网站公告

今日签到

点亮在社区的每一天
去签到