【LINUX操作系统】生产者消费者模型(下):封装、信号量与环形队列

发布于:2025-05-18 ⋅ 阅读:(17) ⋅ 点赞:(0)

1.封装、完善基于阻塞队列的productor-consumer module

前文中我们封装了自己的Mutex 

【LINUX操作系统】线程同步与互斥-CSDN博客

按照老规矩,现在我们对同步与互斥的理解更进一步了,现在把这种面向过程的语言封装成面向对象的写法

 1.1 封装条件变量

#pragma once

#include <iostream>
#include <string>
#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    class Cond
    {
    public:
        Cond()
        {
            pthread_cond_init(&_cond,nullptr);
        }
        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }
        void Wait()
        {

        }
        void notify()
        {
            pthread_cond_signal(&_cond);
        }
         void notifyall()
        {
            pthread_cond_broadcast(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}

在上面的代码中,需要注意wait函数,因为让线程在条件变量下等待时,pthread_cond_wait接口会释放线程所持有的锁,所以需要让该接口接收一个参数,用于pthread_cond_wait的第二个参数

                        

再由上一文中的对Mutex的封装:

【LINUX操作系统】线程同步与互斥-CSDN博客

                

1.2 在BlockQueue中使用自己封装的接口

完整的使用自己接口的第二版代码,将代码改成面向对象的风格

//version 2 引入自己的接口
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"

namespace BQModule
{
    using namespace CondModule;
    using namespace LockMoudle;
    size_t gsize = 5;//整个队列最多装5个数据
    template<typename T>
    class BlockQueue
    {
    public:

        bool IsFull()
        {
            return _bq.size()==_capacity;
        }
        bool IsEmpty()
        {
            return _bq.empty();
        }

        BlockQueue()
        : _cwait_num(0),_pwait_num(0),_capacity(gsize)
        {
            //自己封装的接口都是在定义时就初始化了
            // pthread_mutex_init(&_lock,nullptr);
            // pthread_cond_init(&_con_cond,nullptr);
            // pthread_cond_init(&_pro_cond,nullptr);
        }

        void Pop(T* p_data)//designed for Consumer
        {
            //pthread_mutex_lock(&_lock);
            MutexGuard mutexguard(_lock);
            while(IsEmpty())
            {
                _cwait_num++;
                //bq中已经没有数据,消费者线程需要进入条件变量去等待
                //pthread_cond_wait(&_con_cond,&_lock);//再次理解为什么wait必须释放锁
                _con_cond.wait(_lock);
                _cwait_num--;
            }
            *p_data = _bq.front();
            _bq.pop();
            //此处,刚刚减少数据,并且还没有解锁,bq中一定没有满,可以唤醒一个生产者线程
            if(_pwait_num)
            _pro_cond.notify();

            //pthread_mutex_unlock(&_lock);
        }

        void Enqueue(const T& data)//designed for Productor
        {
            //pthread_mutex_lock(&_lock);
            MutexGuard mutexguard(_lock);
            while(IsFull())
            {
                _pwait_num++;
                //bq中数据已经满了,生产者需要进入条件变量去等待
                _pro_cond.wait(_lock);
                _pwait_num--;
            }
            _bq.push(data);
            //此处,刚刚加入数据,并且还没有解锁,bq中一定有数据,可以去唤醒一个消费者条件变量中的线程
            if(_cwait_num)
            //pthread_cond_signal(&_con_cond);
            _con_cond.notify();

            //pthread_mutex_unlock(&_lock);
        }

        ~BlockQueue()
        {
            //自己封装的类在析构时都会自动释放资源
            // pthread_mutex_destroy(&_lock);
            // pthread_cond_destroy(&_con_cond);
            // pthread_cond_destroy(&_pro_cond);
        }

    private:
        std::queue<T> _bq;
        Cond  _pro_cond; //生产者的条件变量
        Cond _con_cond; //消费者的条件变量
        Mutex _lock;//一把需要被各生产者和消费者看到的锁,用于互斥
        size_t _capacity; //容量

        int _cwait_num;
        int _pwait_num;
    };
}

这样一来,所有的锁都不再需要手动释放,而是代码在RAII风格下自动释放锁。 

1.3 实现多生产者多消费者版本

由于我们设计的本来就只有一把锁,消费者与消费者、生产者与生产者这两种关系先天就是互斥且同步的

测试结果:

1.4 传递任务的生产消费模型

交易场所不仅仅用来传递数据,也可以用来传递任务。

假设今天有个Task类,并且按照Task修改主函数中模板:

class Task
{
public:
    Task(int x = 1, int y = 1)
        : _x(x), _y(y)
    {
    }
    ~Task() {}
    void Excute()
    {
        _res = _x+_y;
    }
    int res()
    {
        return _res;
    }
private:
    int _x;
    int _y;
    int _res;
};
using namespace BQModule;
void *Consumer(void *arg) // 消费者
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);
    while (true)
    {
        sleep(2);
        // 1.从bq中获得数据
        Task data;
        bq->Pop(&data); // 输出型参数

        // 2.处理数据
        data.Excute();
        std::cout << "consumer get: " << data.res() << std::endl;
    }
}

