参考教材:《操作系统概念(原书第九版)》
三、并发与同步
信号
- 信号是内核发出的一个消息,用来通知进程:系统中发生了某种类型的事件。
- 信号从内核发往进程(有时信号的发送是由另一个进程发起的)
- 每一种信号都用一个整数ID来表示(例如,Linux支持的ID为1-30)
- 通常情况下只有信号ID会被发送给进程
- 使用信号的两个主要目的为:
-
- 通知进程:某种特殊的事件发生了
-
- 迫使进程执行 信号处理程序(signal handler)
- 迫使进程执行 信号处理程序(signal handler)
-
1. 发送信号( Sending a Signal )
- 内核如何发送信号到目的进程:更改目的进程上下文中的某些状态( updating some state in the context of the destination process )
- 内核发送信号的两个原因:
- 内核检测到了一个系统事件的发生,例如除零(SIGFPE)和子进程终止(SIGCHLD)
- 进程调用了 kill 来显式地请求内核向目标进程发送信号
- 一个进程可以发送信号给它自己
- 内核发送信号的两个原因:
- 使用/bin/kill程序:进程通过调用kill函数向其他进程(包括它自己)发送信号
int kill(pid_t pid, int sig);
pid>0:发送信号sig给进程pid
pid=0:发送信号sig给调用进程的进程组内的所有进程(包括其自身)
pid <0:发送信号sig给进程组 𝑝𝑖𝑑 (pid的绝对值)内的所有进程
例:
- int pause(void);
- 函数描述:使调用进程(或线程)休眠,等待信号唤醒。调用该系统调用的进程将处于阻塞状态直到有信号到达将其唤醒。
- int pause(void);
2. 接收信号( Receiving a Signal )
当目的进程被内核强制以某种方式对信号的发送做出反应时,它就接收了这个信号
一些可能的反应:
- 忽略(ignore)该信号(什么也不做)
- 终止(terminate)该进程(核心转储 core dump)
- 通过执行一个用户级的信号处理程序(signal handler)来接收(catch)该信号
- 类似于在响应异步中断时调用的硬件异常处理程序
- 类似于在响应异步中断时调用的硬件异常处理程序
假设内核正在从异常处理程序返回,并准备将控制权传递给进程p
- 内核检查 pnb = pending & ~blocked
- 进程p 的未被阻塞的待处理信号(挂起)的集合
- If (pnb == 0) 如果集合为空
- 将控制传递到 p的逻辑控制流中的下一条指令
- Else 不为空
- 选择集合pnb中最小的非零位k ,强制p 处理信号k (清0) –收到信号会触发进程p采取某种行为
- 对所有的非零k重复
- 控制传递到p 的逻辑控制流中的下一条指令
- 内核检查 pnb = pending & ~blocked
每一种信号都有一个预设的默认操作,包括:
- 进程终止(process terminates)
- 进程终止并转储内存(process terminates and dumps core)
- 进程停止(挂起)直到被SIGCONT信号唤起
- 进程忽略该信号
通过signal函数修改一个信号(signum)的默认操作
- sighandler_t signal(int signum, sighandler_t handler)
- 其中SIGSTOP和SIGKILL信号除外:这两个信号的默认操作不可被修改
- handler取不同值时:
- SIG_IGN:忽略signum信号
- SIG_DFL:将signum信号重设为默认操作
- 否则,handler就是用户定义的信号处理程序的函数地址
- 当进程接收到signum信号时调用该程序
- 当处理程序执行return时,控制会传递到控制流中被信号接收所中断的指令处
- 将处理程序的地址传递到signal函数从而改变默认行为,这就叫作设置信号处理程序
- 例:
- int fflush(FILE *stream);
- 函数描述:刷新流 stream 的输出缓冲区。
- 参数:stream: 指向 FILE 对象的指针,该 FILE 对象指定了一个缓冲流。
- 返回值:如果成功,该函数返回 0。如果发生错误,则返回 EOF。
- int fflush(FILE *stream);
3. 待处理(pending)和阻塞(block)信号
- 一个已经被发送出去但没有被接收的信号就是待处理信号
- 对一个进程而言,每个类型的信号最多只能有一个待处理信号
- 注意:信号不会排队
- 假设一个进程有一个ID为k的待处理信号,那么后面被发送到该进程的信号k会被丢弃,而不是排队等待- 进程可以选择性地阻塞某些信号的接收
- 被阻塞的信号也可以传输,但在它被解除阻塞之前,该信号不能被接收
- 一个待处理信号最多只能被接收一次
- 挂起/阻塞位( Pending/Blocked Bits )
- 内核会在每个进程的上下文中维护挂起和阻塞位向量(pending and blocked bit vectors)
- 挂起位向量(待处理信号的集合):
- 当信号k被传输时,内核设置挂起向量的第k位(置1)
- 当信号k被接收时,内核将挂起向量第k位清除(置0)
- 阻塞位向量(阻塞信号的集合):
- 可以用sigprocmask函数设置和清除
- 也被称为信号屏蔽掩码(signal mask)
- 挂起位向量(待处理信号的集合):
- 内核会在每个进程的上下文中维护挂起和阻塞位向量(pending and blocked bit vectors)
- 进程可以选择性地阻塞某些信号的接收
- 阻塞与解除阻塞
- 隐式阻塞机制
- 内核默认阻塞与当前正在处理信号类型相同的待处理信号
- 如: 一个SIGINT 信号处理程序不能被另一个 SIGINT信号中断(此时另一个SIGINT信号被阻塞)
- 显式阻塞和解除阻塞机制
- sigprocmask 函数及其辅助函数可以明确地阻塞/解除阻塞选定的信号:SIG_BLOCK/UNBLOCK/SET_MASK
- int sigprocmask(int how, const sigset_t *restrict set, sigset_t *restrict oset);
- 函数描述:根据 how 参数指定的方法修改进程的信号屏蔽字,新的信号屏蔽字由参数 set 指定,原先的信号屏蔽字将保存在 oset 中。
- 参数:
- how:用于指定信号修改的方式,可能选择有三种:
- SIG_BLOCK //加入信号到进程屏蔽。
- SIG_UNBLOCK //从进程屏蔽里将信号删除。
- SIG_SETMASK //将 set 的值设定为新的进程屏蔽。
- set:为指向信号集的指针,在此专指新设的信号集,如果仅想读取现在的屏蔽值,可将其置为 NULL。
- oldset:也是指向信号集的指针,在此存放原来的信号集。
- how:用于指定信号修改的方式,可能选择有三种:
- 返回值:成功执行时,返回 0;失败返回-1,errno 被设为 EINVAL。
- 辅助函数
- sigemptyset (sigset_t *set): 初始化set为空集合
- sigfillset(sigset_t *set):把每个信号都添加到set中
- sigaddset(sigset_t *set, int signum) : 把指定的信号signum添加到set
- sigdelset(sigset_t *set, int signum): 从set中删除指定的信号signum
- 隐式阻塞机制
4、安全的信号处理
- 信号处理程序和主程序或其他信号处理程序是并发执行的,因此会带来很多问题。
- 例如: 如果处理程序和主程序并发访问同样的全局数据结构,那么结果就是未知的甚至是致命的
- 一些保守的编写处理程序的原则
G0: 处理程序尽可能地简单
G1: 在处理程序中只调用异步信号安全的函数
- 异步信号安全(简称安全):可重入的(例如只访问局部变量,之后会介绍)或者不能被信号处理程序中断。
- 常用的安全函数:_exit, fork, accept, kill, execve, signal, waitpid, write…
- 安全的I/O(SIO)包
G2: 保存和恢复errono
- 许多异步安全的函数都会在出错返回时设置errno,而在处理程序中调用这样的函数可能会干扰主程序中其他依赖于errno的部分。
- 解决方法就是在进入处理程序时把errno保存在一个局部变量中,在处理程序返回前恢复它。(注意,只有在处理程序要返回时才有此必要,若处理程序调用_exit终止该进程,那么就不需要这样做了)
G3: 阻塞所有的信号,保护对共享全局数据结构的访问
- 如果处理程序和主程序或其他处理程序共享一个全局数据结构,那么在访问(读或写)该数据结构时,你的处理程序和主程序应该暂时阻塞所有的信号。
- 如果处理程序和主程序或其他处理程序共享一个全局数据结构,那么在访问(读或写)该数据结构时,你的处理程序和主程序应该暂时阻塞所有的信号。
G4: 用volatile声明全局变量
- 用volatile类型限定符来定义一个变量,告诉编译器不要缓存这个变量。Volatile限定符强迫编译器每次在代码中引用g时,都要从内存中读取。
- volatile int g;
- 用volatile类型限定符来定义一个变量,告诉编译器不要缓存这个变量。Volatile限定符强迫编译器每次在代码中引用g时,都要从内存中读取。
G5: 用sig_atomic_t声明标志
- 在常见的处理程序设计中,处理程序会写全局标志来记录收到了信号。主程序周期性地读这个标志,响应信号,再清除该标志。
- 对于通过这种方式来共享的标志,C提供一种整形数据类型sig_atomic_t,对它的读和写保证会是原子的(不可中断)。
- volatile sig_atomic_t flag;
- 正确的信号处理
- 未处理的信号是不排队的,因为pending位向量中每种类型的信号只对应有一位,所以每种类型最多只能有一个未处理的信号。
- 如果两个类型为k的信号发送给一个目的进程,而目的进程当前正在执行信号k的处理程序,所以第一个信号k被阻塞了,第二个信号就简单地被丢弃了;它不会排队。
- 关键思想:如果存在一个未处理地信号就表明至少有一个信号到达了。
- 例:
- pid_t wait(int *wstatus);
- 函数描述:暂停调用进程的执行,直到其一个子进程终止。
- 参数:wstatus 用于存放子进程退出时的状态。
- 返回值:成功时,返回终止子进程的进程 ID;出现错误时,返回-1。
- pid_t wait(int *wstatus);
同步(Synchronization)
线程同步概念
- 概念:
- 并发进程在一些关键点上可能需要互相等待或互通消息
- 这种相互制约的等待或互通消息称为进程同步。
- 同步与互斥:
- 互斥(间接制约)是指某一资源同时只允许一个访问者对其进行访问,具有唯一性 和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
- 同步(直接制约)是指在互斥的基础上(大多数情况),通过其它机制实现访问者 对资源的有序访问。 少数情况下,同步允许多个访问者同时访问资源。
- 同步机制应遵循的准则
- 空闲让进:其他进程均不处于临界区;
- 忙则等待:已有进程处于其临界区;
- 有限等待:等待进入临界区的进程不能"死等";
- 让权等待:不能进入临界区的进程,应释放CPU(如转换到等待状态)
线程内存模型
概念模型:
- 一组并发线程运行在一个进程的上下文中
- 每个线程都有它独立的线程上下文,包括线程ID、栈、栈指 针、程序计数器、条件码和通用目的寄存器值。
- 所有线程共享进程上下文的剩余部分
- 代码、数据、堆以及共享库代码和数据区域
- 打开文件和已安装的信号处理程序
从实际操作的角度来说,这个模型不是那么严格的:
- 寄存器值其实是不被共享的,但虚拟内存总是共享的
- 一个线程可以读写另一个线程的栈
概念模型和实际操作模型之间的区别是困惑和错误的来源之一
线程化的C程序中哪些变量是共享的?
- 问题的答案不是像“全局变量是共享的,栈变量是私有的”那么简单。
- 定义:一个变量x是共享的,当且仅当它的一个实例被一个以上的线程引用。
将变量映射到内存
- 全局变量
- Def:定义在函数之外的变量
- 虚拟内存只包含每个全局变量的一个实例
- 本地自动变量
- Def: 定义在函数内部但没有static属性的变量
- 每个线程都包含它自己的所有本地自动变量的实例
- 本地静态变量
- Def: 定义在函数内部并且有static属性的变量
- 虚拟内存只包含一个本地静态变量的实例
- 例:
- 全局变量
不正确的同步
- 一般而言,没有办法预测操作系统是否将为线程选择一个正确的指令顺序
- 进度图(Progress Graphs)
- 一个进度图描述的是并发线程的离散执行状态空间
- 每个坐标轴对应了一个线程的指令执行顺序
- 每个点对应一个可能的执行状态(Inst1, Inst2)
- 例如:(L1, S2) 表示线程1完成了L1 ,并且线程2完成了S2
- 轨迹( trajectory )是一系列合法的状态转换,它描述了线程的一个可能的并发执行。
- 例如:H1, L1, U1, H2, L2, S1, T1, U2, S2, T2
- 临界区和不安全区
- 操作共享变量cnt的指令L, U, 和 S组成了一个关于共享变量cnt的临界区( critical section )
- 临界区内的指令不应和其他临界区内的指令交替执行
- 两个临界区的交集形成的状态空间 称为不安全区( unsafe regions )
- 如何保证轨迹正确?
- Semaphores(信号量)
- 其他方法(不在我们的讨论范围)
- 锁和条件变量(Pthreads)
- Monitors (Java)
- 一般而言,没有办法预测操作系统是否将为线程选择一个正确的指令顺序
信号量
利用信号量来调度共享资源(实现同步)
基本思想:一个线程用一个信号量来通知另一个线程,某个条件已经为真了
- 用计数信号量跟踪资源状态和通知其他线程
- 用互斥锁保护对资源的访问
信号量s:具有非负整数值的全局变量,被一对原语wait(S)和signal(S)(P(S)和V(S))(原语:等价于原子操作,由关中断/开中断指令实现)操作处理
- P(s): wait()
- 如果s非零,那么P将s减1并立即返回
- 此为原子操作
- 如果s为零,那么就挂起这个线程,直到s变为非零,等待一个V操作重启这个线程。
- 重启后,P操作将s减1,将控制权转移给调用者
- 如果s非零,那么P将s减1并立即返回
- V(s): signal
- 将s加1
- 此操作为原子操作
- 如果有任何线程阻塞在P操作等待s变成非零,那么V操作将会重启一个线程,然后将 s减1来完成它的P操作
- 将s加1
- P(s): wait()
整型信号量与记录型信号量
整型信号量
记录型信号量
续上文:正确的同步:
信号与信号量
- 信号:(signal)是一种处理异步事件的方式,类似于软中断。
- 信号量:(Semaphore)进程间通信处理同步互斥的机制。
经典问题
- 生产者-消费者问题(典型同步问题)
- 需要一个互斥锁和两个计数信号量:
- mutex: 确保互斥地访问缓冲区
- slots: 计数缓冲区中可用的槽位(生产者)
- items: 计数缓冲区中可用的元素(消费者)
- 用一个叫做sbuf的共享缓冲区包来实现:
#include "csapp.h"
typedef struct {
int *buf; /*缓冲区数组*/
int n; /*最大槽位数*/
int front; /*buf[(front+1)%n]是第一个项目*/
int rear; /*buf[rear%n]是最后一个项目*/
sem_t mutex; /*确保互斥地访问缓冲区*/
sem_t slots; /*计数缓冲区中可用的槽位*/
sem_t items; /*计数缓冲区中可用的元素*/
}sbuf_t;
void sbuf_init(sbuf_t*sp,intn);/*初始化信号量*/
void sbuf_deinit(sbuf_t *sp); /*释放buf*/
void sbuf_insert(sbuf_t*sp,int item);/*生产者线程*/
int sbuf_remove(sbuf_t *sp); /*消费者线程*/
/*创建一个空的,有限的,共享的,先进先出的,有个槽位的缓冲区*/
void sbuf_init(sbuf_t *sp,int n) {
sp->buf=calloc(n,sizeof(int));
sp->n=n; /*Buffer最多有n个项目*/
sp->front=sp->rear=O;/*当且仅当front==rear时buffer为空*/
sem_init(&sp->mutex,0,1);/*二元信号量*/
sem_init(&sp->slots,O,n);/*最开始有n个空槽位*/
sem_init(&sp->items,O,O);/*最开始有O个项目*/
}
/*清理缓冲区*/
void sbuf_deinit(sbuf t *sp) {
free(sp->buf);
}
/*在缓冲区尾插入一个项目*/
void sbuf_insert(sbuf_t *sp,int item){
P(&sp->slots); /*等待可用槽位*/
P(&sp->mutex); /*给ouffer加锁*/
sp->buf[(++sp->rear)%(sp->n)]=item;/*插入项目*/
V(&sp->mutex); /*解锁buffer*/
V(&sp->items); /*宣告这里有可用的项目*/
}
/*移除并返回第一个项目*/
int sbuf_remove(sbuf t *sp){
int item;
P(&sp->items); /*等待可用项目*/
P(&sp->mutex); /*给buffer加锁*/
item=sp->buf[(++sp->front)%(sp->n)];/*移除项目*/
V(&sp->mutex); /*解锁buffer*/
V(&sp->slots); /*宣告这里有可用的槽位*/
return item;
}
- 读者-写者问题 (典型互斥问题)
- 问题具体说明:
- 读者线程只读共享对象
- 写者线程只修改共享对象
- 写者必须互斥地访问共享对象
- 可以有无限多个读者共享这个对象
- 问题具体说明:
- 第一类读者-写者问题(读者优先)
- 要求不让读者等待,除非已经把共享对象的使用权限给了一个写者
- 一个后来的读者也比前面的写者优先级高
semaphore wmutex=1;//实现对文件的互斥访问,表示当前是否有进程在访问共享文件
int readcount=0;//记录当前有多少个读进程在访问文件
semaphore rmutex;//用于保证对readcount变量的互斥访问
reader(){
while(1){
P(rmutex);//各读进程互斥访问readcount
readcount++;//访问文件读进程数+1
if(readcount==1)
P(wmutex);//写进程加锁,不允许在读操作过程中执行写操作
V(rmutex);
读文件... //上面这一部分使得多个读者能够同时访问
P(rmutex);//各读进程互斥访问raeadcount
readcount--;//每当一个读进程完成读操作,读者数量-1
if(readcount==0)
V(wmutex);//当没有读者,读操作结束后,写进程解锁
V(rmutex);
}
}
writer(){
while(1){
P(wmutex);//写之前加锁
写文件...
V(wmutex);//写之后解锁
}
}
- 第二类读者-写者问题(写者优先)
- 当一个写者准备好要写,它就会尽可能快地完成写操作
- 在一个写者后到达的读者必须等待,即使这个写者也在等待
int readcount=0;//记录当前有多少个读进程在访问文件
int writecount=0;//当writecount=0,唤醒读者
semaphore mutex1;//用于保证对readcount变量的互斥访问
semaphore mutex2;//用于保证对writecount变量的互斥访问
semaphore mutex;//用于保证写者之间的互斥访问,其他读者要进入rmutex之前需要在mutex上排队
semaphore rmutex=1;//当有新写者来时,停止所有的读进程。
semaphore wmutex=1;//实现对写操作地互斥访问
writer(){
while(1){
P(mutex2);//各写者进程互斥访问writecount
writecount++;//写进程数量+1
if(writecount==1)
P(rmutex);//有写进程时,对读进程加锁
V(mutex2);
P(wmutex);//写之前加锁,保证每次只有一个写者可以进行写操作
写文件...
V(wmutex);
P(mutex2);
writecount--;//每当有一个写进程完成写操作,写者数量-1
if(writecount==0)
V(rmutex);//若没有写进程正在执行,解锁读者进程
V(mutex2);
}
}
reader(){
while(1){
P(mutex);//实现写者优先访问,禁止读者在rmutex排队,保证一次只有一个读者进程访问rmutex
P(rmutex);//读者进程加锁
P(mutex1);//互斥修改readcount变量
readcount++;
if(readcount==1)
P(wmutex);//当有读者进程执行读操作时,对写者进程加锁
V(mutex1);
V(rmutex);//读者进程解锁
V(mutex);
读文件...
P(mutex1);//互斥修改readcount变量
readcount--;
if(readcount==0)
V(wmutex);//当读者数量为0,解锁写者进程,允许写者进程执行写操作
V(mutex1);
}
- 这两种情况都有可能会导致饥饿(starvation)现象(即一个线程无限期阻塞)
- 综合:基于预线程化的并发服务器
sbuf_t sbuf; /* 描述符的共享缓冲区 */
int main(int argc, char **argv) {
int i, listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;
listenfd = open_listenfd(argv[1]);
sbuf_init(&sbuf, SBUFSIZE);
for (i = 0; i < NTHREADS; i++) /* 创建工作者线程 */
pthread_create(&tid, NULL, thread, NULL);
while (1) {
clientlen = sizeof(struct sockaddr_storage);
connfd = accept(listenfd, (SA *) &clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd); /* 将connfd插入缓冲区 */
}
}
//工作者线程例程:
void *thread(void *vargp) {
pthread_detach(pthread_self());
while (1) {
int connfd = sbuf_remove(&sbuf); /* 从缓冲区移除connfd */
echo_cnt(connfd); /* 服务客户端 */
close(connfd);
}
}
//echo_cnt 初始化例程:
static int byte_cnt; /* Byte counter */
static sem_t mutex; /* byte_cnt的锁 */
static void init_echo_cnt(void) {
sem_init(&mutex, 0, 1);
byte_cnt = 0;
}
// 工作者线程例程:
static int byte_cnt; //记录接收字节数
static sem_t mutex;
void echo_cnt(int connfd) {
int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT;
pthread_once(&once, init_echo_cnt);
rio_readinitb(&rio, connfd);
while((n = rio_readlineb(&rio, buf, MAXLINE)) != 0) {
P(&mutex);
byte_cnt += n;
printf("thread %d received %d (%d total) bytes on fd %d\n", (int) pthread_self(), n, byte_cnt, connfd);
V(&mutex);
rio_writen(connfd, buf, n);
}
}
- 经典哲学家进餐问题
线程安全
线程安全函数
线程调用的函数必须是线程安全的
- Def: 一个函数被称为线程安全的,当且仅当被多个并发线程反复调用时,它会一直产 生正确的结果
四类线程不安全函数:
Class 1: 不保护共享变量的函数
- 不保护共享变量
改正: 用信号量PV操作
Example: goodcnt.c
存在问题: 同步操作将减慢程序的执行
- 不保护共享变量
Class 2: 保持跨越多个调用的状态的函数
依赖于跨多个函数的持久状态
Example: 随机数生成器,当前调用的结果依赖于前次调用的中间结果
在参数中传递状态信息
- 以此来消除掉全局状态
- 传入值不使用共享变量
Class 3: 返回指向静态变量的指针的函数
- 返回一个指向静态变量的指针,比如ctime()
- 正在被一个线程使用的结果可能会被另一 个线程悄悄地覆盖了
- Fix 1. 重写函数,使得调用者传递存放结果的变量的地址
- 需要修改调用者和被调用者的源代码
- Fix 2. 加锁-复制(Lock-and-copy)
- 只需对调用者做一些小的修改,并且不需要修改被调用者
- 然而,调用者必须要释放分配的内存( privatep )
- 返回一个指向静态变量的指针,比如ctime()
Class 4: 调用线程不安全函数的函数
- 调用一个线程不安全函数可能会使这个调用者的整个函数都变得线程不安全
- Fix: 如果调用的是第2类函数,那么就只能改写这个函数;如果调用的是第1和第3类函数,可以用互斥锁加以保护
可重入函数
- Def: 一个函数是可重入的,当且仅当它被多个线程调用时,不会引入任何共享数据
- 是线程安全函数的子集
- 不需要同步操作
- 使第2类不安全函数变安全的唯一方法就是将其变为可重入函数
- Def: 一个函数是可重入的,当且仅当它被多个线程调用时,不会引入任何共享数据
竞争(换一种角度看同步互斥问题)
- 由于两个或多个进程使用不能被同时访问的资源,使得这些进程的结果因为时间上的 先后而出现问题,这时就发生了竞争(race)。
- 主线程中 i++ 和对等线程中读取 *vargp 产生竞争:
- 如果在 i=0 时读取,那么结果就是正确的
- 否则,对等线程会得到错误的id
- 消除竞争