【操作系统】同步互斥与Golang互斥锁实现

发布于:2024-09-06 ⋅ 阅读:(140) ⋅ 点赞:(0)

1 背景

现代操作系统的特性之一是允许多个程序“一起”运行。

  • 抽象进程/线程概念用于支持多程序的设计
  • CPU调度是实现多程序的机制
  • 不同调度算法是不同的策略

视角回到线程层面,那为什么程序需要多线程合作执行

1.1 独立线程

程序是独立执行

  • 不和其他线程共享资源或状态
  • 确定性(输入状态决定结果)
  • 可重现(能够重现起始条件,I/O)
  • 调度顺序不重要

1.2 合作线程

程序内多个线程共同执行实现软件目标

  • 在多个线程中共享状态
  • 不确定性
  • 不可重现

不确定性和不可重现意味着bug可能是间歇性发生

1.3 合作有风险,为什么需要合作

一种工具或实现目标的手段、技术在不同的场景下,有各自的优势和劣势;当正向收益大于风险,在能够适当规避风险、理解风险、解决潜在风险问题前提下,可以考虑引入来解决自己的场景下的主要问题。
正向收益的因素和风险因素很多,具体场景具体分析

对于多线程合作,程序收获了如下优势:

  • 共享资源
  • 加速(I/O操作和计算并行、后台阻塞不影响前台响应等)
  • 模块化、扩展性
  • 简化问题建模和设计(读者-写者、生产-消费模型等)
  • 异步(创建异步网络连接提升系统吞吐量和效率)

1.4 多协程并发执行的风险举例(Golang语言)

那风险是什么,如何解决呢?以Golang语言举例如下:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var count = 0
	// 使用WaitGroup等待10个goroutine完成
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			// 对变量count执行10次加1
			for j := 0; j < 100000; j++ {
				count++
			}
		}()
	}
	// 等待10个goroutine完成
	wg.Wait()
	fmt.Println(count)
}

使用 sync.WaitGroup 来等待所有的 goroutine 执行完毕后,再输出最终的结果。但是每次运行,你都可能得到不同的结果,基本上不会得到理想中的一百万的结果。
在这里插入图片描述
这是因为,count++ 不是一个原子操作,它至少包含几个步骤,比如读取变量 count 的当前值,对这个值加 1,把结果再保存到 count 中。因为不是原子操作,就可能有并发的问题。

注:Go通过桩代码形式可以检测race
Go race detector 是基于 Google 的 C/C++ sanitizers 技术实现的,编译器通过探测所有的内存访问,加入代码能监视对这些内存地址的访问(读还是写)。在代码运行的时候,race detector 就能监控到对共享变量的非同步访问,出现 race 的时候,就会打印出警告信息。
在这里插入图片描述

1.5 对风险的思考

  • 无论多线程指令怎样交替执行,程序必须按照符合预期的情况正常执行
    • 不确定性和不可重现
    • 不经过专门设计,调试难度高
  • 不确定性要求并行程序的正确性
    • 先考虑清楚问题,在设计程序行为
    • 切忌急于着手编写代码,碰到问题再调试

由于上述1.4产生的异常现象(称之为竞态条件Race Condition)

原因是结果依赖于并发执行或者事件的顺序/时间,导致不确定性、不可重现

那如何避免竞态?

  • 让指令不被打断(比如上述的count++不被打断)

不被打断的方法

  • 原子操作(Atomic Operation)—不可被打断操作(一次不存在任何中断或者失败的执行)
    • 该执行成功结束
    • 或者根本没有执行
    • 并且不应该发现任何部分执行的状态
      实际上操作往往不是原子的

有些看上去是原子操作,实际不是,连x++这样简单的语句,实际上是由3条指令造成的,有时候甚至连条单条机器指令都不是原子的

2 同步互斥

这就是为什么要引入同步互斥这些机制的原因

2.1 一些概念

  • 临界区(Critical section)
    临界区是指进程中的一段需要访问共享资源并且当另一个进程处于相应代码区域时便不会被执行的代码区域。简单来说,就是访问共享资源的那段代码就是临界区。

  • 互斥(Mutual exclusion)
    当一个进程处于临界区并访问共享资源时,没有其他进程会处于临界区并且访问任何相同的共享资源。

  • 死锁(Dead lock)
    两个或以上的进程,在互相等待完成特定任务,而最终没法将自身任务进行下去。

  • 饥饿(Starvation)
    一个可执行的进程,被调度器持续忽略,以至于虽然处于可执行状态却不被执行

