linux下的posix信号量

发布于:2024-12-18 ⋅ 阅读:(78) ⋅ 点赞:(0)

目录

引言

信号量背景知识

PV操作

信号量接口

基于环形队列的PC模型

代码实现

demo模型 

具体实现


引言

在多线程编程领域,同步机制是确保数据一致性和避免竞态条件的关键技术。Linux操作系统作为开源软件的杰出代表,提供了多种同步原语,其中POSIX信号量以其跨平台的特性和强大的功能,成为了开发者实现线程同步的首选工具。POSIX信号量不仅能够有效地协调多个线程对共享资源的访问,还能在复杂的并发场景中保证线程间的正确协作。通过深入了解Linux下的POSIX信号量,我们能够更好地掌握多线程编程的核心技巧,为构建高效、稳定的并发系统奠定坚实的基础。下面,让我们一同探索POSIX信号量在Linux环境中的应用与实践。

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX可以用于线程间同步。

信号量背景知识

看这张图,假如我们去电影院看电影,那么一定是需要先买票。买票之后,就一定存在一个属于我们的电影院观影位置,我们把这种现象称为:有限资源的预订机制。

信号量就是基于这种条件下出现的,想要对有限资源进行访问,首先得预订有限资源,这样就可以做到:这个资源只属于你。

再生产消费模型中,按道理而言,阻塞队列是不允许并发访问的,但是信号量的出现让这一条件变得可能。

对一个全局数组分成三份,每份100,允许三个线程并发访问。那如果我们把每个数组下标都认为是一个独立的资源,让信号量去管理,那么就允许多个线程并发访问。

信号量:

描述临界资源的资源数目

买票就是对电影院资源的预订机制,我们先买票再去看电影

申请到信号量的线程,就一定有一个资源让你可以去访问。

之前的cp模型中,当申请到锁之后,才判断资源有没有就绪

而信号量的存在,可以保证,只要申请到信号量,就能保证资源对于当前的线程一定就绪,只需要去执行就可以。

申请信号量的时候,已经在判断资源是否就绪了。

PV操作

在操作系统中,信号量(Semaphore)是一种用于控制多个进程访问共享资源的机制。PV操作是信号量操作的一对原语,其中:

P操作(Proberen,测试):这个操作用于减少信号量的值(通常是减1)。如果信号量的值小于等于0,则进程被阻塞,否则进程可以继续执行。P操作可以理解为“请求一个资源”。

V操作(Verhogen,增加):这个操作用于增加信号量的值(通常是加1)。如果有进程因为P操作而被阻塞,它们可能会因为V操作而解除阻塞。V操作可以理解为“释放一个资源”。

PV---申请与释放

信号量接口

1.初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0 表示线程间共享,非零表示进程间共享
value :信号量初始值

2.销毁信号量

int sem_destroy(sem_t *sem)
3.等待信号量
功能:等待信号量,会将信号量的值减 1
int sem_wait(sem_t *sem); //P()
4.发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1
int sem_post(sem_t *sem);//V()

基于环形队列的PC模型

之前我们实现过基于阻塞队列的PC模型,本次借助信号量,实现环形队列的PC模型。

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
判断满了:
1.预留空位置,tail永远在空位置处,这样head->next == tail就代表着满载
2.引入计数器++,cnt  == capa 就是满载。
环形队列的生产消费
只要让生产消费不访问同一个格子,那么就可以进行生产消费同时进行(并发访问)。

假设我是生产,你是消费

在这场PC的追逐中,一定是P先开始,C追逐P,且不能追上P,P也不能套圈C(上限就是C)。

指向同一个位置的时候,不能同时访问,在这样的“追逐游戏中”,只有空或者满才会指向一个位置,也就意味着:

对于一个临界资源,我们内部可以进行小块的划分。当访问不同的位置时,允许同时访问这个数组。

因此PC

我们可以定义两个信号量,分别表示空间量和数据量

初始状态下

信号量也可以实现对资源的控制,并且信号量操作是原子的!

控制:

P申请信号量本质就是:查看资源是否就绪(查看资源的属性),不就绪的话,就等待资源。

代码实现

demo模型 

#include <pthread.h>
#include <queue>
#include <semaphore.h>

using namespace std;

