【Linux】线程互斥

发布于:2025-09-09 ⋅ 阅读:(15) ⋅ 点赞:(0)


当两个线程同时对一个变量做五万次加法,但最后结果通常小于十万。

#include <iostream>
#include <unistd.h>

int count = 0;

void* threadStart(void *args)
{
    for(int i = 0; i < 50000; i++)
        count++;
    return NULL;
}

int main()
{
    pthread_t tid1;
    pthread_t tid2;

    int n1 = pthread_create(&tid1, nullptr, threadStart, (void*)"thread1 ");
    int n2 = pthread_create(&tid2, nullptr, threadStart, (void*)"thread2 ");

    if(n1 != 0 || n2 != 0)
    {
        std::cerr << "create thread error..." << std::endl;
        return 1;
    }
    pthread_join(tid1,nullptr);
    pthread_join(tid2,nullptr);
    std::cout << "count: " << count << std::endl;
    return 0;
}

关键就在于 count++ 这行代码并不是原子操作,也就是会涉及读取、修改和写入三个操作。
在这里插入图片描述
两个线程都对 count 加了1,但其中一个线程的操作被另一个线程的操作覆盖了,看起来只加了一次。
可以通过 对共享资源的访问 进行加锁解决:
在这里插入图片描述
通过加锁(lock),确保同一时刻只有一个线程可以进入临界区(访问共享资源的那段程序)。线程A加锁后就可以对共享资源进行操作,其他线程就会被阻塞,直到线程A解锁(unlock)。
加锁时应只锁住真正需要保护的共享数据访问代码。比如只锁住 count++ 这行,因为只有这行对共享资源进行操作,若把整个for循环锁住,会导致不必要的串行化,降低并发性能。

线程同步

线程同步是指协调多个线程的执行顺序和对共享资源的访问,以保证程序的正确性。否则一个线程刚解锁,立马又加锁,其他线程就会一直获取不到锁。

条件变量

条件变量就是一种实现线程同步机制的一种方式。它允许线程等待某个条件成立,并由其他线程在条件成立时通知等待的线程。

假设有如下场景:一个线程(生产者)生产产品,其他线程(消费者)获取产品,产品就是共享资源,就会出现消费者不断获取产品也就是不断加锁,但实际上却获取不到产品。这就会导致生产者获取不到锁,无法将产品放置给生产者获取。现在通过约定,消费者若获取不到产品,就等待,此时生产者就可以获取锁来放置产品,然后通过一种东西(电话或者短信等)来通知消费者(可通知一个线程或多个线程),此时消费者就可以去获取产品了。在这个例子中,加锁是为了对共享资源的操作,东西(电话或短信)就是条件变量

pthread_cond_wait():
作用:让当前线程在条件变量cond 上等待,直到被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 唤醒。当线程等待后,函数会释放传入的锁,其他线程就可以获取锁并操作共享数据。该线程会加入到条件变量cond 的等待队列中。线程被唤醒后,会重新参与锁的竞争,意味着函数返回时,线程持有锁。

pthread_cond_signal():
作用:唤醒一个正在 cond 上等待的线程。如果没有任何线程在等待,则此调用无任何效果。pthread_cond_broadcast 则是唤醒所有正在 cond 上等待的线程。

生产-消费者模型

BlockQueue.hpp:

#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

const static int defaultCap = 5;

template<typename T>
class BlockQueue
{
private:
    bool isFull()
    {
        return _block_queue.size() == _max_cap;
    }
    bool isEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap = defaultCap): _max_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);    //初始化锁
        pthread_cond_init(&_p_cond, nullptr);    //初始化条件变量
        pthread_cond_init(&_c_cond, nullptr);    //初始化条件变量
    }
    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);    //加锁
        //使用while循环判断,因为可能存在另一个消费者把数据拿走,导致队列为空
        while(isEmpty())
        {
            //队列为空,等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        //队列不空 或者 被唤醒
        *out = _block_queue.front();
        _block_queue.pop();
        pthread_mutex_unlock(&_mutex);  //解锁
        //消费者取走数据,队列不满,通知生产者生产
        pthread_cond_signal(&_p_cond);
    }
    void Equeue(const T &in)
    {
        pthread_mutex_lock(&_mutex);    //加锁
        while(isFull())
        {
            //队列已满,不能生产,需等待
            //等待时,会释放锁,让其他线程获取
            //函数返回时,还在临界区,所以会重新参与锁的竞争,重新加上锁后函数才会返回,对临界资源安全操作
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        //队列不满 或者 被唤醒
        _block_queue.push(in);  //放入到队列
        pthread_mutex_unlock(&_mutex);  //解锁
        //生产者放入数据,队列不空,通知消费者取数据
        pthread_cond_signal(&_c_cond);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex); //回收锁
        pthread_cond_destroy(&_p_cond); //回收条件变量
        pthread_cond_destroy(&_c_cond); //回收条件变量
    }

private:
    std::queue<T> _block_queue; //临界资源
    int _max_cap;   //最大容量
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;     //生产者条件变量-用来唤醒生产者
    pthread_cond_t _c_cond;     //消费者条件变量-用来唤醒消费者
};

Task.hpp:

#pragma once
#include <string>

class Task
{
public:
    Task()
    {}
    Task(int x, int y):_x(x), _y(y)
    {}
    void Excute()
    {
        _result = _x + _y;
    }

    std::string Result()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
        return msg;
    }

    std::string debug()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
        return msg;
    }
private:
    int _x;
    int _y;
    int _result;
};

main.cc:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>

void *consumerFunc(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        
        // 1 拿数据
        Task t;
        bq->Pop(&t);

        // 2 处理数据
        t.Excute();
        std::cout << "消费者 ->:" << t.Result() << std::endl;
    }
    
}

void *producerFunc(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        sleep(1);
        // 1 生产数据
        int x = rand() % 10 + 1; //1~10
        int y = rand() % 20 + 1; //1~20
        Task t(x, y);

        // 2 放数据
        bq->Equeue(t);
        std::cout << "生产者 ->:" << t.debug() << std::endl;
    }
}

int main()
{
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t consumer, producer;
    pthread_create(&consumer, nullptr, consumerFunc, bq);
    pthread_create(&producer, nullptr, producerFunc, bq);

    pthread_join(consumer, nullptr);
    pthread_join(producer, nullptr);
    return 0;
}

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到