生产消费者模型

发布于:2022-11-28 ⋅ 阅读:(254) ⋅ 点赞:(0)

目录

一丶生产者消费者模型

1.生产者消费者模型的概念

2. 生产者消费者模型的特点

二丶生产消费者模型应用

1.生产消费者模型基于阻塞队列的实现

2.实现


一丶生产者消费者模型

1.生产者消费者模型的概念

        在现实生活中,当我们缺少某些生活用品时,就会到超市去购买。当你到超市时,你的身份就是消费者,那么这些商品又是哪里来的呢,自然是供应商,那么它们就是生产者,而超市在生产者与消费者之间,就充当了一个交易场所。正是这样的方式才使得人类的交易变得高效,生产者只需要向超市供应商品,消费者只需要去超市购买商品;

        生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 

2. 生产者消费者模型的特点

我们将其总结为321原则:3种关系、2个角色和1个场所

3种关系:

  1. 生产者VS生产者 --- 两者是互斥关系
  2. 消费者VS消费者 --- 两者是互斥关系
  3. 生产者VS消费者 --- 两者是同步+互斥关系

2个角色:生产者和消费者

1个场所:通常指的是内存中的一个缓冲区,用于数据交互
 

二丶生产消费者模型应用

1.生产消费者模型基于阻塞队列的实现

         使用队列实现生产消费者模型,队列就可以被认定为交易场所,线程1,2充当生产者,消费者,生产者生产数据消费者消费数据。之所以叫做阻塞队列,是这个队列被设置有容量大小,生产者生产到一定数量就停止生产让消费者去消费,反之消费者不能无脑消费,队列中数据为空停止消费提醒生产者去生产~

2.实现

        .hpp

//ConsumerProduct.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
namespace qzh
{
    const int max=5;
    template<class T>
    class BlockQueue
    {
    private:
        int _cap;
        queue<T> _bq;//临界资源
        pthread_mutex_t mtx;//互斥量
        pthread_cond_t _isfull;//条件变量,是否满满了就拿数据
        pthread_cond_t _isempty;//条件变量,是否空空了就放数据
        

    public:
    BlockQueue(int cap=max)
    :_cap(cap)
    {
        pthread_mutex_init(&mtx,nullptr);//锁的生成初始化、
        pthread_cond_init(&_isfull,nullptr);//条件变量初始化
        pthread_cond_init(&_isempty,nullptr);//条件变量初始化
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx);
        pthread_cond_destroy(&_isfull);
        pthread_cond_destroy(&_isempty);
    }

    public:
        bool Isfull()//队列是否满,
        {
            return  _bq.size()==_cap;
        }
        bool Isempty()//队列是否空
        {
            return _bq.size()==0;
        }
        void LockQueue()//对互斥锁进行加锁
        {
            pthread_mutex_lock(&mtx);
        }
        void UnlockQueue()//解锁
        {
            pthread_mutex_unlock(&mtx);
        }
        void productWait()//
        {
            pthread_cond_wait(&_isempty,&mtx);//生产者在_isempty()条件下等待。等待被消费者唤醒
        }
        void consumerwait()
        {
            pthread_cond_wait(&_isfull,&mtx);//消费者在此等待,被生产者唤醒
        }
        void Wakeupcon()
        {
            pthread_cond_signal(&_isfull);//生产者在条件_isfull()满足情况下唤醒消费者
        }
        void Wakeuppro()
        {
            pthread_cond_signal(&_isempty);//消费者在条件_isempty()满足情况下唤醒生产者
        }

    public:
        void Push(const T&in)//向队列生生产数据
        {
            LockQueue();//先对临界资源进行加锁
            while(Isfull())//判断是否满的,满的情况下就进行挂起等待~
            {
                //挂起等待时候需要先将互斥锁释放,不然会导致死锁问题。就是抱着锁被挂起了~
                //这个函数调用自动释放互斥锁
                productWait();
            }
            if(_bq.size()<_cap/2)
            {
                Wakeupcon();
            }
            //不满了可以放数据
            _bq.push(in);

            //放完数据释放锁
            UnlockQueue();
        }

        void Pop(T*ou)//队列拿数据
        {
            LockQueue();

            while(Isempty())//此处容易出现bug,如果是if的话,这个条件只判断一次,有可能会被虚拟唤醒
            {
                consumerwait();
                //挂起的同时就会将互斥量释放掉了,函数
            }
            *ou=_bq.front();
            _bq.pop();

            if(_bq.size()<_cap/2)
            {
                Wakeuppro();
            }
            UnlockQueue();           
        }
    };

}

 .c

//Conpro.cc
#include"ConsumerProduct.hpp"
#include"task.hpp"
#include<time.h>
#include<cstdlib>
#include<unistd.h>
using namespace qzh;
using namespace qzh1;
void*consumer(void*args)//消费者进行消费
{
    BlockQueue<Task>*c=(BlockQueue<Task>*)args;
    while(1)
    {

        Task t;
        c->Pop(&t);
        //消费者拿到数据就要进行处理.t()为处理方法~
        t();
        //sleep(2);
        // sleep(2);
        // int data=0;
        // c->Pop(&data);
        // cout<<"消费者消费数据: "<<data<<endl;
    }
}


void *product(void*args)//生产者进行生产
{
    BlockQueue<Task>*p=(BlockQueue<Task>*)args;
    string ops="+-*/%";
    while(1)
    {   //制造数据,
        int x=rand()%20+1;
        int y=rand()%20+1;
        char op=ops[rand()%5];
        sleep(2);
        Task t(x,y,op);
        cout<<"生产者发送了一个任务: "<<x<<op<<y<<"=?"<<endl;
        p->Push(t);
        // int data=rand()%20+1;
        // cout<<"生产者生产数据: "<<data<<endl;
        // p->Push(data);
    }
}


int main()
{
    srand((long long)time(nullptr));
    BlockQueue<Task>*bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_t c1,c2,c3,c4;
    
    pthread_create(&c,nullptr,consumer,(void*)bq);
    pthread_create(&c1,nullptr,consumer,(void*)bq);
    pthread_create(&c2,nullptr,consumer,(void*)bq);
    pthread_create(&c3,nullptr,consumer,(void*)bq);
    pthread_create(&c4,nullptr,consumer,(void*)bq);
    pthread_create(&p,nullptr,product,(void*)bq);

    pthread_join(c,nullptr);
    pthread_join(c1,nullptr);
    pthread_join(c2,nullptr);
    pthread_join(c3,nullptr);
    pthread_join(c4,nullptr);
    pthread_join(p,nullptr);


    return 0;
}

Task.h

//任务的实现
#pragma once
#include<iostream>
#include<pthread.h>

namespace qzh1
{
    class Task
    {
    private:
        int _x,_y;
        char _op;
    public:
    Task()
    {}
    Task(int x,int y,char op)
    :_x(x)
    ,_y(y)
    ,_op(op)
    {}
    ~Task()
    {}
    int run()
    {
        int res = 0;
        switch (_op)
        {
        case '+':
            res =_x + _y;
            break;
        case '-':
            res =_x - _y;
            break;
        case '*':
            res =_x * _y;
            break;
        case '/':
           res =_x / _y;
            break;
        case '%':
           res =_x % _y;
            break;
        default:
            std::cout << "bug??" << std::endl;
            break;
        }
        std::cout << "当前任务正在被: " << pthread_self() << " 处理: "
                  << _x << _op << _y << "=" << res << std::endl;
        return res;
    }
    int operator()()
    {
        return run();
    }
    };
}

本文含有隐藏内容,请 开通VIP 后查看