template <class T>
class RingQueue 
{
    static const int defaultcap = 5;
public:
    RingQueue(int cap = defaultcap)
    : _cap(cap)
    , _Cidx(0)
    , _Pidx(0)
    {
        sem_init(&_Csem, 0, 0);     //pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
        sem_init(&_Psem, 0, _cap);
    }

    ~RingQueue()
    {
        sem_destroy(&_Csem);
        sem_destroy(&_Psem);
    }

    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }

    void V(sem_t &sem)
    {
        sem_post(&sem);
    }


private:
    int _cap;
    vector<T> _ringqueue;

    //生产消费的下标
    int _Cidx;
    int _Pidx;

    //信号量
    sem_t _Csem;    //消费者可以消费的数量
    sem_t _Psem;
};

信号量无法进行多生产与多消费者之间的互斥

原因:

数组的下标是唯一的,每次生产或消费都只能访问一个具体的下标,当进行多生产的时候,需要保证有不同的下标去访问不同的格子。所以不能进行多个生产或多个消费,当后续需要多生产与多消费的时候,依然需要加锁。

即:单个的生产与消费可以通过信号量去控制,但是多个生产、消费就可能出现并发访问,因此为了防止P内部,C内部的并发,需要对P和C单独加锁。

具体实现

#include <pthread.h>
#include <queue>
#include <semaphore.h>

using namespace std;

template <class T>
class RingQueue 
{
private:
    static const int defaultcap = 5;
    
    void P(sem_t &sem)  //--
    {
        sem_wait(&sem);    //申请信号量,当信号量不足的时候,会阻塞
    }

    void V(sem_t &sem)  //++
    {
        sem_post(&sem);
    }

    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }

    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

public:
    RingQueue(int cap = defaultcap)
    : _cap(cap)
    , _Cidx(0)
    , _Pidx(0)
    ,_Pmutex(PTHREAD_MUTEX_INITIALIZER)
    ,_Cmutex(PTHREAD_MUTEX_INITIALIZER)
    {
        sem_init(&_Csem, 0, 0);     //pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
        sem_init(&_Psem, 0, _cap);
    }

    void Push(const T& val)
    {
        P(_Psem);       //生产者信号量--
        Lock(_Pmutex);   //生产者互斥锁
       
        //生产者生产数据
        _ringqueue[_Pidx] = val;

        // 位置后移,维持环形特性
        _Pidx++;   //生产者下标++
        _Pidx %= _cap;

        Unlock(_Pmutex);  
        V(_Csem);    //全部完成之后,消费者信号量++
    }

    T Pop(T* out)   //输出型参数
    {
        P(_Csem);    
        Lock(_Cmutex);

        //消费者消费数据
        *out = _ringqueue[_Cidx];

        // 位置后移,维持环形特性
        _Cidx++;
        _Cidx %= _cap;

        Unlock(_Cmutex);
        V(_Psem);
    }


    ~RingQueue()
    {
        sem_destroy(&_Csem);
        sem_destroy(&_Psem);
    }
    
private:
    int _cap;
    vector<T> _ringqueue;

    //生产消费的下标
    int _Cidx;
    int _Pidx;

    //信号量
    sem_t _Csem;    //消费者信号量(虽然是计数器,但是是一个自定义类型)
    sem_t _Psem;
    pthread_mutex_t _Pmutex;    //生产者互斥锁
    pthread_mutex_t _Cmutex;    //消费者互斥锁
};

注意:1.PC用的是同一个队列,P++,C就得--。

           2.信号量本身就有wait,不需要条件变量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作(wait),达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

主函数

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;
struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void *Productor(void *args)
{
    // sleep(3);
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        // 1. 获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 2. 生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(1);
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 1. 消费数据(只消费,消费完之后外部处理)
        Task t;
        rq->Pop(&t);
       
        // 2. 处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
        // sleep(1);

    }
    return nullptr;
}


int main()
{
    
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>(50);  //创建环形队列

    pthread_t c[5], p[3];

    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p + i, nullptr, Productor, td);
    }
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c + i, nullptr, Consumer, td);
    }

    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}

那我们应该申请信号量之前加锁还是申请信号量之后加锁呢?

1.应该申请信号量之后加锁,只有信号量存在,才能保证临界资源准备就绪

2.信号量的申请是原子的

3.一旦一个线程申请了锁释放之后,那么也不能立即去申请下一个锁,因为得去现申请信号量(同步),这样别的线程就可以去申请锁。

所以:先买票订座位,再排队入影院(先买票一定是在最前面的!先预定资源)