void *Productor(void *arg) // 生产者
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);

    while (true)
    {
        // sleep(2);
        //  1.从外部获取数据
        //::data+=10;//+=操作不是原子的,可能发生进入了相同数据的情况
        int x = rand() % 5 + 1; //[1,5]
        int y = rand() % 9 + 1; //[1,9]
        Task t(x, y);

        // 2.将数据入队列
        bq->Enqueue(t);
        printf("productor asked  %d+%d=? \n", x, y);
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> *bq = new BlockQueue<Task>;
    // pthread_t p1,p2,c1,c2,c3;
    pthread_t p1, c1;
    pthread_create(&p1, nullptr, Productor, (void *)bq);

    pthread_create(&c1, nullptr, Consumer, (void *)bq);

    pthread_join(p1, nullptr);

    pthread_join(c1, nullptr);

    delete bq;

    return 0;
}

也可以使用using task_t =function<void()>来实现,直接把task_t当作BlockQueue的类型即可。


2. 进一步理解 生产消费者模型

互斥访问,何以高效?

生产消费模型都是在各种锁或者信号量的控制下进行的,那是否能说明模型中的各个线程都是串行执行的呢?串行执行难道不是有违设计线程的初衷,降低了整体效率吗?

由以上的传递任务的模型不难看出,生产消费模型并不是单纯的用于在阻塞队列中去传递数据(一些网络服务器虽然确实是做这个的),他还有很多其他运用场景,比如:玩卡牌游戏,一个线程用于获得键鼠等外设上的信息(生产者),另一个(可能是好几个)线程用于处理背后的逻辑(消费者),那其实对于整个进程来说,线程A外设上等待信息的时间和线程B处理背后逻辑的时间才是真正花时间的,而等待信息和处理逻辑的时候,显然两个线程是在并行。

超市中去放货取货只是整个进程中损耗很小的一部分,不应该由此担心是不是变成了串行。


3. POSIX信号量

        信号量(Semaphore)是操作系统中用于进程或线程同步的核心工具,其本质是一个非负整数计数器,用于表示共享资源的剩余数量或访问权限的状态。通过计数器的增减操作,信号量可以协调多个进程/线程对临界资源的访问,避免竞争条件(Race Condition)和数据不一致问题。

        在进程间通信的时候我们大致介绍过基于systemV的信号量,现在我们介绍POSIX信号量:

        之前我们的mutex都是对于资源的整体预订,而信号量是一种不用去整体预定的策略。

信号量就像买电影票,买了电影票表示预定了这个座位:一个大数组,我不需要全部预定完,可以只使用一个空间。信号量就是对剩余座位的计数器,表示当下还有多少“部分资源”可供使用。

                        

POSIX的信号量多用于满足线程间同步。

就像mutex的锁一样,信号量sem本身就是临界资源,所以信号量的--(P操作)或者++(V操作)本身必须是原子的。

