暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Golang 之 RWMutex 源码解读

李木子啊 2021-08-23
694

前言

在上一篇源码系列【Golang 之 Mutex 源码详解】我们讲到,Mutex 能够解决对临界区并发访问的问题。然而 Mutex 的使用相当于把对临界区的访问完全串行化了,这在读多写少之类的场景,其性能有时候完全不能满足需求。对此,基于 Mutex,Golang 实现了 RWMutex。RWMutex 翻译成中文的读法应该是“读写互斥锁”,简称“读写锁”。读写锁设计的关键点就在于读操作可以并发执行,从而能够极好地满足读多写少的场景。

读写锁实现机制

读写锁的设计一般满足如下规则:

写写互斥:这个很好理解,与一般的互斥锁语义相同;

读写互斥,包含两部分含义,都是为了避免读不一致的情况产生:

    •在拥有写锁的时候其他协程不能获取到读锁;    •在拥有读锁的时候其他协程不能获取到写锁;

    •读读不互斥:不同的协程可以同时获取到读锁,这个是提高读性能的关键所在。


读不一致的意思就是在读锁加锁期间,读取同一个变量的结果不相同,这个和 MySQL 中的“幻读”很像。比如考虑如下场景,一读一写两个协程同时对变量 a 进行读或者写操作,我们假设读锁不会阻塞写锁:



读加锁
读 a = 1写加锁

写 a = 2
读 a = 2(不一致)写解锁

可以发现读操作由于写操作的乱入,导致读的两次结果产生了不一致的情况。因此,读写互斥是很有必要的

RWMutex 的实现正是遵循了上述规则,接下来就让我们走进 RWMutex 源码,一起来解读一下其实现的相关细节吧。

RWMutex 源码解析

首先我们来看一下 RWMutex 对外暴露的结构体,理解其字段的含义是理解下面加锁和解锁细节的关键。

