基于环形队列的生产消费模型(信号量的线程级使用)

发布于:2024-11-27 ⋅ 阅读:(118) ⋅ 点赞:(0)

POSIX信号量

  • 互斥:公共资源,整体使用
  • 信号量:公共资源,单独使用
  • 信号量本身也是临界资源,也需要被保护
  • 信号量:访问资源–>申请信号量–>这是一个资源预定机制
  • 互斥锁是一个二元信号量,因为互斥锁只有加锁(P操作)和解锁(V操作)两个状态。
//信号量的大致结构
struct sem
{
	int count;//计数器
	mutex_t mutex;//锁
	cond_t cond;//条件变量
}

信号量的使用

初始化

在这里插入图片描述

  • sem:信号量
  • pshared:决定是进程共享(非0)还是线程共享(0)
  • value:计数器的初始值

销毁

在这里插入图片描述

P操作

在这里插入图片描述

  • sem_wait:阻塞等待
  • sem_trywait:非阻塞等待
  • sem_timewait:等待一定的时间

V操作

在这里插入图片描述

封装信号量

因为比较简单这里就不多解释了。

//跟封装锁,条件变量差不多
#pragma once
#include <iostream>
#include <semaphore.h>

namespace SemModule
{
    class Sem
    {
    public:
        Sem(int defaultval = 1):_val(defaultval)
        {
            sem_init(&_sem,0,_val);
        }
        ~Sem()
        {
            sem_destroy(&_sem);
        }
        void P()
        {
            int n = sem_wait(&_sem);
            (void)n;//这里不处理调用错误的情况了
        }
        void V()
        {
            sem_post(&_sem);
        }
    private:
        sem_t _sem;
        int _val;
    };
}

基于环形队列的生产消费模型

环形队列

在数据结构中就学过了,这里只讲一些重点
在这里插入图片描述

  • 本质上就是一个数组,如果环形队列的容量是N,那么就需要开辟N+1的空间,也就是多开辟一个空间,因为需要区分空和满
  • :head和tail指向同一个位置
  • :
head++;
head%=N;

如果此时的head和tail指向同一个位置,那么就是满

  • 如何实现循环的呢?
    %=N

实现单生产单消费模型

分析实现的思路

  • 任何人访问临界资源前,必须现申请信号量–>信号量描述了资源的数目
  • 资源:
  1. 数据
  2. 空间
    生产者:关注的是剩余空间
    消费者:关注的是剩余数据
//生产者
//初始化的tail
int tail = 0;
//
P(空间);
ringqueue[tail] = data;
tail++;
tial%=N;
V(数据);
//消费者
//初始化的head
int head = 0;
//
P(数据);
int data  =  ringqueue[head];
head ++;
head %= N;
V(空间);

看看上述的代码是否合适。

  • 极端情况
    生产者和消费者在同一位置
  1. :因为没有数据,消费者阻塞在P(数据)处,等待生产者生产数据。
  2. :因为没有空间,生产者会阻塞在P(空间)处,等待消费者消费数据。
    所以,实现不需要像循环队列一样多开辟一个空间。
  • 其他情况
    生产者和消费者不在同一位置
    那么生产者和消费者,说明既有空间又有数据,两者可以并行

总结

  1. 两者就像在同一个操场上跑步差不多,但无法套圈
  2. 生产者无法超过消费者
  3. 同一位置是互斥的
  4. 不同位置是并发的

代码实现单生产者单消费模型

#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include "Mutex.hpp"//自己封装的锁
#include "Sem.hpp"//自己封装的信号量
namespace RingQueueModule
{
    using namespace MutexModule;
    using namespace SemModule;
    int default_cp = 5;
    template<class T>
    class RingQueue
    {
    public:
        RingQueue(int cp = default_cp)
        :_cp(cp),
         _rq(cp),
         _space(cp),
         _data(0),
         _p_pos(0),
         _c_pos(0)
        {

        }
        ~RingQueue()
        {

        }
        void EQueue(T data)
        {
            //生产者
            _space.P();
            _rq[_c_pos] = data;
            _c_pos++;
            _c_pos%=_cp;
            _data.V();
        }
        void Pop(T* data)
        {
            //消费者
            _data.P();
            *data = _rq[_p_pos];
            _p_pos++;
            _p_pos%=_cp;
            _space.V();
        }
    private:
        int _cp;//容量
        std::vector<T> _rq;//环形队列
        Sem _space;//空间信号量
        Sem _data;//数据信号量
        int _p_pos;//生产者位置
        int _c_pos;//消费者位置
    };
}

测试

生产者生产的慢,生产者生产一个消费者就消费一个

