这里写自定义目录标题
POSIX信号量
- 互斥:公共资源,整体使用
- 信号量:公共资源,单独使用
- 信号量本身也是临界资源,也需要被保护
- 信号量:访问资源–>申请信号量–>这是一个资源预定机制
- 互斥锁是一个二元信号量,因为互斥锁只有加锁(P操作)和解锁(V操作)两个状态。
//信号量的大致结构
struct sem
{
int count;//计数器
mutex_t mutex;//锁
cond_t cond;//条件变量
}
信号量的使用
初始化
- sem:信号量
- pshared:决定是进程共享(非0)还是线程共享(0)
- value:计数器的初始值
销毁
P操作
- sem_wait:阻塞等待
- sem_trywait:非阻塞等待
- sem_timewait:等待一定的时间
V操作
封装信号量
因为比较简单这里就不多解释了。
//跟封装锁,条件变量差不多
#pragma once
#include <iostream>
#include <semaphore.h>
namespace SemModule
{
class Sem
{
public:
Sem(int defaultval = 1):_val(defaultval)
{
sem_init(&_sem,0,_val);
}
~Sem()
{
sem_destroy(&_sem);
}
void P()
{
int n = sem_wait(&_sem);
(void)n;//这里不处理调用错误的情况了
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
int _val;
};
}
基于环形队列的生产消费模型
环形队列
在数据结构中就学过了,这里只讲一些重点
- 本质上就是一个数组,如果环形队列的容量是N,那么就需要开辟N+1的空间,也就是多开辟一个空间,因为需要区分空和满。
- 空:head和tail指向同一个位置
- 满:
head++;
head%=N;
如果此时的head和tail指向同一个位置,那么就是满
- 如何实现循环的呢?
%=N
实现单生产单消费模型
分析实现的思路
- 任何人访问临界资源前,必须现申请信号量–>信号量描述了资源的数目
- 资源:
- 数据
- 空间
生产者:关注的是剩余空间
消费者:关注的是剩余数据
//生产者
//初始化的tail
int tail = 0;
//
P(空间);
ringqueue[tail] = data;
tail++;
tial%=N;
V(数据);
//消费者
//初始化的head
int head = 0;
//
P(数据);
int data = ringqueue[head];
head ++;
head %= N;
V(空间);
看看上述的代码是否合适。
- 极端情况
生产者和消费者在同一位置
- 空:因为没有数据,消费者阻塞在P(数据)处,等待生产者生产数据。
- 满:因为没有空间,生产者会阻塞在P(空间)处,等待消费者消费数据。
所以,实现不需要像循环队列一样多开辟一个空间。
- 其他情况
生产者和消费者不在同一位置
那么生产者和消费者,说明既有空间又有数据,两者可以并行。
总结
- 两者就像在同一个操场上跑步差不多,但无法套圈
- 生产者无法超过消费者
- 同一位置是互斥的
- 不同位置是并发的
代码实现单生产者单消费模型
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include "Mutex.hpp"//自己封装的锁
#include "Sem.hpp"//自己封装的信号量
namespace RingQueueModule
{
using namespace MutexModule;
using namespace SemModule;
int default_cp = 5;
template<class T>
class RingQueue
{
public:
RingQueue(int cp = default_cp)
:_cp(cp),
_rq(cp),
_space(cp),
_data(0),
_p_pos(0),
_c_pos(0)
{
}
~RingQueue()
{
}
void EQueue(T data)
{
//生产者
_space.P();
_rq[_c_pos] = data;
_c_pos++;
_c_pos%=_cp;
_data.V();
}
void Pop(T* data)
{
//消费者
_data.P();
*data = _rq[_p_pos];
_p_pos++;
_p_pos%=_cp;
_space.V();
}
private:
int _cp;//容量
std::vector<T> _rq;//环形队列
Sem _space;//空间信号量
Sem _data;//数据信号量
int _p_pos;//生产者位置
int _c_pos;//消费者位置
};
}
测试
生产者生产的慢,生产者生产一个消费者就消费一个
#include "RingQueue.hpp"
using namespace RingQueueModule;
int data = 0;
void* Consumer(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
while(true)
{
int data;
rq->Pop(&data);
std::cout<<"消费一个数据:"<<data<<std::endl;
}
}
void* Productor(void* arg)
{
RingQueue<int>* rq = (RingQueue<int>*)arg;
while(true)
{
sleep(1);
rq->EQueue(data);
std::cout<<"生产一个数据:"<<data<<std::endl;
data++;
}
}
int main()
{
RingQueue<int>* rq = new RingQueue<int>(5);
pthread_t tid1;
pthread_create(&tid1, nullptr, Consumer, (void*)rq);
pthread_t tid2;
pthread_create(&tid2, nullptr, Productor, (void *)rq);
while(true);
return 0;
}
- 结果
与预期一致
重新理解下信号量,互斥锁
//RingQueue
void EQueue(T data)
{
//生产者
_space.P();
_rq[_c_pos] = data;
_c_pos++;
_c_pos%=_cp;
_data.V();
}
//BloackQueue
void EnterQueue(T data)
{
// 访问临界资源
_mutex.Lock();
// 如果数据满了,那么等待消费者消费
//伪唤醒
while (IsFull())
{
p_wait_num++;
p_cond.Wait(_mutex);
//pthread_cond_wait(&p_cond, &_mutex);
p_wait_num--;
}
_bq.push(data);
//std::cout << "生产数据:" << data << std::endl;
// 至少有数据被生产了,唤醒消费者
if (c_wait_num)
{
//pthread_cond_signal(&c_cond);
c_cond.Notify();
}
_mutex.Unlock();
}
- 为什么信号量不用判断?
信号量本身就表示资源的数目,只要成功,就一定有,所以不需要判断。
互斥锁–>整体申请,局部使用资源–>不知道资源的使用情况–>所以需要判断
基于单生产单消费实现多生产多消费
- 需要加入的部分是:
- 生产者与生产者之间的关系: 互斥
- 消费者与消费者之间的关系: 互斥
那么需要几把锁呢?
两把,生产者与生产者一把,消费者与消费者一把加锁的位置问题?
void EQueue(T data)
{
// 生产者
// _mutex.Lock();
_space.P();
_mutex.Lock();
_rq[_c_pos] = data;
_c_pos++;
_c_pos %= _cp;
//_mutex.Unlock();
_data.V();
_mutex.Unlock();
}
加锁是在P操作之前还是在P操作之后,解锁是在V操作之前还是V操作之后?
答案:之后,因为信号量后锁,这样信号量和锁是并行的,就像去电影看电影的买票(信号量)和排队(锁)一样,如果先锁后信号量,那么锁和信号量就是串行的,效率不高,所以先信号量后锁。
代码
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include "Mutex.hpp"
#include "Sem.hpp"
namespace RingQueueModule
{
using namespace MutexModule;
using namespace SemModule;
int default_cp = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cp = default_cp)
: _cp(cp),
_rq(cp),
_space(cp),
_data(0),
_p_pos(0),
_c_pos(0)
{
}
~RingQueue()
{
}
void EQueue(T data)
{
// 生产者
_space.P();
{
LockGuard lockguard(_p_mutex);
_rq[_c_pos] = data;
_c_pos++;
_c_pos %= _cp;
}
_data.V();
}
void Pop(T *data)
{
// 消费者
_data.P();
{
LockGuard lockguard(_c_mutex);
*data = _rq[_p_pos];
_p_pos++;
_p_pos %= _cp;
}
_space.V();
}
private:
int _cp; // 容量
std::vector<T> _rq; // 环形队列
Sem _space; // 空间信号量
Sem _data; // 数据信号量
int _p_pos; // 生产者位置
int _c_pos; // 消费者位置
Mutex _c_mutex; // 消费者之间的锁
Mutex _p_mutex; // 生产者之间的锁
};
}
测试
int main()
{
RingQueue<int>* rq = new RingQueue<int>(5);
pthread_t tid1;
pthread_create(&tid1, nullptr, Consumer, (void*)rq);
pthread_t tid3;
pthread_create(&tid3, nullptr, Consumer, (void*)rq);
pthread_t tid2;
pthread_create(&tid2, nullptr, Productor, (void *)rq);
pthread_t tid4;
pthread_create(&tid4, nullptr, Productor, (void*)rq);
while(true);
return 0;
}
- 结果