RWMutex 结构体

    type RWMutex struct {
    // 写操作锁,通过它实现写写操作互斥
    w Mutex
    // 写锁信号量
    writerSem uint32
    // 读锁信号量
    readerSem uint32
    // 当前读操作的数量,包含所有已经获取到读锁或者被写操作阻塞的等待获取读锁的读操作数量
    readerCount int32
    // 获取写锁需要等待读锁释放的数量
    readerWait int32
    }
    复制

    读写锁中写操作需要在等待读锁释放的时候才能获取写锁,然而这就会存在一个问题,就是写操作在等待读锁释放期间可能还会有新的读操作获取到了锁,尤其是在读并发量特别大的场景下。如果读操作一直能够获取到锁,那么写操作就会一直阻塞下去,从而造成“写饥饿”(咦,饥饿?好熟悉)。那么这种情况该如何避免呢?

    RWMutex 利用 readerCount readerWait 属性,提供了一个非常巧妙的思路。写操作到来的时候,会把 readerCount 的值复制给 readerWait,用来标记在当前写操作之前的读操作的数量。当读锁释放的时候,会递减 readerWait,当它变成 0 的时候,就代表前面的读操作全部完成了,此时写操作会被唤醒。至于如何告诉 RWMutex 当前有正在阻塞中的写操作,这里其实是通过将 readerCount-rwmutexMaxReaders 来实现的,由于 rwmutexMaxReaders 是一个极大值,现实情况并发读的数量远不可能达到这个值,因此读操作一旦发现 readerCount 的值小于 0,便可以知道当前有写操作阻塞了,具体实现可以看下面的相关源码。

    RWMutex 对外共提供了五个方法,分别是:

    Lock()
     / Unlock()
    :写加锁和解锁操作;
    RLock()
     / RUnlock()
    :读加锁和解锁操作;
    RLocker()
    :读锁操作对象,实现 Locker 接口,可以通过 Locker 接口的方法实现读锁的加锁和解锁。

    Lock()

    写锁加锁。一方面需要获取互斥锁,用于写写互斥,基于 Mutex 实现;同时告诉读操作有写操作正在排队,如果当前有正在进行中的读操作的话,那么会阻塞等待所有进行中的读操作完成。

      func (rw *RWMutex) Lock() {
      if race.Enabled {
      _ = rw.w.state
      race.Disable()
      }
      // 首先竞争获取到读锁
      // 这里实现的是写写互斥
      rw.w.Lock()
      // 接下来是读写互斥
      // 首先需要通知读锁,有一个写操作被阻塞了
      // 这里正是设计的巧妙之处
      // 如果这里赋值成功,那么后续读锁加锁的时候,都不可能使 readerCount > 0
      // 当然以上是基于读并发数量不可能 > rwmutexMaxReaders 这个前提
      // 当读加锁的时候发现 readerCount < 0 了,就可以知道当前有被阻塞的写操作
      r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
      // 等待读锁全部释放,两个条件,满足其一即可:
      // 1. r = 0
      // 2. atomic.AddInt32(&rw.readerWait, r) = 0
      // 否则需要阻塞等待写信号量
      if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      // 和 RUnlock() 释放写锁信号量相呼应
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
      }
      if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
      }
      }
      复制

      Unlock()

      写锁解锁,需要唤醒被阻塞的读操作,告诉它们可以继续了;同时需要释放写写互斥锁。

        func (rw *RWMutex) Unlock() {
        if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
        }


        // 和加锁时减 rwmutexMaxReaders 相呼应,告诉读协程没有活跃中的写操作了
        // 这之后 readerCount 又能恢复正常(> 0),代表当前读操作的数量
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
        // 异常情况
        if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
        }
        // 唤醒读操作,与下文 RLock() 读操作等待信号量相呼应
        for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
        }
        // 释放写锁
        rw.w.Unlock()
        if race.Enabled {
        race.Enable()
        }
        }
        复制

        RLock()

        读锁加锁,readerCount 减 1 即可,同时如果写锁正在被占用,需要等待写锁释放。写锁被占用的标志就是 readerCount 的值小于 0。

          func (rw *RWMutex) RLock() {
          if race.Enabled {
          _ = rw.w.state
          race.Disable()
          }
          // 如果 atomic.AddInt32(&rw.readerCount, 1) < 0,代表写锁正在被占用,需要等待写锁释放
          if atomic.AddInt32(&rw.readerCount, 1) < 0 {
          // 阻塞等待写锁释放读信号量,和上文 Unlock() 释放读锁信号量相呼应
          runtime_SemacquireMutex(&rw.readerSem, false, 0)
          }
          if race.Enabled {
          race.Enable()
          race.Acquire(unsafe.Pointer(&rw.readerSem))
          }
          }
          复制

          RUnlock()

          读锁解锁,需要考虑写操作被阻塞的情况。

            func (rw *RWMutex) RUnlock() {
            if race.Enabled {
            _ = rw.w.state
            race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
            race.Disable()
            }
            // readerCount 减 1
            // 如果此时 readerCount < 0,代表有写操作被阻塞,需要走 slow-path,择机唤醒写操作
            if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
            // Outlined slow-path to allow the fast-path to be inlined
            rw.rUnlockSlow(r)
            }
            if race.Enabled {
            race.Enable()
            }
            }
            复制

            rUnlockSlow

            最后一个释放的读锁需要唤醒等待中的写加锁操作。

              // r -> rw.readerCount,此时是读锁解锁减去 1 之后的值
              func (rw *RWMutex) rUnlockSlow(r int32) {
              // 如果 r+1 = 0,则 r = -1,代表解锁了一个未加锁的锁
              // 如果 r+1 = -rwmutexMaxReaders,还记得写锁加锁的操作嘛,这里代表解锁了一个写锁
              // 这两种情况都是非法的
              if r+1 == 0 || r+1 == -rwmutexMaxReaders {
              race.Enable()
              throw("sync: RUnlock of unlocked RWMutex")
              }
              // 写操作需要等待读操作释放的数量减 1
              // 如果此时值变成了 0,代表前面的读操作全部释放了
              if atomic.AddInt32(&rw.readerWait, -1) == 0 {
              // 唤醒一个写锁操作,和 Lock() 等待写锁信号量相呼应
              runtime_Semrelease(&rw.writerSem, false, 1)
              }
              }
              复制

              RLocker()

              返回一个读操作的接口对象。用于操作读锁相关的加锁和解锁方法。

                func (rw *RWMutex) RLocker() Locker {
                return (*rlocker)(rw)
                }


                type rlocker RWMutex


                func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
                func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
                复制

                使用示例

                我们都知道缓存是一个典型的读多写少的系统,接下来我们利用 RWMutex 来实现一个简单的缓存系统。

                我们的缓存系统主要对外提供两个方法:

                  // 回源函数
                  type Load func(string) interface{}


                  type Cache interface {
                  Get 从缓存中取值,如果缓存中没有,那么通过 load 方法回源并加载进缓存
                  Get(string, Load) interface{}


                  // Put 往缓存中写数据
                  Put(key, value string)
                  }
                  复制

                  遵循面向接口的编程范式,我们定义了 Cache 接口,包含两个方法。其中 Get()
                   操作需要加读锁防止并发写,如果数据不存在,则需要升级为写锁,回源数据并写缓存;Put()
                   操作需要加写锁,防止并发读写问题。具体实现如下:

                    // 利用 RWMutex 实现一个缓存
                    // 缓存是一个典型的读多写少的系统,这与读写锁的使用场景不谋而合
                    type MCache struct {
                    cacheMap map[string]interface{}
                    sync.RWMutex
                    }


                    // Get 读,按需加载
                    func (mc *MCache) Get(key string, load func(string) interface{}) (value interface{}) {
                    mc.RLock()
                    if val, exist := mc.cacheMap[key]; exist {
                    value = val
                    mc.RUnlock()
                    return value
                    }
                    mc.Lock()
                    defer mc.Unlock()
                    // 双重检查
                    if val, exist := mc.cacheMap[key]; exist {
                    value = val
                    return
                    }
                    // 查询数据源并写到缓存中
                    value = load(key)
                    mc.cacheMap[key] = value
                    return
                    }


                    // Put 写,需要添加写锁
                    func (mc *MCache) Put(key, value interface{}) {
                    mc.Lock()
                    mc.cacheMap[key] = value
                    mc.Unlock()
                    }
                    复制

                    so easy!

                    使用 RWMutex 的注意事项

                    同样,我们通过上述源码分析来总结一下使用 RWMutex 应该注意的地方吧。

                    1) 不可复制

                    RWMutex 结构体是有状态的,一旦被使用,它的几个字段就会记录其运行状态,如果复制了这把锁,就会同时把状态也给复制过来,这对后续新锁的使用会产生不可预知的影响。因此我们在使用读写锁的过程中,需要避免锁的值传递或者值赋值。

                    2) 加锁和解锁要成对出现

                    和 Mutex 类似,RWMutex 的 Lock()
                     与 Unlock()
                    ,以及 RLock()
                     与 RUnlock()
                     都需要成对出现,只有加锁而没有解锁会导致死锁,只有解锁而没有加锁则会导致程序 panic。在使用过程中我们尤其要注意函数提前退出导致解锁操作未被执行的情况,因此解锁操作可以配合 defer 使用。

                    3) 避免重入

                    Golang 标准库提供的锁都是不可重入的:

                    由于 Mutex 不可重入,所以基于 Mutex 的写锁不可重入,否则会产生死锁;在读操作的占用读锁的同时需要获取写锁,会产生死锁;如果读写锁之间相互循环依赖,也会产生死锁问题。

                    总结

                    可以看出 RWMutex 的设计体现出了写操作优先的原则,如果有一个写操作在等待请求锁的话,它会阻止新来的读请求获取到锁,从而保障写优先。这种设计能够很好避免写操作在读并发比较高的情况下造成的“写饥饿”现象。

                    往期 Golang 源码系列

                    深入浅出 WaitGroup 及其升级版Golang 之 Mutex 源码详解


                    文章转载自李木子啊,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论