暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Golang 之 Mutex 源码详解

李木子啊 2021-08-23
1018

前言

Mutex(mutual exclusion)是 golang 提供的同步工具,用于处理多个协程之间对同一块数据(临界区)并发访问的问题,它是互斥锁的一种实现。

Mutex 的使用很简单,我们只需要在访问临界区之前执行加锁操作,在退出临界区之前执行解锁操作即可,示例代码如下:

    var i = 0
    var mu sync.Mutex


    func Inc() {
    mu.Lock() // 加锁
    i++ // i++ 不是一个原子操作
    mu.Unlock() // 解锁
    }
    复制

    除此之外,Mutex 在 golang 并发编程中的地位很高,是很多并发工具实现的基础。接下来就让我们走入 Mutex 源码,具体解读一下如何实现一个完善的互斥锁工具。

    以下源码基于 go1.17 的实现

    Mutex 的实现大量使用了信号量这一同步原语,我们先简单了解一些信号量相关的知识。

    信号量

    信号量又称 PV 原语,它并不是 golang 或者某些编程语言特有的能力,而是被广泛地运用到不同的操作系统中,是并发编程领域的一个重要模型。

    信号量模型可以简单概括为三个部分,分别是一个计数器、一个等待队列和三个操作计数器和等待队列的方法:

    init()
    : 设置计数器的初始值;
    down()
    : P 操作,将计数器的值减 1;减 1 之后如果计数器的值小于 0,那么当前线程被阻塞,否则当前线程可以继续执行;
    up()
    : V 操作,将计数器的值加 1;加 1 之后如果计数器的值小于或等于 0,代表依然有线程被阻塞,那么唤醒等待队列中的一个线程,并将其从等待队列中移除。

    这三个方法中,down()
     和 up()
     是成对出现的,并且是先调用 down()
     获得锁,处理完成再调用 up()
     释放锁,这样的话,如果计数器的值 > 0,则意味着没有阻塞的线程,up() 方法也就没必要去唤醒等待队列中的一个线程。

    golang sync 包中提供了以上对于信号量的操作方法,方法声明在 sync/runtime.go 中,实际的实现在 runtime/sema.go 中:

      // sync/runtime.go


      // P 操作,等待 *s > 0,然后原子递减它
      func runtime_Semacquire(s *uint32)


      // 和 runtime_Semacquire 作用类似,用来阻塞互斥对象
      // lifo:如果为 true 的话,将会被插入在等待队列的头部,将会被第一个唤醒
      // skipframes:从 runtime_SemacquireMutex 的调用者开始计算跟踪期间要忽略的帧数
      func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)


      // V 操作,阻塞等待被唤醒
      // handoff:如果为 false,那么会传递信号到队列头部的 waiter
      // skipframes:从 runtime_Semrelease 的调用者开始计算跟踪期间要忽略的帧数
      func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
      复制

      信号量作为并发编程领域的重要手段,并没有在 golang 标准库中直接对外提供,而是通过第三方的扩展库提供出来的,感兴趣的同学可以自行去研究,传送门[1]

      好了,关于信号量的部分限于篇幅暂时讲这么多,接下来我们开始步入正题。

      Mutex 的设计与实现

      Mutex 对外暴露了一个结构体和两个方法。

      其中 Lock()
       和 Unlock()
       两个方法实现了 Locker 接口,分别用于加锁和解锁操作:

        type Locker interface {
        Lock() // 加锁
        Unlock() // 解锁
        }
        复制

        Mutex 结构体

          // A Mutex is a mutual exclusion lock.
          // The zero value for a Mutex is an unlocked mutex.
          //
          // A Mutex must not be copied after first use.
          type Mutex struct {
          // 锁状态,保护四部分含义,下面会进行详细解读
          state int32
          // 信号量,用于阻塞等待或者唤醒
          sema uint32
          }
          复制

          Mutex 的 state 属性是一个 32 位的整型变量,它被分为四部分含义:

          Locked:表示该 mutex 是否被锁定,0 表示没有,1 表示处于锁定状态;Woken:表示是否有协程被唤醒,0 表示没有,1 表示有协程处于唤醒状态,并且在加锁过程中;Starving:Go1.9 版本之后引入,表示 mutex 是否处于饥饿状态,0 表示没有,1 表示有协程处于饥饿状态;Waiter: 等待锁的协程数量。

            const (
            // 标识 mutex 被锁住,在低位,值 1
            mutexLocked = 1 << iota
            // 标识有协程被唤醒,处于 state 中的第二个 bit 位,值 2
            mutexWoken
            // 标识 mutex 处于饥饿模式,处于 state 中的第三个 bit 位,值 4
            mutexStarving
            // 值 3,state 值通过右移三位可以得到 waiter 的数量
            // 同理,state += 1 << mutexWaiterShift,可以累加 waiter 的数量
            mutexWaiterShift = iota
            // 标识协程处于饥饿状态的最长阻塞时间,当前被设置为 1ms
            starvationThresholdNs = 1e6
            )
            复制

            Locker 和 Waiter 好理解,那么中间两个状态我们该怎么理解呢?

            其实这两个状态以及 Waiter 的值都是在 Mutex 的版本迭代中加进去的。

            Woken

            Woken 是在 2011 年的时候引入的,同时还包括 Waiter,也正是在这个时候,state (之前是 key 字段) 被赋予了组合含义。

            引入它的主要原因就是在老版本的实现中有一个痛点:请求锁的协程需要排队等待,这种机制是一种比较公平的机制,毕竟先到先得嘛,但是其性能却不是最优的。这是因为当前正在尝试获取锁的协程在发现前面还有等待的协程的时候,需要强制让出 CPU 时间片,会有协程上下文切换,这种情况,肯定是没有直接把锁让给当前协程效率高的,在高并发的情况下,性能问题暴露地将会更加明显。

            通过引入 Woken 标识,能够让 Mutex 知道当前是否有已经被唤醒的协程在尝试获取锁,如果有的话,它能够在锁被其他协程释放的时候马上获取到锁。这个可以从下文 lockSlow 的实现中看出。

            Starving

            Woken 解决了性能问题,但是又带来了一个问题:由于新来的协程会有更大的机会获取到锁,这导致等待队列中的协程可能会迟迟无法获取到锁,从而产生“饥饿”问题,相对于性能问题,貌似饥饿问题显得更加严重,因此在 Go1.9 版本中,增加饥饿模式,通过 Starving 位来标识。如果一个协程等待获取锁的时间超过了 1ms,那么它就会处于饥饿状态,Mutex 会让处于饥饿状态的协程更有机会获取到锁。

            Lock

              // 加锁
              func (m *Mutex) Lock() {
              // state 从 0 直接加锁成功,直接返回就行
              if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
              if race.Enabled {
              race.Acquire(unsafe.Pointer(m))
              }
              return
              }
              m.lockSlow()
              }
              复制

              这里通过将 slow path 外联,可以使 fast path 内联。那么什么是内联呢?

              我们都知道栈分配内存会比堆分配高效地多,那么我们就会希望对象能尽可能被分配在栈上。在 golang 中,一个 goroutine 会有一个单独的栈,栈又会包含多个栈帧,栈帧是函数调用时在栈上为函数所分配的区域。但其实,函数调用是存在一些固定开销的,例如维护帧指针寄存 器BP、栈溢出检测等。因此,对于一些代码行比较少的函数,编译器倾向于将它们在编译期展开从而消除函数调用,这种行为就是内联。

              例如假设有以下函数调用:

                func main() {
                var a, b = 1, 2
                res := max(a, b)
                fmt.Println(res)
                }


                func max(a, b int) int {
                if a > b {
                return a
                }
                return b
                }
                复制

                启用内联优化之后,呈现给编译器的样子,看起来如下:

                  func main() {
                  var a, b = 1, 2
                  var res int
                  // 函数展开
                  if a > b {
                  res = a
                  } else {
                  res = b
                  }
                  fmt.Println(res)
                  }
                  复制

                  像这种通过 fast path 和 slow path 来进行内联优化的方式,golang 源码中随处可见。

                  lockSlow

                  在加锁过程中有两种情况:

                  Mutex 处于非饥饿模式:这个时候如果协程加锁不成功不会立刻进入阻塞队列,而是判断自己是否满足自旋的条件,如果满足,则启动自旋,在自旋过程中尝试获取锁。Mutex 处于饥饿模式:如果一个协程阻塞等待的时间过长(超过 1ms),那么 mutex 会被标记为饥饿模式,此时协程抢锁过程中不会开启自旋,而是一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取到锁。

                  非饥饿模式下,协程会开启自旋操作。自旋能够避免协程切换,使当前自旋的协程有机会更快获取到锁。自旋对应 CPU 的 PAUSE 指令,相当于 CPU 进行空转。由于自旋操作会很大程度上给 CPU 带来一定的压力,因此自旋不能无限制进行下去,所有在这里,会通过 sync_runtime_canSpin(int)
                   判断能否进入自旋,允许自旋的条件如下:

                  自旋次数要不超过 4 次,这个是根据入参确定的;CPU 的核数要大于 1,否则自旋没有意义,因为没有其他的协程能够在当前协程自旋期间获取到时间片而释放锁,自旋只会白白浪费时间;gomaxprocs 要大于 1,也就是 GMP 中的 P (Processor);至少有一个本地的 P 队列,并且其可运行的 G 队列为空。

                    func (m *Mutex) lockSlow() {
                    var waitStartTime int64
                    // 标识是否处于饥饿模式
                    starving := false
                    // 唤醒标记
                    awoke := false
                    // 自旋次数
                    iter := 0
                    old := m.state
                    for {
                    // 非饥饿模式下,开启自旋操作
                    // 从 runtime_canSpin(iter) 的实现中(runtime/proc.sync_runtime_canSpin)可以知道,
                    // 如果 iter 的值大于 4,将返回 false
                    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                    // 如果没有其他 waiter 被唤醒,那么将当前协程置为唤醒状态,同时 CAS 更新 mutex 的 Woken 位
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                    }
                    // 开启自旋
                    runtime_doSpin()
                    iter++
                    // 重新检查 state 的值
                    old = m.state
                    continue
                    }
                    new := old
                    // 非饥饿状态
                    if old&mutexStarving == 0 {
                    // 当前协程可以直接加锁
                    new |= mutexLocked
                    }
                    // mutex 已经被锁住或者处于饥饿模式
                    // 那么当前协程不能获取到锁,将会进入等待状态
                    if old&(mutexLocked|mutexStarving) != 0 {
                    // waiter 数量加 1,当前协程处于等待状态
                    new += 1 << mutexWaiterShift
                    }
                    // 当前协程处于饥饿状态并且 mutex 依然被锁住,那么设置 mutex 为饥饿模式
                    if starving && old&mutexLocked != 0 {
                    new |= mutexStarving
                    }
                    if awoke {
                    if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                    }
                    // 清除唤醒标记
                    // &^ 与非操作,mutexWoken: 10 -> 01
                    // 此操作之后,new 的 Locked 位值是 1,如果能够成功写入到 m.state 字段,那么当前协程获取锁成功
                    new &^= mutexWoken
                    }
                    // CAS 设置新状态成功
                    if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    // 旧的锁状态已经被释放并且处于非饥饿状态
                    // 这个时候当前协程正常请求到了锁,就可以直接返回了
                    if old&(mutexLocked|mutexStarving) == 0 {
                    break
                    }
                    // 处理当前协程的饥饿状态
                    // 如果之前已经处于等待状态了(已经在队列里面),那么将其加入到队列头部,从而可以被高优唤醒
                    queueLifo := waitStartTime != 0
                    if waitStartTime == 0 {
                    // 阻塞开始时间
                    waitStartTime = runtime_nanotime()
                    }
                    // P 操作,阻塞等待
                    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                    // 唤醒之后,如果当前协程等待超过 1ms,那么标识当前协程处于饥饿状态
                    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                    old = m.state
                    // mutex 已经处于饥饿模式
                    if old&mutexStarving != 0 {
                    // 1. 如果当前协程被唤醒但是 mutex 还是处于锁住状态
                    // 那么 mutex 处于非法状态
                    //
                    // 2. 或者如果此时 waiter 数量是 0,并且 mutex 未被锁住
                    // 代表当前协程没有在 waiters 中,但是却想要获取到锁,那么 mutex 状态非法
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                    }
                    // delta 代表加锁并且将 waiter 数量减 1 两步操作
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    // 非饥饿状态 或者 当前只剩下一个 waiter 了(就是当前协程本身)
                    if !starving || old>>mutexWaiterShift == 1 {
                    // 那么 mutex 退出饥饿模式
                    delta -= mutexStarving
                    }
                    // 设置新的状态
                    atomic.AddInt32(&m.state, delta)
                    break
                    }
                    awoke = true
                    iter = 0
                    } else {
                    old = m.state
                    }
                    }


                    if race.Enabled {
                    race.Acquire(unsafe.Pointer(m))
                    }
                    }
                    复制

                    Unlock

                      同样做了内联优化:

                      // 解锁操作
                      func (m *Mutex) Unlock() {
                      if race.Enabled {
                      _ = m.state
                      race.Release(unsafe.Pointer(m))
                      }


                      // mutexLocked 位设置为 0,解锁
                      new := atomic.AddInt32(&m.state, -mutexLocked)
                      // 如果此时 state 值不是 0,代表其他位不是 0(或者出现异常使用导致 mutexLocked 位也不是 0)
                      // 此时需要进一步做一些其他操作,比如唤醒等待中的协程等
                      if new != 0 {
                      m.unlockSlow(new)
                      }
                      }
                      复制

                      unlockSlow

                      解锁操作会根据 Mutex.state 的状态来判断需不需要去唤醒其他等待中的协程。

                        func (m *Mutex) unlockSlow(new int32) {
                        // new - state 字段原子减 1 之后的值,如果之前是处于加锁状态,那么此时 new 的末位应该是 0
                        // 此时 new+mutexLocked 正常情况下会将 new 末位变成 1
                        // 那么如果和 mutexLocked 做与运算之后的结果是 0,代表 new 值非法,解锁了一个未加锁的 mutex
                        if (new+mutexLocked)&mutexLocked == 0 {
                        throw("sync: unlock of unlocked mutex")
                        }
                        // 如果不是处于饥饿状态
                        if new&mutexStarving == 0 {
                        old := new
                        for {
                        // old>>mutexWaiterShift == 0 代表没有等待加锁的协程了,自然不需要执行唤醒操作
                        // old&mutexLocked != 0 代表已经有协程加锁成功,此时没有必要再唤醒一个协程(因为它不可能加锁成功)
                        // old&mutexWoken != 0 代表已经有协程被唤醒并且在加锁过程中,此时不需要再执行唤醒操作了
                        // old&mutexStarving != 0 代表已经进入了饥饿状态,
                        // 以上四种情况,皆不需要执行唤醒操作
                        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                        return
                        }
                        // 唤醒一个等待中的协程,将 state woken 位置为 1
                        // old - 1<<mutexWaiterShift waiter 数量减 1
                        new = (old - 1<<mutexWaiterShift) | mutexWoken
                        if atomic.CompareAndSwapInt32(&m.state, old, new) {
                        runtime_Semrelease(&m.sema, false, 1)
                        return
                        }
                        old = m.state
                        }
                        } else {
                        // 饥饿模式
                        // 将 mutex 的拥有权转移给下一个 waiter,并且交出 CPU 时间片,从而能够让下一个 waiter 立刻开始执行
                        runtime_Semrelease(&m.sema, true, 1)
                        }
                        }
                        复制

                        由源码来看使用 Mutex 的一些注意事项

                        Lock() / Unlock() 方法一定要成对出现

                        也就是在调用 Lock()
                         方法之后一定要有对应的 Unlock()
                         方法。如果忘记调用 Unlock()
                         方法(或者 Unlock()
                         方法由于程序异常而没有机会执行),那么会导致其他等待加锁的协程永远获取不到锁,从而产生死锁;如果 Unlock()
                         方法调用次数过多(或者只有 Unlock()
                         而没有 Lock()
                        ),会导致 Mutex 内部报 panic。这两种情况都会导致程序异常的发生,因此使用的时候一定要小心,较好的实践就是配合 defer
                         使用:

                          var mu sync.Mutex
                          mu.Lock()
                          defer mu.Unlock()
                          复制

                          小心拷贝 Mutex

                          Mutex 是一个结构体类型,属于值类型中的一种,作为函数入参和出参以及把它赋值给一个变量等操作都会导致 Mutex 副本的产生,也就是 copy 出了一个新的 Mutex。之所以不允许这么操作,主要原因就是 Mutex 是一个有状态的数据结构,对一个 copy 而来的 Mutex 进行操作往往可能会产生一些意料之外的结果。当然通过传递指针的方式可以避免这种情况的发生,但是一般情况下,大家还是尽量不要这么操作。

                          Lock() / Unlock() 尽量在同一个协程里面

                          Unlock()
                           方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock()
                           也不会对此进行检查。其它 goroutine 可以强制释放锁,这是一个非常危险的操作,因为在临界区的 goroutine 可能不知道锁已经被释放了,还会继续执行临界区的业务操作,这可能会带来意想不到的结果,因为这个 goroutine 还以为自己持有锁呢,有可能导致 data race 问题。所以,我们在使用 Mutex 的时候,必须要保证 goroutine 尽可能不去释放自己未持有的锁,一定要遵循“谁申请,谁释放”的原则。

                          往期 Golang 源码系列

                          深入浅出 WaitGroup 及其升级版

                          参考

                          极客时间(王宝令)-java 并发编程实战之 Semaphore极客时间《Go 语言核心 36 讲》Go 专家编程 - 任洪彩

                          文章转载自李木子啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论