2.2 解决方案——保护临界区

为每一个执行对象保护一段“临界区”代码。

使用临界区的思想,问题就可以较好的解决。有了临界区的代码之后,就可以确保任何时候只有一个对象在临界区中执行,其他对象在外面等待,知道临界区中的对象离开,其他进程中的一个会进入临界区去执行。这个是比较合理的一个实现。

设计的原则,临界区中执行所拥有的属性

  • 互斥:同一个时间临界区中最多存在一个线程
  • 前进(Progress):如果一个线程想要进入临界区,那么它最终会成功,不会一直的死等。
  • 有限等待:如果一个线程i处于入口区,那么在i的请求被接受之前,其他线程进入临界区的时间是有限制的。如果是无限等待,就会出现饥饿状态,是Progress前进状态的一种进一步补充。
  • 忙等(可选属性):如果一个进程在等待进入临界区,那么在它可以进入之前会被挂起。

忙等:不需要上下文切换,但是利用率低,适用与临界区执行时间短的情况。
不忙等:需要上下文切换,上下文切换开销比较大大,适用于临界区很长,远远大于上下文切换所需要的开销。

2.3 禁用硬件中断

没有中断,也就是没有了上下文切换,因此没有并发。进入临界区时禁用中断,离开临界区时开启中断。这个方法是可以解决问题的。

缺点
1)一旦中断被禁用,线程就无法被停止
2)整个系统都会为你停下来
3)可能导致其他线程处于饥饿状态
4)要是临界区可以任意长,则无法限制响应中断所需的时间(可能存在硬件影响)

需要注意:
执行这种可屏蔽中断的指令,只是把自身的响应中断的能力屏蔽了,并不意味着也将其他cpu的响应中断能力屏蔽,所以其实其他的cpu还是可以继续产生中断,所以在多cpu的情况下是无法解决互斥问题的。

2.4 纯软件方法

有很多种方法

  • 单标志法
  • 双标志
  • 双标志后检查
  • Peterson算法

这里举例一下单标志和Peterson算法

单标志算法
两个进程在访问完临界区后会把使用临界区的权限转交给另一个进程。也就是说每个进程进入临界区的权限只能被另一个进程赋予
在这里插入图片描述
Peterson算法
在这里插入图片描述
flag用于表示是否有进入临界区的意愿,turn用来表示优先让哪个进程进入临界区。

以P0进程为例,若P0想要访问临界区,会把flag设为true,同时把turn的值设为对方的编号,也就表示可以优先让对方使用临界资源。

while(flag[1] && turn==1)用来检查P1是否想用,若对方想用,那么P0就会停留在while中,若P1不想使用,那么P0就会进入临界区,使用完后,就会将flag[0]=false

总结:
1)即使是针对两个进程的解决竞态的实现还是比较复杂的。
2)需要忙等待,浪费cpu时间。
3)没有硬件包装的情况下无真正的软件解决方案。对硬件的1需求比较低(只需要load操作和store是原子操作即可)

2.5 硬件原子操作

2.5.1 TestAndSet(TS指令/TSL指令)

简称 TS 指令,也有地方称为TestAndSetLock指令,或 TSL指令

TSL指令是用硬件实现的,执行的过程不允许被中断,只能一气呵成。其实现原子性的原理是:

执行TSL指令的CPU锁住内存总线,以禁止其他CPU在本指令结束之前访问内存。由于TSL指令是原子操作,所以不需要关中断来保证其不被打断(下面的swap指令同理),如果真的while指令在关中断状态下执行,那么TSL一直为true,不再开中断,系统可能会因此终止。

以下是用C语言描述的逻辑:
在这里插入图片描述
若当前临界区已经被加锁,那么在while循环中会一直为true,一直到lock被当前进程在退出区改为false,那么跳出while循环,并且该进程访问临界资源,直到此进程在退出区lock=false,临界资源被解锁,即:
在这里插入图片描述

2.5.2 SWAP

swap 指令是用硬件实现的,执行的过程不允许被中断,只能一气呵成。以下是用C语言描述的逻辑:
在这里插入图片描述
逻辑上来看 Swap和 TSL并无太大区别,都是先记录下此时临界区是否已经被上锁(记录在 old 变量上),再将上锁标记lock 设置为true,最后检查 old,如果old为false, 则说明之前没有别的进程对临界区上锁,则可跳出循环,进入临界区。

