Linux线程同步对象:互斥体、信号量、条件变量、读写锁

发布于:2023-01-22 ⋅ 阅读:(9) ⋅ 点赞:(0) ⋅ 评论:(0)

1. Linux互斥体

Linux互斥体与Windows的临界区对象用法很相似,一般也是通过限制多个线程同时执行某段代码来保护资源的。使用数据结构pthread_mutex_t表示一个互斥体对象(pthread.h中)。

初始化方式

  1. 使用 PTHREAD_MUTEX_INITIALIZER直接给互斥体变量赋值,示例
#include <pthread.h>
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
  1. 若互斥体是动态分配的或者需要设置相关属性,需要使用pthread_mutex_init函数初始化:
int pthread_mutex_init(pthread_mutex_t* restrict mutex,
					const pthread_mutexattr_t* restrict attr);

参数mutex是需要初始化的mutex对象的指针
attr是需要设置的互斥体属性,通常设置为NULL。
如果执行成功,返回0;失败返回1个错误吗。

使用示例:

#include <pthread.h>

pthread_mutex_t mtx;
pthread_mutex_init(&mtx, NULL);

销毁方式

当不再需要互斥体对象时,使用pthread_mutex_destroy函数销毁它:

int pthread_mutex_destroy(pthread_mutex_t* mutex);

执行成功返回0,失败返回错误码。

注意点:

  1. 无需销毁使用PTHREAD_MUTEX_INITIALIZER初始化的互斥体。
  2. 不要销毁一个已经加锁或正在被条件变量使用的互斥体对象,当互斥体处于已加锁状态或正在和条件变量配合使用时,调用销毁函数会返回 EBUSY错误。

加解锁操作

一般使用三个函数:

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);

他们都是执行成功返回0,失败则返回错误码。具体错误码随着互斥体对象属性类型的不同而不同。

在设置互斥体对象属性时需要创建一个pthread_mutexattr_t类型的对象:

int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t* attr);

使用pthread_mutexattr_settype/gettype设置或获取想要的属性类型。

int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int type);
int pthread_mutexattr_gettype(const pthread_mutexattr* restrict attr, 
						int* restrict type); 

设置属性

属性类型一般有以下这些值。

  1. PTHREAD_MUTEX_NORMAL(普通锁)

一个线程如果对一个已经加锁的普通锁再次加锁,那么程序会阻塞在第二次调用pthread_mutex_lock代码处。测试:

#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

int main(void)
{
    pthread_mutex_t mymutex;
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_NORMAL);
    pthread_mutex_init(&mymutex, &mutex_attr);

    int ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\n", ret);

    // 再次加锁
    ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\n", ret);

    pthread_mutex_destroy(&mymutex);
    pthread_mutexattr_destroy(&mutex_attr);

    return 0;
}

运行结果只打印了一行,原因是阻塞在第二个lock处。

这种情况下,pthread_mutex_trylock()如果拿不到锁,则也不会阻塞,而是会立即返回EBUSY错误码。

在这里插入图片描述

在这里插入图片描述

  1. PTHRAED_MUTEX_ERRORCHECK(检错锁)

如果是这种属性的锁,对于已经加锁的互斥体对象再次加锁,则pthread_mutex_lock会返回EDEADLK。

示例:

#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

int main(void)
{
    pthread_mutex_t mymutex;
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&mymutex, &mutex_attr);

    int ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\n", ret);

    // 再次加锁
    ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\n", ret);
    if (ret == EDEADLK)
    {
        printf("EDEADLK\n");
    }

    pthread_mutex_destroy(&mymutex);
    pthread_mutexattr_destroy(&mutex_attr);

    return 0;
}

在这里插入图片描述

如果是其他线程对这个互斥体再次调用lock,则会阻塞在该函数的调用处。

  1. PTHREAD_MUTEX_RECURSIVE(可重入锁)

该属性允许同一个线程对其持有的互斥体重复加锁,每成功调用lock一次,该互斥体对象的锁引用技术就会增加1,相反,unlock一次,引用技术减1。引用计数为0时,允许其他线程获取该锁,否则会阻塞在lock调用处。

2.Linux信号量

信号量代表一定的资源数量,可以根据当前资源的数量按需唤醒指定数量的资源消费者线程,资源消费者线程一旦获取信号量,就会让资源减少指定的数量,如果减少为0,则消费者线程将全部处于挂起状态;当有新的资源到来时,消费者线程将继续被唤醒。
另外,信号量有“资源有多份,可以同时被多个线程访问”的意思。

常用API函数:

#include <semaphore.h>

int sem_init(sem_t* sem, int pshared, unsigned int value);
int sem_destroy(sem_t* sem);
int sem_post(sem_t* sem);
int sem_wait(sem_t* sem);
int sem_trywait(sem_t* sem);
int sem_timedwait(sem_t* sem, const struct timespec* abs_timeout);