所以对于锁来说(mutex),其实就是一个二元信号量去控制,也就是对于该信号量:非0即1。整体资源还存在、没被拿就是1,被拿了(在讲锁的原理的时候提到过,本质是swap,是原子性的汇编操作)就是0

  • P操作(sem_wait:也叫等待信号量
    减少信号量的值。若结果为负,则阻塞调用线程(或进程,取决于信号量类型),直到信号量非负。
    • 函数原型:
      int sem_wait(sem_t *sem);
    • 行为:
      • 若信号量值 > 0,则减1并继续执行。
      • 若信号量值 = 0,则线程被挂起,直到其他线程调用sem_post增加信号量值。
  • V操作(sem_post:也叫发布信号量
    增加信号量的值。若信号量原本为0(即有线程因sem_wait被阻塞),则唤醒一个阻塞的线程。
    • 函数原型:
      int sem_post(sem_t *sem);
    • 行为:
      • 信号量值加1。
      • 若存在因sem_wait阻塞的线程,则唤醒其中一个。

顺便再把semaphore中其他常用的接口也记录一下(semaphore头文件编译的时候也需要加上-lpthread)

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值

销毁信号量
int sem_destroy(sem_t *sem);

 value表示一开始有的“部分资源”的数量。比如这种情况就该设置为16,如果已经有一个被使用就设置成15。

                

中间的参数都默认设置成0即可。


4. 基于环形队列的⽣产消费模型

        循环队列版本的生产消费模型本质就是利用了信号量实现并发处理,在前面的阻塞队列版本,不论是单生产、消费线程版本还是多生产、消费线程版本,都无法做到生产的同时消费,但是实际上如果可以以信号量来控制资源的使用逻辑就不影响了。

        因此,今天我们选择用一个数组vector来作为存放数据的基本容器

循环队列原理

head是循环队列的头,tail是循环队列的尾。整个队列为空时,head和tail同时指向同一个位置(如图)。一旦开始插入数据,head指向下一个会被使用的数据(也就是整个队列的第一个数据,队列的头,消费者下一次就从head处拿出数据),tail指向下一个即将插入数据的位置(也就是整个队列的第一个空位置,生产者生产出数据就放在tail指向的位置),整个循环队列满的时候,head和tail也都指向同一个位置。可以用取模的方法保证tail会去转圈。

                ​​​​​​​        

资源,分为数据空间。

对于生产者来说,tail从0开始,对空间资源进行P操作(--),对数据资源进行V操作(++)

对于消费者来说,head从0开始,对空间资源进行V操作,对数据资源进行P操作

        ​​​​​​​        

分析:刚开始没有数据的时候,head和tail指向同一个位置,难道不会让两个线程发生竞态条件吗?此时没有数据,数据资源的计数器----数据资源的信号量为0,该信号量自动不让消费者进入,保证生产者先进行原子性生产(对空间资源进行V操作)

最后数据满的时候,head和tail又指向同一个位置,此时没有空间了,所有的空间都有数据,空间资源的计数器----空间资源的信号量为0,该信号量自动不让生产者进入,保证消费者先进性原子性消费(对数据进行P操作)。

注意,至于每次等待的队列到底是FIFO还是随机争抢,这个取决于操作系统的实现方法 

代码实现

借鉴下之前写BlockQueue的主程序,稍加改造

【LINUX操作系统】生产者消费者模型(上):概念与阻塞队列-CSDN博客

(记得再改改变量名,bq是blockqueue的缩写......)

构建ringbuffer类:

 

namespace RingBufferModule
{
    template <typename T>
    class ringbuffer
    {
    public:
        ringbuffer(int init_sem_num=DEFAULTSENMVAL)
        :_p_pos(0)
        ,_c_pos(0)
        ,_ring(SIZE)
        ,_size(SIZE)
        {
            int n = sem_init(&_data_sem,0,init_sem_num);
            if(n<0)
            {
                std::cerr<<"sem init error"<<std::endl;
            }

            int m = sem_init(&_space_sem,0,init_sem_num);
            if(m<0)
            {
                std::cerr<<"sem init error"<<std::endl;
            }
        }

        void Enqueue(const T& in)//给生产者用
        {
            //1.获取数据

            //2.入环
        }

        void Pop(T* out)//给消费者用
        {

        }

        ~ringbuffer()
        {
            sem_destroy(&_data_sem);
            sem_destroy(&_space_sem);
        }

        ringbuffer
    private:
        std::vector<T> _ring; //存储数据的基本容器,也是重点临界资源
        size_t _size; //整个环的大小
        sem_t _data_sem;//数据资源信号量
        sem_t _space_sem;//空间资源信号量
        int _p_pos;//生产者下标位置,即tail,表示下一次放数据的地方
        int _c_pos;//消费者下标位置,即head,表示下一次取数据的地方
    };
}

对于这个循环队列,需要有Enqueue和Pop的接口,分别给生产者和消费者用于放数据。

前几次都是先用最基本的C库接口封装,然后自己封装一个面向对象的类,再替换接口,这次直接先封装,再调接口。

就可以改变我们刚刚的构造函数

并且完成具体逻辑的函数,将各种sem_wait还有sem_post就都封装起来了。

在一开始数据为空的时候,不用担心消费者进来,因为此时_data_sem无法进行P操作,会阻塞在原地。

测试

这样,我们的循环队列保证在为空为满时都是进行“互斥与同步”,而在其余时刻,各自访问各自的信号量,提升了一丢丢的效率。


5. 多生产多消费的循环队列生产消费模型 

        再加入其他生产者消费者之后,关系又恢复成了需要控制:生产者与生产者、消费者与消费者,生产者与消费者。我们采取如下思路:给生产者们加一个锁,所有的生产者竞争出来一个线程去参加这次循环队列的sem的PV操作;给所有的消费者一个锁,所有的消费者竞争出来一个线程去参加这次循环队列的sem的PV操作。

依然是直接引进我们自己实现的RAII风格的锁。【LINUX操作系统】线程同步与互斥-CSDN博客

现在的问题是,锁在哪里加效率更高呢?

比如Pop,是在MutexGuard后使用data_sem.P()还是在MutexGuard之前呢? 

        

两者都能完成只竞争出一个进入循环队列的栈,区别在于:

在1,先锁住,只有一个去进行数据信号量的“取电影票”,然后正常执行。

在2,先让可以取电影票的线程(消费者们)都取一张电影票,然后再锁住,依次让人进入电影院获取资源。自然,方案2效率更高。一次性的并发_data_sem.P()可以减少之后串行调用该接口的时间。

        这样依然不担心生产者与消费者的互斥,因为能拿到票这一步已经天然的构成了一道屏障,只要拿到了票就一定不会冲突,可以各自使用各自的部分资源。

最终代码:

#define SIZE 10

using namespace LockMoudle;

namespace RingBufferModule
{
    template <typename T>
    class ringbuffer
    {
    public:
        ringbuffer()
            : _p_pos(0), _c_pos(0), _ring(SIZE), _size(SIZE), _data_sem(0), _space_sem(SIZE)
        {
        }

        void Enqueue(const T &in) // 给生产者用
        {
            _space_sem.P();
            
            {
                MutexGuard lockguard(_p_mutex);
                _ring[_p_pos] = in;
                _p_pos++;
                _p_pos %= _size;
            }

            _data_sem.V();
        }

        void Pop(T *out) // 给消费者用
        {
            // MutexGuard lockguard(_c_mutex);//锁在这里1
            _data_sem.P();

            {
                MutexGuard lockguard(_c_mutex); // 锁在这里2
                *out = _ring[_c_pos];
                _c_pos++;
                _c_pos %= _size;
            }

            _space_sem.V();
        }

        ~ringbuffer()
        {
            // 都自动销毁了
        }

    private:
        std::vector<T> _ring; // 存储数据的基本容器,也是重点临界资源
        size_t _size;         // 整个环的大小
        int _p_pos;           // 生产者下标位置,即tail,表示下一次放数据的地方
        int _c_pos;           // 消费者下标位置,即head,表示下一次取数据的地方

        Sem _data_sem;  // 数据资源信号量
        Sem _space_sem; // 空间资源信号量

        Mutex _p_mutex;
        Mutex _c_mutex;
    };
}


网站公告

今日签到

点亮在社区的每一天
去签到