以下while循环对应两种情况:
① lock=false,swap执行一次,把lock=true,即上锁,old=false,即可以往下执行
② lock=true,一直停留在while循环中,即一直执行swap,直到lock=false后执行①
在这里插入图片描述
总结:
TSL指令和swap指令并没有处于阻塞态的进程,等待进入临界区的进程一直停留在执行while循环中,不会主动放弃CPU,一直处在运行态,直到该进程的时间片用完放弃处理机,转为就绪态,此时切换另一个就绪态的进程占用处理机(就绪—>运行)

  • 优点:实现简单,无需像软件实现方法那样严格检查是否会有逻辑漏洞;适用于多处理机环境。
  • 缺点:不满足“让权等待”原则,暂时无法进入临界区的进程会占用CPU并循环执行TSL指令,从而导致“忙等”

3 更高级的抽象——互斥锁

互斥锁(排它锁)就很好地解决了临界区问题,在 Go 标准库中,它提供了 Mutex 来实现互斥锁这个功能

3.1 Golang——Mutex

Locker 的接口定义了锁同步原语的方法集:

type Locker interface {
    Lock()
    Unlock()
}

互斥锁 Mutex 提供两个方法 Lock 和 Unlock:进入临界区之前调用 Lock 方法,退出临界区的时候调用 Unlock 方法

 func(m *Mutex)Lock()
 func(m *Mutex)Unlock()

当一个 goroutine 通过调用 Lock 方法获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放并且自己获取到了这个锁的拥有权。

3.2 Mutex解决data race

问题参考1.4,共享资源是 count 变量,临界区是 count++,只要在临界区前面获取锁,在离开临界区的时候释放锁,就能完美地解决 data race 的问题

	package main
	
    import (
        "fmt"
        "sync"
    )

    func main() {
        // 互斥锁保护计数器
        var mu sync.Mutex
        // 计数器的值
        var count = 0
        // 辅助变量,用来确认所有的goroutine都完成
        var wg sync.WaitGroup
        wg.Add(10)
        // 启动10个gourontine
        for i := 0; i < 10; i++ {
            go func() {
                defer wg.Done()
                // 累加10万次
                for j := 0; j < 100000; j++ {
                    mu.Lock()
                    count++
                    mu.Unlock()
                }
            }()
        }
        wg.Wait()
        fmt.Println(count)
    }

3.3 Mutex源码实现

阅读 Go 标准库里 Mutex 的源代码,并且追溯 Mutex 的演进历史,互斥锁从一个简单易于理解的实现,到一个非常复杂的数据结构,这是一个逐步完善的过程。Go 开发者们做了种种努力,精心设计。

3.3.1 初版Mutex

   // CAS操作,当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    }    

在这里插入图片描述

Lock:

  • 调用 Lock 请求锁的时候,通过 xadd 方法进行 CAS 操作
  • xadd 方法通过循环执行 CAS 操作直到成功,保证对 key 加 1 的操作成功完成
  • 如果比较幸运,锁没有被别的 goroutine 持有,那么,Lock 方法成功地将 key 设置为 1,这个 goroutine 就持有了这个锁
  • 如果锁已经被别的 goroutine 持有了,那么,当前的 goroutine 会把 key 加 1,而且还会调用 semacquire 方法,使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒。

UnLock:

  • 持有锁的 goroutine 调用 Unlock 释放锁时,它会将 key 减 1
  • 如果当前没有其它等待这个锁的 goroutine,这个方法就返回了
  • 但是,如果还有等待此锁的其它 goroutine,那么,它会调用 semrelease 方法,利用信号量唤醒等待锁的其它 goroutine 中的一个。

注意点:Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今

谁申请,谁释放
在实践中使用互斥锁很少在一个方法中单独申请锁,而在另外一个方法中单独释放锁,一般都会在同一个方法中获取锁和释放锁

3.3.2 当前处于调度状态的新G优先抢锁

   type Mutex struct {
        state int32
        sema  uint32
    }


    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexWaiterShift = iota
    )

   func (m *Mutex) Lock() {
        // Fast path: 幸运case,能够直接获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        for {
            old := m.state
            new := old | mutexLocked // 新状态加锁
            if old&mutexLocked != 0 {
                new = old + 1<<mutexWaiterShift //等待者数量加一
            }
            if awoke {
                // goroutine是被唤醒的,
                // 新状态清除唤醒标志
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
                if old&mutexLocked == 0 { // 锁原状态未加锁
                    break
                }
                runtime.Semacquire(&m.sema) // 请求信号量
                awoke = true
            }
        }
    }

   func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
        if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
            panic("sync: unlock of unlocked mutex")
        }
    
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime.Semrelease(&m.sema)
                return
            }
            old = m.state
        }
    }