sem_int()第一个参数是信号量对象地址,第二个参数表示该信号量是否可以被共享,取值为0表示只能在同一个进程的多个线程共享,非0可以在多进程间共享,第三个参数表示初始状态资源数量。 函数调用成功返回0,失败返回-1。

sem_destroy()销毁信号量;
sem_post()用于将信号量的资源计数递增1,并解锁该信号量对象,这样因使用sem_wait()被阻塞的其他线程会被唤醒。

如果当前资源计数为0,sem_wait()会阻塞调用线程,直到大于0时被唤醒,唤醒后资源计数-1后返回;

sem_trywait()是非阻塞版本,如果资源计数为0,则立即返回,不阻塞调用线程,返回值是-1,错误码errno被设置成EAGAIN;

sem_timedwait()带有等待时间的版本,第二个参数是个结构体:

struct timespec
{
	time_t tv_sec;	// 秒
	long tv_nsec;	// 纳秒 范围[0~999999999]
};

信号量实现生产者消费者

#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <semaphore.h>
#include <list>
#include <iostream>

class Task
{
public:
    Task(int taskID)
    {
        this->taskID = taskID;
    }

    void doTask()
    {
        std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
    }

private:
    int taskID;
};

pthread_mutex_t     mymutex;
std::list<Task*>    tasks;
sem_t               mysemaphore;

void* consumer_thread(void* param)
{
    Task* pTask = NULL;
    while (true)
    {
        if (sem_wait(&mysemaphore) != 0)
        {
            continue;
        }
        if (tasks.empty())
        {
            continue;
        }

        pthread_mutex_lock(&mymutex);
        pTask = tasks.front();
        tasks.pop_front();
        pthread_mutex_unlock(&mymutex);

        pTask->doTask();
        delete pTask;
    }

    return NULL;
}

void* producer_thread(void* param)
{
    int taskID = 0;
    Task* pTask = NULL;

    while (true)
    {
        pTask = new Task(taskID);

        pthread_mutex_lock(&mymutex);
        tasks.push_back(pTask);
        std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;

        pthread_mutex_unlock(&mymutex);

        // 释放信号量,通知消费者线程
        sem_post(&mysemaphore);

        taskID++;

        sleep(1);
    }

    return NULL;
}

int main(void)
{
    pthread_mutex_init(&mymutex, NULL);
    // 初始化信号量资源计数为0
    sem_init(&mysemaphore, 0, 0);

    // 创建5个消费者线程
    pthread_t consumerThreadID[5];
    for (int i = 0; i < 5; ++i)
    {
        pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
    }

    // 创建一个生产者线程
    pthread_t producerThreadID;
    pthread_create(&producerThreadID, NULL, producer_thread, NULL);

    pthread_join(producerThreadID, NULL);

    for (int i = 0; i < 5; ++i)
    {
        pthread_join(consumerThreadID[i], NULL);
    }

    sem_destroy(&mysemaphore);
    pthread_mutex_destroy(&mymutex);

    return 0;
}

输出结果:

在这里插入图片描述

3.条件变量

条件变量为什么要与互斥体结合使用

我们假设条件变量不与互斥体结合,看看效果:

伪代码:

// m是互斥体,cv是条件变量
pthread_mutex_lock(&m);
while (condition_is_false)
{
	pthread_mutex_unlock(&m);
	// 解锁之后,等待之前,可能条件已经满足,信号已经发出,但该信号可能被错过
	cond_wait(&cv);
	pthread_mutex_lock(&m);
}

假设线程A在执行完第五行代码后CPU时间片被剥夺,此时另一个线程B获得互斥体m,,然后发送条件信号,等线程A重新获得时间片后,由于该信号已经被错过,可能会导致线程A在第七行无限阻塞下去。

造成这一现象的根源是释放互斥体对象与条件变量等待唤醒不是原子操作。

条件变量的使用

初始化和销毁:

int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attr);
int pthread_cond_destroy(pthread_cond_t* cond);

也可以这样初始化:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

等待条件变量被唤醒:

int pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex);
int pthread_cond_timedwait(pthread_cond_t* restrict cond, pthread_mutex_t*
						restrict mutex, const struct timespec* restrict abstime);

如果条件变量等待的条件没有被满足,那么调用pthread_cond_wait()的线程会一直等待下去。
timedwait()是非阻塞版本。

因调用pthread_cond_wait()而等待的线程可以被以下API函数唤醒:

int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);

signal表示一次唤醒一个线程,而且是随机的;
broadcast唤醒所有调用pthread_cond_wait()等待的线程,相当于广播。。。

条件变量实现生产者消费者

将刚才信号量的代码稍微改一改~

#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <semaphore.h>
#include <list>
#include <iostream>

class Task
{
public:
    Task(int taskID)
    {
        this->taskID = taskID;
    }

    void doTask()
    {
        std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
    }

private:
    int taskID;
};

pthread_mutex_t     mymutex;
std::list<Task*>    tasks;
pthread_cond_t      mycv;
//sem_t               mysemaphore;

