一. 线程互斥
引入
要理解线程互斥的概念,首先我们要先从,为什么线程需要互斥这个点切入。
我们先来看看下面这串代码,这是一段有关抢票的代码
#include <iostream> #include <pthread.h> #include <unistd.h> using namespace std; int tickets = 1000; void* Ticket(void *argv) { while(1) { if(tickets>0) { usleep(1000); printf("[%p]:线程:%d号\n",pthread_self(),tickets--); } else { break; } } return nullptr; } int main() { pthread_t t1,t2,t3; pthread_create(&t1,nullptr,Ticket,(void*)nullptr); pthread_create(&t2,nullptr,Ticket,(void*)nullptr); pthread_create(&t3,nullptr,Ticket,(void*)nullptr); pthread_join(t1,nullptr); pthread_join(t2,nullptr); pthread_join(t3,nullptr); return 0; }
运行结果:
我们让三个线程同时进行抢票,发现票数被抢到了负数,这样的结果当然不符合我们的预期。
票数抢到负数是因为线程产生了竞态关系,当一个线程达到 0 时,另一个线程可能刚好执行了 - - 的代码,此时当前的线程在进行 - - 就出现了负数。
当多个线程访问共享资源时,如果不考虑线程安全问题,可能会导致数据丢失,错误等原因。
1. 进程互斥基本概念与背景
(1)互斥与并发
互斥就是相互排斥,有你没我,与互斥相对的一组是并发,并发就是可以同时存在。
我们将这组概念带入线程中,互斥就是在同一时间,只能有一个线程对共享资源进行访问;线程并发就是在同一时间,线程同时竞争共享资源。
(2)同步与异步
同步与异步是相对的,同步必须等待被调用方执行完毕并返回结果才能继续执行后续操作,相当于线程被阻塞住了。异步就是多个线程自己执行自己的内容,不需要关心其他的线程。
(3)操作原子性
原子性操作是指当一个线程执行一段代码的时候,此时不能切换到其他线程,必须执行完当前线程的代码才能进行切换。
(4)临界区和临界资源
临界资源是指在多线程环境下,不能被线程同时访问和使用的资源,即被使用的共享资源就是临界资源。临界区指访问临界资源的代码块,从起点到终点,这块地方就是临界资源
为了保护这些临界资源,操作系统提供了一些机制。给临界资源进行上锁,使用信号量进行访问控制等等。
2. 互斥量 mutex
(1)案例解析
对于上述的抢票代码,票数产生了负数。在线程代码中 usleep 进行了一段休眠,这会导致很多线程进入该代码段,而且ticket- - 本身就不是一个原子操作,所以说,要解决上述的问题,我们就要对临界区进行保护,于是我们便有了锁,用来对临界资源进行上锁,使得在临界区当中,每次运行只允许一个线程执行。
(2)系统接口
操作系统给出了互斥锁(mutex)用于解决当前情况。它可以保证在相同时间下,只有一个线程可以访问共享资源或代码段。互斥锁的用法也很简单,首先定义一个互斥锁对象,在临界区处进行上锁,在临界区末尾释放锁。线程拿到锁进行原子操作,执行完临界区代码后会释放锁,其他的线程会在临界区外部等待锁,只有拿到了锁才能进入临界区进行访问。这样使得临界区每次进行访问只有拥有锁的线程才能进行访问,保证了线程执行的安全性。
下面我们来介绍一下 POSIX 线程库封装的一些线程互斥接口
POSIX 线程库中用于操作互斥量的函数都以 pthread_mutex 打头:
初始化互斥量:
静态初始化互斥量:pthread_mutex_t
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
静态初始化的互斥量不需要进行 destroy操作,程序结束会自动回收
动态初始化互斥量:pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数:
mutex:指向类型变量的指针
mutexattr:给 mutex 设置属性,我们一般使用nullptr 默认值
销毁互斥量:
动态分配后的互斥锁若不使用需要进行回收
pthread_mutex_destroy:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
mutex:指向类型变量的指针
互斥量加锁和解锁:
对临界区资源进行加锁解锁,当互斥量处于未锁状态,该函数会将互斥量锁定,若其他线程已经锁定该互斥量,那么该执行流会被阻塞挂起,等待互斥量解锁
pthread_mutex_lock:
pthread_mutex_unlock:
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数:
mutex:指向类型变量的指针
全局锁与局部锁
全局锁:
全局锁是全局变量或静态变量,所有线程都可以访问,优点是可以保护全局资源,缺点在于所有的线程都可以访问,频繁的访问导致并发效率降低
局部锁:
局部锁是定义在局部变量的锁,它服务于局部某个特定的函数当中,虽然作用范围有限,但服务于专一对象并发效率高。
(3)实现原理
上锁解锁都是原子性的操作,原子性的操作要求CPU在执行指令的时候要尽可能的短,只执行一条语句。我们来看一下伪代码
锁的操作是原子的,当一把锁被频繁的调用时,意味着它对 CPU 性能的开销也逐渐增大。
互斥锁的创建销毁,加锁解锁,等待唤醒,竞争阻塞都会对资源进行消耗。所以说,我们要合理规划使用锁资源,尽量减少互斥锁的保护范围,缩短锁的持有时间尽快释放锁,根据不同场景用不同的锁来进一步提升效率。
(4)互斥锁与自旋锁
互斥锁和自旋锁是两种常见的同步机制,它们的区别在于,当一个线程获取一个被占用的锁时,互斥锁会进行挂起等待,等待锁的释放;而自旋锁会不断地循环访问,直到锁的释放。互斥锁可以避免浪费CPU资源,不需要频繁进行访问,但是会增加上下文切换开销;自旋锁频繁访问会增加CPU资源消耗,但可以减少上下文的切换。
(5)互斥量封装
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex
{
public:
Mutex()
{
int n = pthread_mutex_init(&mutex, nullptr);
(void)n;
}
void Lock()
{
pthread_mutex_lock(&mutex);
}
void Unlock()
{
pthread_mutex_unlock(&mutex);
}
~Mutex()
{
int n = pthread_mutex_destroy(&mutex);
}
private:
pthread_mutex_t mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
二. 线程同步
1. 同步的概念与竞态条件
什么是同步?同步就是在安全的前提下你,线程按照某种顺序访问临界资源。什么是竞态条件?在多线程中,同时访问或修改共享资源,最终结果依赖于线程执行的相对顺序,从而导致程序出现不可预测行为的现象。
2. 条件变量
(1)条件变量概念
条件变量就是一个条件,当线程不满足该条件时,就主动进行阻塞,不需要轮询的访问;当条件满足时,根据队列排序对线程进行唤醒。这样节省了 CPU 的资源消耗,它通常与互斥锁绑定。
(2)系统接口
下面我们来介绍一下 POSIX 线程库封装的一些线条件变量接口
POSIX 线程库中用于操作条件变量的函数都以 pthread_cond 打头:
初始化:
静态初始化条件变量:pthread_cond_t
pthread_cond_t static_cond = PTHREAD_COND_INITIALIZER;
静态初始化的条件变量不需要 destroy 进行销毁,但是只能指向一个对象
动态初始化条件变量:pthread_cond_init
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
动态创建的条件变量较为灵活,可以更改指向的锁对象,当我们不使用该条件变量时,我们需要 进行 destroy 销毁
销毁:
动态创建的条件变量在结尾需要进行销毁
pthread_cond_destroy:
int pthrad_cond_destroy(pthread_cond_t *cond);
参数:
cond:当前条件变量指针
返回值:
成功返回0,失败返回错误码
等待条件:
当条件不满足时,线程会自动释放锁进入阻塞状态,其他的线程可以拿被释放的锁执行任务,被阻塞的线程需要等待唤醒信号,被唤醒的线程将会重新获取锁
pthread_cond_wait:
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:当前条件变量指针
mutex:互斥量,即互斥锁
返回值:
成功返回0,失败返回错误码
唤醒等待:
通常与等待线程的接口一同使用,用于唤醒线程(等待前必须有锁)
pthread_cond_broadcast:
唤醒所有等待条件变量的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
参数:
cond:当前条件变量指针
返回值:
成功返回0,失败返回错误码
pthread_cond_signal:
唤醒单个等待条件变量的线程
int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond:当前条件变量指针
返回值:
成功返回1,失败返回错误码
(3)注意事项
线程唤醒和线程等待必须配合锁进行使用,类似于我们使用一个公共微波炉,只有持有锁的人才能使用这个微波炉。如果我们在持有锁前进行等待,就相当于微波炉是空的,但你确一直在等待。当我们进入临界区后,相当于一排人排队等着微波炉(wait等待),唤醒相当于前一个人使用完微波炉轮询到下一个。
// 正确示例 pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); // 持有锁后再等待 pthread_cond_unlock(&mutex);
pthread_mutex_lock(&mutex); queue.push(data); // 修改共享资源(线程安全) pthread_cond_signal(&cond); // 唤醒等待线程 pthread_mutex_unlock(&mutex);
下面我们来讲一下虚假唤醒,所谓的虚假唤醒是因为错误的认为“条件已满足”,从而执行不符合的操作,最典型的就是 if 语句
我们来看一下下面这串代码
//错误案例 pthread_mutex_lock(&mutex); if (queue.empty()) { // 用 if 检查条件 pthread_cond_wait(&cond, &mutex); // 若此处发生虚假唤醒 } // 错误:误以为队列非空,执行出队操作 data = queue.pop(); // 队列为空时调用 pop(),导致程序崩溃或数据错误 pthread_mutex_unlock(&mutex);
当线程队列为空时,if 语句为 true ,进入了 if 循环体当中进入线程等待,若线程等待成功后,此时不会再次进行 if 判断,此时直接执行 pop 操作,就会导致出错。
处理虚假唤醒的唯一方法就是使用 while 循环而不是 if 条件语句的判断,我们来看一下正确的代码
pthread_mutex_lock(&mutex); // 用 while 循环检查条件,而非 if while (queue.empty()) { // 即使被虚假唤醒,也会重新检查 pthread_cond_wait(&cond, &mutex); // 条件不满足则继续等待 } // 确认条件满足后再执行操作 data = queue.pop(); pthread_mutex_unlock(&mutex);
此时,线程等待成功后会再次进入 while 循环进行判断,队列不为空才继续向下执行。
线程的唤醒信号是不被缓存的,当一个线程发送了一个唤醒信号时,如果没有任何线程在等待该条件变量,这个信号就会直接“失效”不会进行缓存。例如,当我们将唤醒信号放在等待操作前,当信号丢失时,等待线程会永久阻塞。
我们来看一下正确的使用方法
int signal_count = 0; // 缓存信号的计数器 // 唤醒者 pthread_mutex_lock(&mutex); signal_count++; // 缓存信号 pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); // 等待者 pthread_mutex_lock(&mutex); while (signal_count == 0) { // 无缓存信号则等待 pthread_cond_wait(&cond, &mutex); } signal_count--; // 消费一个信号 pthread_mutex_unlock(&mutex);
我们可以使用一个计数器用来记录信号传递次数,用计数器的方式代替信号唤醒机制,这样即使信号失效了,也可以让线程知道是否需要进行等待。
避免信号丢失的关键在于用共享变量记录条件状态在等待前先检查变量
3. 生产者消费者模型
(1)模型概念
生产者消费者模型要符合 “321” 原则,即 3 种关系,2 个 角色,1 个交易场所。下面我们按照321来讲解生产者消费者模型。
类似于我们日常生活,在我们生活中,工厂一般扮演着生产者的角色生产各种各样的东西,许多工厂的货物被集中到超市进行售卖,由消费者来在超市中购买货物。工厂不会直接的把货物交到消费者的手中,消费者也不会直接的去工厂进行购入货物。将这套模型运用到计算机中,此时,工厂就是生产者,负责生产数据,任务,消息等等;超市就是共享缓冲区,来获取数据提供数据;消费者在超市中取数据将数据进行进一步处理。这套模型完成了数据生成与数据处理的解耦,提升了程序的稳定性。
1个交易场所就是共享内存,2个角色分别是生产者消费者,3种关系为生产者与生产者的关系消费者与消费者的关系,生产者与消费者的关系。
下面我们就来讲讲者这三种关系
生产者与生产者的关系是竞争关系,不同的生产者需要竞争锁,因为向共享区中写入数据需要保证原子性,它们之间需要竞争缓冲区的写入权。
消费者和消费者的关系是竞争关系,不同的消费者也需要竞争锁,保证消费者取数据不取重,消费者之间需要竞争读取数据权。
生产者与消费者之间是协助关系,生产者向共享区当中写入数据,此时应该通知消费者共享区当中有数据;消费者进入共享区当中消费数据,当数据消耗完后,需要通知生产者及时生产数据。
(2)模型优点
生产者消费者模型完成了数据生产和数据使用之间的解耦操作,生产者无需知道消费者的存在消费者也无需生产者的存在。后续可以加入新的生产者或者消费者扩展性强。同时,该模型还可以处理速度不匹配的问题,例如生产速度块,可以在缓冲区当中进行缓存。
(3)queue模拟阻塞队列的生产消费模型
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace std;
static int defaultnum = 10;
template <typename T>
class Blockqueue
{
private:
bool _isfull()
{
return _q.size() >= defaultnum;
}
bool _isempty()
{
return _q.empty();
}
public:
Blockqueue(int cap = defaultnum)
: _s_sleep(0), _p_sleep(0), _cap(cap)
{
}
void Enqueue(const T &in)
{
{
LockGuard(_mutex);
while (_isfull())
{
_p_sleep++;
cout << "生产队列已满" << endl;
_full_cond.Wait(_mutex);
_p_sleep--;
}
q.push(T);
if (_s_sleep > 0)
{
_empty_cond.Signal();
cout << "唤醒消费者" << endl;
}
}
}
T Pop()
{
T data;
{
LockGuard(_mutex);
while (_isempty())
{
_s_sleep++;
cout << "消费队列为空" << endl;
_empty_cond.Wait(_mutex);
_s_sleep--;
}
data = q.front();
q.pop();
if (_p_sleep > 0)
{
_full_cond.Signal();
cout << "唤醒生产者" << endl;
}
}
return data;
}
~Blockqueue()
{
}
private:
queue<T> _q; // 存放
int _cap; // 容量
int _s_sleep; // 消费者休眠数
int _p_sleep; // 生产者休眠数
Mutex _mutex;
Cond _full_cond; // 队列满条件
Cond _empty_cond; // 队列空条件
};
4. 条件变量使用规范以及封装
等待条件代码:
pthread_mutex_lock(&mutex); while (条件为假) pthread_cond_wait(cond, mutex); 修改条件 pthread_mutex_unlock(&mutex);
给条件发送信号代码:
pthread_mutex_lock(&mutex); 设置条件为真 pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);
条件变量接口封装:
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace std;
class Cond
{
public:
Cond()
{
int n = pthread_cond_init(&cond,nullptr);
}
void Wait(Mutex &mutex)
{
pthread_cond_wait(&cond,mutex.Get());
}
void Signal()
{
pthread_cond_signal(&cond);
}
void Broadcast()
{
pthread_cond_broadcast(&cond);
}
~Cond()
{
int n = pthread_cond_destroy(&cond);
}
private:
pthread_cond_t cond;
};
5. 环形队列生产模型
(1)POSIX 信号量
POSIX 信号量与 SystemV 信号量作用相同,都是原子操作的计数器,但 POSIX 的信号量主要服务于线程和进程中。
下面我们来介绍一下相关接口:
sem_init:
信号量初始化
#include <semaphore.h> int sem_init(sem_t *sem,int pshared,unsigned int value);
参数:
sem:指向信号量的指针
pshared:0表示线程间共享,非0表示进程间共享
value:信号量初始值
返回值:
成功返回0,失败返回-1
sem_destroy:
信号量销毁
1 int sem_destroy(sem_t *sem);
参数:
sem:指向信号量的指针
返回值:
成功返回0,失败返回-1
sem_wait:
信号量等待,信号量数量-1
int sem_wait(sem_t *sem); //P()
参数:
sem:指向信号量的指针
返回值:
成功返回0,失败返回-1
sem_post:
归还信号量,信号量数量+1
int sem_post(sem_t *sem);//V()
参数:
sem:指向信号量的指针
返回值:
成功返回0,失败返回-1
(2)环形队列封装
Ringqueue.hpp:
#pragma once #include "Sem.hpp" #include "Mutex.hpp" #include <vector> static const int gcap = 8; template <typename T> class Ringqueue { public: Ringqueue(int cap = gcap) : _v(cap), _cap(cap), _p_step(0), _s_step(0), _blank_sem(cap), _data_sem(cap) { } void Enqueue(T &in) { _blank_sem.P(); { LockGuard lockguard(_p_mutex); _v[_p_step] = in; _p_step++; _p_step %= _cap; } _data_sem.V(); } T Pop() { T data; _data_sem.P(); { LockGuard lockguard(_s_mutex); data = _v[_s_step]; _s_step++; _s_step %= _cap; } _blank_sem.V(); return data; } ~Ringqueue() { } private: // 生产者 Sem _blank_sem; int _p_step; // 消费者 Sem _data_sem; int _s_step; // 存储循环体 vector<T> _v; int _cap; // 两把锁 Mutex _p_mutex; Mutex _s_mutex; };
Main.cc:
#include "Ringqueue.hpp" #include <string> #include <pthread.h> #include <unistd.h> extern int data = 1; pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t cum_mutex = PTHREAD_MUTEX_INITIALIZER; struct Thread { Ringqueue<int> *rq; string name; }; void* Comsumer(void* argv) { Thread *td = static_cast<Thread*>(argv); while(true) { pthread_mutex_lock(&cum_mutex); sleep(3); int t; t = td->rq->Pop(); cout << td->name << ":消费者拿到了一个数据->" << t << endl; pthread_mutex_unlock(&cum_mutex); } } void* Productor(void* argv) { Thread *td = static_cast<Thread*>(argv); while(true) { pthread_mutex_lock(&data_mutex); sleep(1); cout << td->name << ":生产了一个任务->" << data << endl; td->rq->Enqueue(data); data++; pthread_mutex_unlock(&data_mutex); } } int main() { Ringqueue<int> *rq = new Ringqueue<int>(); pthread_t p[2],c[3]; Thread *td1 = new Thread(); td1->name = "Thread-1p"; td1->rq = rq; pthread_create(&p[0],nullptr,Productor,td1); Thread *td2 = new Thread(); td2->name = "Thread-2p"; td2->rq = rq; pthread_create(&p[1],nullptr,Productor,td2); Thread *td3 = new Thread(); td3->name = "Thread-3c"; td3->rq = rq; pthread_create(&c[0],nullptr,Comsumer,td3); Thread *td4 = new Thread(); td4->name = "Thread-4c"; td4->rq = rq; pthread_create(&c[1],nullptr,Comsumer,td4); Thread *td5 = new Thread(); td5->name = "Thread-5c"; td5->rq = rq; pthread_create(&c[2],nullptr,Comsumer,td5); pthread_join(p[0], nullptr); pthread_join(p[1], nullptr); pthread_join(c[0], nullptr); pthread_join(c[1], nullptr); pthread_join(c[2], nullptr); return 0; }
三. 死锁
死锁指在进程中各个进程占有不会释放的资源,但因互相申请被其他进程所占用的资源导致处于一种永久的等待状态。
假设现在有两个线程,访问临界资源需要两把锁,此时线程A占了一把锁,线程B占了一把锁,它们互相都想要对方的锁,导致进入两边都在等待对方手里的锁因此进入了线程等待。
此时,线程A持有锁1,线程B持有锁2,线程A尝试获取锁2,线程B尝试获取锁1.它们都互相申请对方的锁但是不释放自己的锁。
那么该如何避免死锁呢?拿上述的例子来讲
方法一:
设置一个时间限制,若超出了当前的时间还未获取到锁那么就主动释放锁。
方法二:
我们可以让所有线程必须按照固定顺序获取资源,必须先获取锁1之后才能获取锁2。我们按照顺序给临界资源上锁即可
错误示范:
// 线程1:先锁A,再锁B void *thread1(void *arg) { pthread_mutex_lock(&lockA); printf("线程1持有锁A,等待锁B...\n"); pthread_mutex_lock(&lockB); // 若线程2已持有锁B,死锁 // 操作资源(省略) printf("线程1操作完成\n"); pthread_mutex_unlock(&lockB); pthread_mutex_unlock(&lockA); return NULL; } // 线程2:先锁B,再锁A(与线程1顺序相反) void *thread2(void *arg) { pthread_mutex_lock(&lockB); printf("线程2持有锁B,等待锁A...\n"); pthread_mutex_lock(&lockA); // 若线程1已持有锁A,死锁 // 操作资源(省略) printf("线程2操作完成\n"); pthread_mutex_unlock(&lockA); pthread_mutex_unlock(&lockB); return NULL; }
正确示范:
// 关键:所有线程按固定顺序获取锁(先A后B) void *thread1(void *arg) { pthread_mutex_lock(&lockA); // 先锁A printf("线程1持有锁A,等待锁B...\n"); pthread_mutex_lock(&lockB); // 再锁B(顺序固定) printf("线程1操作完成\n"); pthread_mutex_unlock(&lockB); pthread_mutex_unlock(&lockA); return NULL; } void *thread2(void *arg) { pthread_mutex_lock(&lockA); // 先锁A(与线程1顺序一致) printf("线程2持有锁A,等待锁B...\n"); pthread_mutex_lock(&lockB); // 再锁B printf("线程2操作完成\n"); pthread_mutex_unlock(&lockB); pthread_mutex_unlock(&lockA); return NULL; }
方法三:
一次性的获取所有的资源,我们可以让线程一次性获取两把锁,这样就不会造成一人一把锁的状况
正确示范:
// 定义两个需要获取的锁 pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t lock2 = PTHREAD_MUTEX_INITIALIZER; // 一次性尝试获取所有锁 int acquire_all_locks() { // 1. 尝试非阻塞获取锁1 if (pthread_mutex_trylock(&lock1) != 0) { return -1; // 获取失败 } // 2. 尝试非阻塞获取锁2 if (pthread_mutex_trylock(&lock2) != 0) { pthread_mutex_unlock(&lock1); // 释放已获取的锁1 return -1; // 获取失败 } // 3. 所有锁获取成功 return 0; } // 线程函数:需要同时使用两个锁 void *thread_func(void *arg) { int id = *(int *)arg; // 循环重试直到获取所有锁 while (acquire_all_locks() != 0) { usleep(1000); // 短暂等待后重试,避免CPU空转 } // 成功获取所有锁,执行操作 printf("线程%d: 成功获取所有锁,执行操作...\n", id); // 释放所有锁 pthread_mutex_unlock(&lock2); pthread_mutex_unlock(&lock1); printf("线程%d: 释放所有锁\n", id); return NULL; }
总结:
线程同步与互斥是多线程编程的 “基石”,核心目标是解决 “数据安全” 和 “顺序可控” 问题:
互斥:用锁(互斥锁、读写锁)保证共享资源的原子访问,避免数据竞争;
同步:用条件变量、信号量协调线程执行顺序,满足逻辑依赖;
避坑重点:锁必须成对使用,条件变量需处理虚假唤醒,同步逻辑需避免死锁。