**加粗样式**

Lock:

  • 首先是通过 CAS 检测 state 字段中的标志,如果没有 goroutine 持有锁,也没有等待持有锁的 gorutine,那么,当前的 goroutine 就很幸运,可以直接获得锁,这也是注释中的 Fast path 的意思。
  • 如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠
  • 在锁释放唤醒之后,它并不能像先前一样直接获取到锁
  • 唤醒后的goroutine还是要和正在请求锁的 goroutine 进行竞争,让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能
  • for 循环是不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁

请求锁的 goroutine 有两类,一类是新来请求锁的 goroutine,另一类是被唤醒的等待请求锁的 goroutine。锁的状态也有两种:加锁和未加锁。

Unlock:

  • 尝试将持有锁的标识设置为未加锁的状态,这是通过减 1 而不是将标志位置零的方式实现。
  • 还会检测原来锁的状态是否已经未加锁的状态,如果是 Unlock 一个未加锁的 Mutex 会直接 panic。
  • 一些等待这个锁的 goroutine(有时候称之为 waiter)需要通过信号量的方式唤醒它们中的一个
    • 第一种情况,如果没有其它的 waiter,说明对这个锁的竞争的 goroutine 只有一个,那就可以直接返回了;如果这个时候有唤醒的 goroutine,或者是又被别人加了锁,那么,无需我们操劳,其它 goroutine 自己干得都很好,当前的这个 goroutine 就可以放心返回
    • 第二种情况,如果有等待者,并且没有唤醒的 waiter,那就需要唤醒一个等待的 waiter。在唤醒之前,需要将 waiter 数量减 1,并且将 mutexWoken 标志设置上,这样,Unlock 就可以返回了

相对于初版的设计,这次的改动主要就是,新来的 goroutine 也有机会先获取到锁,甚至一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑。但是,代码复杂度也显而易见。

3.3.3 更好的利用调度性能

当前唤醒的goroutine和新groutine抢不到锁直接休眠,发生系统调用,浪费系统性能(当前CPU调度该G中);引入忙等方式进行自旋,实践中结合了上述的 忙等+阻塞的优势

忙等适合临界区代码很短的情况
阻塞适合临界区代码不确定的情况

在 2015 年 2 月的改动中,如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋(spin,通过循环不断尝试,spin 的逻辑是在runtime 实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑

   func (m *Mutex) Lock() {
        // Fast path: 幸运之路,正好获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        iter := 0
        for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
            old := m.state // 先保存当前锁的状态
            new := old | mutexLocked // 新状态设置加锁标志
            if old&mutexLocked != 0 { // 锁还没被释放
                if runtime_canSpin(iter) { // 还可以自旋
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                    runtime_doSpin()
                    iter++
                    continue // 自旋,再次尝试请求锁
                }
                new = old + 1<<mutexWaiterShift
            }
            if awoke { // 唤醒状态
                if new&mutexWoken == 0 {
                    panic("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                    break
                }
                runtime_Semacquire(&m.sema) // 阻塞等待
                awoke = true // 被唤醒
                iter = 0
            }
        }
    }

对于临界区代码执行非常短的场景来说,这是一个非常好的优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine 不用通过休眠唤醒方式等待调度,直接 spin 几次,可能就获得了锁

3.3.4 饥饿处理

因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题。

Mutex 不能容忍这种事情发生。所以,2016 年 Go 1.9 中 Mutex 增加了饥饿模式让锁变得更公平,等待时间限制在 1 毫秒 ,并且修复了一个大 Bug:总是把唤醒的 goroutine 放在等待队列的尾部,会导致更加不公平的等待时间
在这里插入图片描述
只需要记住,Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁

   type Mutex struct {
        state int32
        sema  uint32
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexStarving // 从state字段中分出一个饥饿标记
        mutexWaiterShift = iota
    
        starvationThresholdNs = 1e6
    )
    
    func (m *Mutex) Lock() {
        // Fast path: 幸运之路,一下就获取到了锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }
        // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
        m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
        var waitStartTime int64
        starving := false // 此goroutine的饥饿标记
        awoke := false // 唤醒标记
        iter := 0 // 自旋次数
        old := m.state // 当前的锁的状态
        for {
            // 锁是非饥饿状态,锁还没被释放,尝试自旋
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
                continue
            }
            new := old
            if old&mutexStarving == 0 {
                new |= mutexLocked // 非饥饿状态,加锁
            }
            if old&(mutexLocked|mutexStarving) != 0 {
                new += 1 << mutexWaiterShift // waiter数量加1
            }
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving // 设置饥饿状态
            }
            if awoke {
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            // 成功设置新状态
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // 处理饥饿状态

                // 如果以前就在队列里面,加入到队列头
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                // 阻塞等待
                runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                // 唤醒之后检查锁是否应该处于饥饿状态
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                // 如果锁已经处于饥饿状态,直接抢到锁,返回
                if old&mutexStarving != 0 {
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    // 有点绕,加锁并且将waiter数减1
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
                    }
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    }
    
    func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            m.unlockSlow(new)
        }
    }
    
    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 {
            old := new
            for {
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {
            runtime_Semrelease(&m.sema, true, 1)
        }
    }

增加饥饿模式,将饥饿模式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,新来的同学主动谦让一下,给老同志一些机会。通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的 goroutine 获取锁的公平性,对于使用锁的业务代码来说,不会有业务一直等待锁不被处理。

4 经典同步问题

4.1 读写者问题

  • 读者优先——读写者问题
  • 写者优先——读写者问题

动机: 共享数据的访问

两种类型的使用者
1)读者: 不需要修改数据
2)写者: 读取和修改数据