#include "RingQueue.hpp"
using namespace RingQueueModule;
int data = 0;
void* Consumer(void* arg)
{
    RingQueue<int>* rq = (RingQueue<int>*)arg;
    while(true)
    {
        int data;
        rq->Pop(&data);
        std::cout<<"消费一个数据:"<<data<<std::endl;
    }
}
void* Productor(void* arg)
{
    RingQueue<int>* rq  = (RingQueue<int>*)arg;
    while(true)
    {
        sleep(1);
        rq->EQueue(data);
        std::cout<<"生产一个数据:"<<data<<std::endl;
        data++;

    }
}
int main()
{
    RingQueue<int>* rq = new RingQueue<int>(5);
    pthread_t tid1;
    pthread_create(&tid1, nullptr, Consumer, (void*)rq);
    pthread_t tid2;
    pthread_create(&tid2, nullptr, Productor, (void *)rq);
    while(true);
    return 0;
}
  • 结果
    与预期一致
    在这里插入图片描述

重新理解下信号量,互斥锁

//RingQueue
        void EQueue(T data)
        {
            //生产者
            _space.P();
            _rq[_c_pos] = data;
            _c_pos++;
            _c_pos%=_cp;
            _data.V();
        }
//BloackQueue
        void EnterQueue(T data)
        {
            // 访问临界资源
            _mutex.Lock();
            // 如果数据满了,那么等待消费者消费
            //伪唤醒
            while (IsFull())
            {
                p_wait_num++;
                p_cond.Wait(_mutex);
                //pthread_cond_wait(&p_cond, &_mutex);
                p_wait_num--;
            }
            _bq.push(data);
            //std::cout << "生产数据:" << data << std::endl;

            // 至少有数据被生产了,唤醒消费者
            if (c_wait_num)
            {
                //pthread_cond_signal(&c_cond);
                c_cond.Notify();
            }
            _mutex.Unlock();
        }
  • 为什么信号量不用判断?
    信号量本身就表示资源的数目,只要成功,就一定有,所以不需要判断。
    互斥锁–>整体申请,局部使用资源–>不知道资源的使用情况–>所以需要判断

基于单生产单消费实现多生产多消费

  • 需要加入的部分是:
  1. 生产者与生产者之间的关系: 互斥
  2. 消费者与消费者之间的关系: 互斥
  • 那么需要几把锁呢?
    两把,生产者与生产者一把,消费者与消费者一把

  • 加锁的位置问题?

        void EQueue(T data)
        {
            // 生产者
           // _mutex.Lock();
            _space.P();
            _mutex.Lock();

            _rq[_c_pos] = data;
            _c_pos++;
            _c_pos %= _cp;

            //_mutex.Unlock();
            _data.V();
            _mutex.Unlock();
        }

加锁是在P操作之前还是在P操作之后,解锁是在V操作之前还是V操作之后?
答案:之后,因为信号量后锁,这样信号量和锁是并行的,就像去电影看电影的买票(信号量)和排队(锁)一样,如果先锁后信号量,那么锁和信号量就是串行的,效率不高,所以先信号量后锁

代码

#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include "Mutex.hpp"
#include "Sem.hpp"
namespace RingQueueModule
{
    using namespace MutexModule;
    using namespace SemModule;
    int default_cp = 5;
    template <class T>
    class RingQueue
    {
    public:
        RingQueue(int cp = default_cp)
            : _cp(cp),
              _rq(cp),
              _space(cp),
              _data(0),
              _p_pos(0),
              _c_pos(0)
        {
        }
        ~RingQueue()
        {
        }
        void EQueue(T data)
        {
            // 生产者
            _space.P();

            {
                LockGuard lockguard(_p_mutex);
                _rq[_c_pos] = data;
                _c_pos++;
                _c_pos %= _cp;
            }

            _data.V();
        }
        void Pop(T *data)
        {
            // 消费者
            _data.P();
            {
                LockGuard lockguard(_c_mutex);
                *data = _rq[_p_pos];
                _p_pos++;
                _p_pos %= _cp;
            }
            _space.V();
        }

    private:
        int _cp;            // 容量
        std::vector<T> _rq; // 环形队列
        Sem _space;         // 空间信号量
        Sem _data;          // 数据信号量
        int _p_pos;         // 生产者位置
        int _c_pos;         // 消费者位置
        Mutex _c_mutex;     // 消费者之间的锁
        Mutex _p_mutex;     // 生产者之间的锁
    };
}

测试

int main()
{
    RingQueue<int>* rq = new RingQueue<int>(5);
    pthread_t tid1;
    pthread_create(&tid1, nullptr, Consumer, (void*)rq);
    pthread_t tid3;
    pthread_create(&tid3, nullptr, Consumer, (void*)rq);
    pthread_t tid2;
    pthread_create(&tid2, nullptr, Productor, (void *)rq);
    pthread_t tid4;
    pthread_create(&tid4, nullptr, Productor, (void*)rq);
    while(true);
    return 0;
}
  • 结果
    在这里插入图片描述