void* consumer_thread(void* param)
{
    Task* pTask = NULL;
    while (true)
    {
        pthread_mutex_lock(&mymutex);
        while (tasks.empty())
        {
            // 如果获得了互斥锁,但是条件不合适,则pthread_cond_wait会释放锁,不往下执行
            // 发生变化后,如果条件合适,则pthread_cond_wait直接获得锁
            pthread_cond_wait(&mycv, &mymutex);
        }

        pTask = tasks.front();
        tasks.pop_front();

        pthread_mutex_unlock(&mymutex);

        if (pTask == NULL)
            continue;

        pTask->doTask();
        delete pTask;
        pTask = NULL;
    }

    return NULL;
}

void* producer_thread(void* param)
{
    int taskID = 0;
    Task* pTask = NULL;

    while (true)
    {
        pTask = new Task(taskID);

        pthread_mutex_lock(&mymutex);
        tasks.push_back(pTask);
        std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;

        pthread_mutex_unlock(&mymutex);

        // 释放信号量,通知消费者线程
        //sem_post(&mysemaphore);
        pthread_cond_signal(&mycv);

        taskID++;

        sleep(1);
    }

    return NULL;
}

int main(void)
{
    pthread_mutex_init(&mymutex, NULL);
    pthread_cond_init(&mycv, NULL);

    // 创建5个消费者线程
    pthread_t consumerThreadID[5];
    for (int i = 0; i < 5; ++i)
    {
        pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
    }

    // 创建一个生产者线程
    pthread_t producerThreadID;
    pthread_create(&producerThreadID, NULL, producer_thread, NULL);

    pthread_join(producerThreadID, NULL);

    for (int i = 0; i < 5; ++i)
    {
        pthread_join(consumerThreadID[i], NULL);
    }

    //sem_destroy(&mysemaphore);
    pthread_cond_destroy(&mycv);
    pthread_mutex_destroy(&mymutex);

    return 0;
}

在这里插入图片描述

条件变量最关键的一个地方,就是需要弄清楚pthread_cond_wait()在条件满足于不满足时的两种行为:

  1. pthread_cond_wait()在阻塞时,会释放器绑定的互斥体并阻塞线程。因此在调用该函数前应该对互斥体有个加锁操作。
  2. 收到条件信号时,pthread_cond_wait()会返回并对其绑定的互斥体进行加锁,因此在其下面一定由个对互斥体解锁的操作。

条件变量的虚假唤醒

再将互斥体和条件变量配合使用的代码中,有个while()语句,条件变量醒来之后再次判断条件是否满足:

while (tasks.empty())
{
	pthread_cond_wait(&mycv, &mymutex);
}

为什么不写成下面代码呢?

if (tasks.empty()) {...}

原因:
因为某次操作系统唤醒pthread_cond_wait()时tasks.empty()可能仍为true,即操作系统可能在某些情况下唤醒条件变量,也就是说存在没有其他线程向条件变量发送信号,但等待此条件变量的线程有可能醒来的情形。我们将这种行为称为 虚假唤醒(spurious wakeup)。
因此将条件放在while循环中意味着光唤醒条件变量不行,还必须满足条件,程序才能继续执行正常逻辑。

为什么会存在虚假唤醒?
一个原因是pthread_cond_wait()是futex系统调用,属于阻塞型系统调用,当系统调用被信号中断时,会返回-1,并把errno错误码置为EINTR。

4. 读写锁

读写锁的应用场景

在实际应用中,对共享变量的访问大多有个特点:大多数情况下,线程只是读取共享变量的值,只有在极少数情况下才会真正修改共享变量的值。对于这种情况,读请求之间无须同步,他们之间的并发访问是安全的。然而写请求必须锁住读请求和其他写请求。

读写锁的应用方法

读写锁在Linux系统中使用pthread_rwlock_t类型表示,初始化和销毁API:

#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr* attr);

int pthread_rwlock_destroy(pthread_rwlock_t* rwlock);

rwlock参数是需要初始化和销毁的读写锁对象的地址,attr参数用于设置读写锁的属性,一般设置为NULL,表示使用默认属性。若函数调用成功返回0,失败返回非0值,通过errno错误码判断错误原因。

如果不需要动态创建或者设置非默认属性的读写锁对象,可以使用:

pthread_rwlock_t myrwlock = PTHREAD_RWLOCK_INITIALIZER;

请求读锁的系统API:

int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock, const struct timespec* abstime);

请求写锁的系统API:

int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock, const struct timespec* abstime);

读锁用于共享模式:如果当前读写锁已经被某线程以读模式占有,则其他线程调用【请求读锁】时会立刻获得读锁;如果当前读写锁已经被某线程以读模式占有,则其他线程调用pthread_rwlock_wrlock【请求写锁】时会陷入阻塞;

写锁用于独占模式:如果当前读写锁被某线程以写模式占有,则无论是调用rdlock还是wrlock,都会陷入阻塞。即在写模式下不允许任何读锁请求通过,读锁请求和写锁请求都要陷入阻塞中,直到线程释放写锁。