问题的约束
1)允许同一时间有多个读者,但在任何时候只有一个写者
2)当没有写者时读者才能访问数据
3)当没有读者和写者时写者才能访问数据
4)在任何时候只能有一个线程可以操作共享变量
5)读者优先,不按时间顺序 or 写者优先,不按时间顺序

无论读者优先还是写者优先,总会导致对应的写者 或 读者 请求延迟处理,这里介绍Golang的RWMutex方案

4.1.1 为什么不直接用Mutex而是RWMutex

使用Mutex来保证读写共享资源的安全性是可行的,但这在某些情况下有点“浪费”。比如说,在写少读多的情况下,即使一段时间内没有写操作,大量并发的读访问也不得不在 Mutex 的保护下变成了串行访问,这个时候,使用 Mutex,对性能的影响就比较大。

Golang是如何解决上述问题呢——RWMutex锁来区分读写操作,其实也是典型的读写者问题的一种工程实践,结合了写者优先和读者优先(个人理解哈,也有说RWMutex是写者优先的实践), 简单流程如下:

  • 当发生写操作,写与写是串行互斥的(使用Mutex实现)
  • 如果当前写操作检查发现没有读操作,则执行
  • 发现有读操作,则将已有的读操作reader计数赋值给waiter,waiter是写操作想继续执行时必须要等带完成的读操作数
  • 对于当前的写操作,必须等写操作之前所有的读操作(waiter)结束,由最后一个等待的读操作来通过信号量方式release这个写操作
  • 当前的写操作在阻塞等待中时,后续的读操作来了,阻塞起来,作为reader计数,等到当前的写操作完成,通过信号量来release所有读操作
  • 当读操作完成,reader计数减1,同时判断当前reader状态,是否存在写操作阻塞(源码里通过一个偏移实现),存在则判断当前waiter计数是否为0,0则release写操作

4.1.2 RWMutex用法

先看看用法,Golang代码如下

func main() {
    var counter Counter
    for i := 0; i < 10; i++ { // 10个reader
        go func() {
            for {
                counter.Count() // 计数器读操作
                time.Sleep(time.Millisecond)
            }
        }()
    }

    for { // 一个writer
        counter.Incr() // 计数器写操作
        time.Sleep(time.Second)
    }
}
// 一个线程安全的计数器
type Counter struct {
    mu    sync.RWMutex
    count uint64
}

// 使用写锁保护
func (c *Counter) Incr() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// 使用读锁保护
func (c *Counter) Count() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.count
}

如果遇到可以明确区分 reader 和 writer goroutine 的场景,且有大量的并发读、少量的并发写,并且有强烈的性能需求,就可以考虑使用读写锁 RWMutex 替换 Mutex

4.1.3 RWMutex源码

type RWMutex struct {
  w           Mutex   // 互斥锁解决多个writer的竞争
  writerSem   uint32  // writer信号量
  readerSem   uint32  // reader信号量
  readerCount int32   // reader的数量
  readerWait  int32   // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

RWMutex 包含一个 Mutex,以及四个辅助字段 writerSem、readerSem、readerCount 和 readerWait

  • 字段 w:为 writer 的竞争锁而设计
  • 字段 readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁)
  • readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量
  • writerSem 和 readerSem:都是为了阻塞设计的信号量
  • 这里的常量 rwmutexMaxReaders,定义了最大的 reader 数量。

