如何面对并发下的bug

发布于:2024-05-06 ⋅ 阅读:(25) ⋅ 点赞:(0)

整理总结自蒋炎岩老师的b站课程,https://jyywiki.cn/OS/2022/index.html

并发bug与应对

  • 应对bug的方法

    • 在代码里边增加很多检查(加断言)

      • #include "thread.h"
        
        unsigned long balance = 100;
        
        void Alipay_withdraw(int amt) {
          if (balance >= amt) {
            usleep(1); // unexpected delays
            balance -= amt;
          }
          assert(balance <= 10000000);
        }
        
        void Talipay(int id) {
          Alipay_withdraw(100);
        }
        
        int main() {
          create(Talipay);
          create(Talipay);
          join();
          printf("balance = %lu\n", balance);
        }
        
    • 防御性编程:把程序需要满足的条件用 assert 表达出来。

      • 在这里插入图片描述

      • assert()

      • 在这里插入图片描述

  • 并发bug:死锁 (Deadlock)

    • A deadlock is a state in which each member of a group is waiting for another member, including itself, to take action.

    • ABBA-Deadlock

      • void swap(int i, int j) {
          spin_lock(&lock[i]);
          spin_lock(&lock[j]);
          arr[i] = NULL;
          arr[j] = arr[i];
          spin_unlock(&lock[j]);
          spin_unlock(&lock[i]);
        }
        
      • 上锁的顺序很重要……

        • swap 本身看起来没有问题,swap(1, 2); swap(2, 3), swap(3, 1) → 死锁
    • 避免死锁

      • 死锁产生的四个必要条件

        • 互斥:一个资源每次只能被一个进程使用
        • 请求与保持:一个进程请求资阻塞时,不释放已获得的资源
        • 不剥夺:进程已获得的资源不能强行剥夺
        • 循环等待:若干进程之间形成头尾相接的循环等待资源关系
      • AA-Deadlock

        • 在临界条件检测
        • spinlock-xv6.c中的各种防御性编程
          • if (holding(lk)) panic();
      • ABBA-Deadlock

        • 任意时刻系统中的锁都是有限的

        • 严格按照固定的顺序获得所有锁 (lock ordering; 消除 “循环等待”)

        • class LockOrdering:
              locks = [ '', '', '' ]
          
              def tryacquire(self, lk):
                  self.locks[lk], seen = '🔒', self.locks[lk]
                  return seen == ''
          
              def release(self, lk):
                  self.locks[lk] = ''
          
              @thread
              def t1(self):
                  while True:
                      while not self.tryacquire(0): pass
                      while not self.tryacquire(1): pass
                      while not self.tryacquire(2): pass
                      self.release(0), self.release(1), self.release(2)
          
              @thread
              def t2(self):
                  while True:
                      while not self.tryacquire(1): pass
                      while not self.tryacquire(2): pass
                      self.release(1), self.release(2)
          
  • 并发bug:数据竞争 (Data Race)

    • 不同的线程同时访问同一段内存,且至少有一个是写。

    • 用互斥锁保护好共享数据,消灭一切数据竞争

    • 两种经典错误

      • // Case #1: 上错了锁
        void thread1() { spin_lock(&lk1); sum++; spin_unlock(&lk1); }
        void thread2() { spin_lock(&lk2); sum++; spin_unlock(&lk2); }
        
      • // Case #2: 忘记上锁
        void thread1() { spin_lock(&lk1); sum++; spin_unlock(&lk1); }
        void thread2() { sum++; }
        
  • 更多类型的并发 Bug

    • 忘记上锁——原子性违反 (Atomicity Violation, AV)

      • 我以为一段代码没啥事呢,但被人强势插入了

      • 在这里插入图片描述

      • 在这里插入图片描述

    • 忘记同步——顺序违反 (Order Violation, OV)

      • 在这里插入图片描述

      • 由于S4在S2之后执行,当S3执行时,造成了错误的结果

  • 应对并发 Bug 的方法

    • Lockdep: 运行时的死锁检查:lock/unlock

      • 为每一个锁确定唯一的 “allocation site”
        • lock-site.c
        • 为所分配了一个唯一性id,然后程序运行时存储上锁的顺序
        • assert: 同一个 allocation site 的锁存在全局唯一的上锁顺序
      • 检查方法:printf
        • 记录所有观察到的上锁顺序,例如[x,y,z]⇒xy,xz,yz
        • 检查是否存在 xyyx
    • ThreadSanitizer: 运行时的数据竞争检查:内存访问 + lock/unlock

      • 为所有事件建立 happens-before 关系图
        • Program-order + release-acquire
        • 对于发生在不同线程且至少有一个是写的 x,y 检查
        • 在这里插入图片描述
    • 动态分析工具:Sanitizers

    • 防御性编程:低配版 Lockdep

      • 给程序获取锁一个不可能超过的超时时间,当超过后打印日志,配合调试器和线程backtrace一秒锁定死锁

      • int spin_cnt = 0;
        while (xchg(&locked, 1)) {
            //自选100000000次,即打印日志
          if (spin_cnt++ > SPIN_LIMIT) {
            printf("Too many spin @ %s:%d\n", __FILE__, __LINE__);
          }
        }