读操作源码

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r) // 有等待的writer
    }
}
func (rw *RWMutex) rUnlockSlow(r int32) {
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最后一个reader了,writer终于有机会获得锁了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

对 reader 计数加 1。比较困惑的是,readerCount 怎么还可能为负数呢?
因为readerCount 这个字段有双重含义:没有 writer 竞争或持有锁时,readerCount 和我们正常理解的 reader 的计数是一样的;但是,如果有 writer 竞争锁或者持有锁时,那么,readerCount 不仅仅承担着 reader 的计数功能,还能够标识当前是否有 writer 竞争或持有锁,在这种情况下,请求锁的 reader 的阻塞等待锁的释放。

调用 RUnlock 需要将 Reader 的计数减去 1,因为 reader 的数量减少了一个。
但是,AddInt32 的返回值还有另外一个含义。如果它是负值,就表示当前有 writer 竞争锁,在这种情况下,还会调用 rUnlockSlow 方法,检查是不是 reader 都释放读锁了,如果读锁都释放了,那么可以唤醒请求写锁的 writer 了。

当一个或者多个 reader 持有锁的时候,竞争锁的 writer 会等待这些 reader 释放完,才可能持有这把锁

写操作源码

func (rw *RWMutex) Lock() {
    // 首先解决其他writer竞争问题
    rw.w.Lock()
    // 反转readerCount,告诉reader有writer竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果当前有reader持有锁,那么需要等待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}


func (rw *RWMutex) Unlock() {
    // 告诉reader没有活跃的writer了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的reader们
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 释放内部的互斥锁
    rw.w.Unlock()
}

为了避免 writer 之间的竞争,RWMutex 就会使用一个 Mutex 来保证 writer 的互斥。一旦一个 writer 获得了内部的互斥锁,就会反转 readerCount 字段,把它从原来的正整数 readerCount(>=0) 修改为负数(readerCount-rwmutexMaxReaders),让这个字段保持两个含义(既保存了 reader 的数量,又表示当前有 writer)

还会记录当前活跃的 reader 数量,所谓活跃的 reader,就是指持有读锁还没有释放的那些 reader。

如果 readerCount 不是 0,就说明当前有持有读锁的 reader,RWMutex 需要把这个当前 readerCount 赋值给 readerWait 字段保存下来(第 7 行), 同时,这个 writer 进入阻塞等待状态。每当一个 reader 释放读锁的时候(调用 RUnlock 方法时),readerWait 字段就减 1,直到所有的活跃的 reader 都释放了读锁,才会唤醒这个 writer

这里我的理解是:

  • 对于当前写操作完成后,后续的写操作和读操作请求过来后,看读操作和写操作谁能先执行原子操作
    • 写操作Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    • 读操作 RLock() if atomic.AddInt32(&rw.readerCount, 1) < 0
  • 若读操作先执行,那么写操作继续阻塞等到读操作waiter都完成,所以我认为这里不是单纯的写操作优先,而是在上一个写完成后,读和写自行抢夺优先级;(也有人认为是写操作优先)
  • 如果当前写完成后,自动移交优先级给下一个写操作,那么就是写者优先,但看源码不是这样的

继续看写操作释放

当一个 writer 释放锁的时候,它会再次反转 readerCount 字段。
因为当前锁由 writer 持有,所以,readerCount 字段是反转过的,并且减去 rwmutexMaxReaders变成负数。
所以反转方法就是给它增加 rwmutexMaxReaders 。
writer 释放锁,需要唤醒之后新来的 reader,不必再阻塞它们
在 RWMutex 的 Unlock 返回之前,需要把内部的互斥锁释放。释放完毕后,其他 writer 才可以继续竞争这把锁。

4.2 哲学家进餐问题(TO DO)

在这里插入图片描述
哲学家就餐问题

参考文献

1 信号量研究
2 操作系统(8)—进程的同步与互斥以及信号量机制
3【操作系统】第九章同步互斥问题
4【操作系统】第十章信号量与管程
5 Go race detector基于Google 的 C/C++ sanitizers 技术
6 Mutex:庖丁解牛看实现
7 RWMutex:读写锁的实现原理及避坑指南


网站公告

今日签到

点亮在社区的每一天
去签到