暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

怎么用go语言实现简单Redis分布式锁?

架构狂人 2021-06-29
5840

在单机单应用体系下,为了消除多线程对共享变量的访问时产生的态问题,通常需要对临界区加锁。在集群场景下,应用会被负载均衡到多台机器上进行部署,此时也会产生一系列并发安全问题。比如,多台机器上的应用需要共同维护与执行同一组定时任定时时间来临,在不加任何额外处理的情况下,任务将会被多台机器重复执行,可能造成意想不的后果因此,在多机部署场景下,也需要一套跨机器的互斥解决方案,同一个操作在同一时刻只能被一台机器所执行。

分布式锁的设计理念

单机场景中的锁不适于多机场景下使用,多机场景下自然需要一种分布式形态的锁,允许多机共同抢占。这种锁同样需要实现并满足锁的一般特点:

1.必须要保证互斥机制的稳定性。

2.加锁与解锁的操作尽量高性能。

3.可以根据具体情境考虑实现抢锁失败是否阻塞。

4.最好具备可重入与失效机制,避免死锁。


实现方式:

1.基于MySQL数据库实现,可以借助MySQL的唯一索引,进行 insert 等操作,以操作的成功与否决定是否抢锁成功。也可以借助MySQL的排它锁 for update ,对增改操作加锁,实现互斥。另外也可以通过记录版本号信息,实现乐观锁。但是对于MySQL分布式锁的实现,其加解锁需要在磁盘上进行读写,性能上可能不太如意,需要根据实际场景考虑是否采用该方式。

2.基于Redis实现,该方式也是本文将要实现的方式。通常可以借助Redis中的  SETNX  操作,实现加锁的互斥性。而Redis作为一种单工作线程的内存数据库,其所有命令操作具有天然原子性,这保证了其并发的安全性。另外,其采用的IO多路复用机制保证了其高吞吐特性。因此,在高并发场景下,通常可以采用基于Redis实现的分布式锁。

基于go-redis的设计与实现

本文将基于go语言,使用了一个常用的go Redis客户端 go-redis库 (https://github.com/go-redis/redis) , 一步一步探索与实现一个简单的Redis分布式锁。

1.基于 SETNX 的锁初步实现

SETNX 命令用于在Redis中设置某个不存在的键的值。如果该键不存在,则设置成功,如果该键存在,则设置失败,不作任何动作。基于此可以实现一种简单的抢占机制。

首先配置连接Redis:

    var (
    client *redis.Client
      lockKey    = "my_lock"
    )


    func init() {
    client = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0, redis默认拥有16个db(0~15),且默认连接db0
    })
    }

    上述创建了一个Redis数据库连接,并定义了一个 lockKey 变量,作为锁的标记。

    既然是锁,那么必须要有加锁与解锁的实现,如下为加锁代码实现:

      func Lock() {
      var resp *redis.BoolCmd
      for {
      resp = client.SetNX(lockKey, 1, time.Second*10) //返回执行结果
      lockSuccess, err := resp.Result()
      if err == nil && lockSuccess {
            fmt.Println("lock success!")
      return
      } else {
      //抢锁失败,继续自旋
            fmt.Println("lock failed!", err)
          }
        }
      }

      Lock()函数封装了对锁的抢占操作,这种抢占采用了自旋的方式,进行锁状态的轮询,是一种同步非阻塞的锁实现。client.SetNX(lockKey, 1, time.Second*10)  这行代码本质上执行了如下Redis操作命令:

        set my_lock 1 ex 10 nx

        该命令为 my_lock 键以 NX 方式设置了值。

        如果持有锁的进程万一挂了,那么该键将永远存在与Redis中,其他竞争者无法进行 SETNX 操作,形成死锁。为了防范这种情况发生,这里顺便设置了过期时间为10s,这样即便持锁者挂了,锁在一定时间后依然后自动释放。这里整个 set 操作是原子性的,并对该操作的返回结果作了判断,如果成功设置,说明抢占锁成功,则函数返回,进入临界区可以继续执行下面的代码。如果设置失败,则继续for循环自旋。另外,抢锁失败时,可以适当进行线程休眠,以降低自旋空转对CPU的占用。

        接下来看看,在加锁成功并完成了临界区代码后,如何进行解锁操作:

          func Unlock() {
          delResp := client.Del(lockKey)
          unlockSuccess, err := delResp.Result()
          if err == nil && unlockSuccess > 0 {
              fmt.Println("unlock success!")
          } else {
          fmt.Println("unlock failed!", err)
          }
          }

          上述代码中,client.Del(lockKey) 对Redis中的 my_lock 键进行了删除操作,当删除后,其他竞争者才有机会对该键进行 SETNX操作。

          2.锁的防误删实现

          这样就使用Redis实现了一个简单的分布式锁,但是仍然存在一个问题:如果键过期了,其持有者仍未完成任务,那么锁可能会被其他竞争者抢走,待原持有者完成任务进行解锁操作时,解除的将是当前其他持有者的锁,即发生误删。

          为了解决这种问题,持有者可以给锁添加一个唯一标识,使之只能删除自己的锁。因此需要完善一下加解锁操作:

            func Lock() {
            var resp *redis.BoolCmd
            for {
            goId := utils.GetCurrentGoroutineId()
                resp = client.SetNX(lockKey, goId, time.Second*10//返回执行结果
            lockSuccess, err := resp.Result()
            if err == nil && lockSuccess {
            fmt.Println("lock success!")
            return
            } else {
            fmt.Println("lock failed!", err)
            }
            //抢锁失败,继续自旋
              }
            }

            修改之处在于第4、5行,这里我使用了Goroutine的id为my_lock 键设置值,来作为锁的唯一标识。

            另外顺便提一下,众所周知,Go语言官方并没有提供任何Goroutine id的获取接口。但我们注意到,Goroutine的堆栈信息具有以下特征:

              goroutine 1 [running]:
              main.main()
              D:/albert/gopath/src/go_coding/main.go:12 +0x82

              其中 goroutine 1 [running] 中蕴含了该Goroutine的id信息,因此这里我使用了这么一个技巧,通过查看Goroutine的堆栈信息来解析获取其Goroutine id,代码如下:

                func GetCurrentGoroutineId() int {
                buf := make([]byte, 128)
                buf = buf[:runtime.Stack(buf, false)]
                stackInfo := string(buf)
                goIdStr := strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
                goId, err := strconv.Atoi(goIdStr)
                if err != nil {
                fmt.Println("err=", err)
                return 0
                }
                return goId
                }

                另外,除了Goroutine id外,唯一标识还可以通过其他方式实现,如随机数或者时间戳等。

                在解锁时,需要先判断锁的唯一标识值是否是与当前执行者相匹配:

                  func Unlock() {
                  if value, err := client.Get(lockKey).Result(); err != nil || value == "" {
                      fmt.Println("lock not exist")
                  return
                  } else {
                  if value != strconv.Itoa(utils.GetCurrentGoroutineId()) {
                  return
                  }
                  }
                  //确认当前锁是自己的锁后,进行删除锁
                  //确认当前锁是自己的锁之后,删除锁之前,这段时间内,锁可能会恰巧过期释放且被其他竞争者抢占
                  //那么继续删除锁则是删除别人的锁
                  //因此需要将整个解锁过程原子化,使得在解锁期间,其他竞争者的任何操作不能被redis执行
                  delResp := client.Del(lockKey)
                  unlockSuccess, err := delResp.Result()
                  if err == nil && unlockSuccess > 0 {
                      fmt.Println("unlock success!")
                  } else {
                  fmt.Println("unlock failed!", err)
                  }
                  }

                  上述代码第5行,将获取的键值与当前的Goroutine id比较之后,再决定是否执行下面的删除锁操作。

                  3.解锁的原子化实现

                  上述解锁操作中,仍存在一个问题:在确认当前锁是自己的锁之后,删除锁之前,这段时间内,锁可能会恰巧过期释放且被其他竞争者抢占,那么继续删除则删除的是别人的锁,又会出现误删问题。

                  因此需要将整个解锁过程原子化,使得在解锁期间,其他竞争者的任何操作不能被Redis执行。

                  这里我采用了Lua脚本,封装了判断标识与删除键的整个操作,通过KEYSARGV 数组将键与值传入:

                    func Unlock() {
                    script := redis.NewScript(`
                    if redis.call('get', KEYS[1]) == ARGV[1]
                    then
                    return redis.call('del', KEYS[1])
                    else
                    return 0
                    end
                    `)
                    resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
                    if result, err := resp.Result(); err != nil || result == 0 {
                    fmt.Println("unlock failed:", err)
                    }
                    }

                    这样一来,确认锁与删除锁的整体操作进行了原子化,便可以防止上述存在的误删问题。
                    4.锁续期的看门狗实现

                    以上完成与解决了锁的期限、唯一性等问题,仍存在一个问题:当锁的持有者任务未完成,但是锁已过期。此时虽然他仍可以将任务继续完成,并且也不会误删其他持锁者的锁,但是此时可能会存在多个执行者同时执行临界区代码,使得数据的一致性难以保证,造成意外的后果,分布式锁就失去了意义

                    因此,需要一个锁的自动续期机制,分布式锁框架Redission中就有这么一个看门狗,专门为将要到期的锁进行续期。这里我们也来实现一个简单的看门狗吧:

                      var (
                        unlockCh = make(chan struct{}, 0//用户解锁通知通道
                      )
                      //自动续期看门狗
                      func watchDog(goId int) {
                      // 创建一个定时器NewTicker, 每隔8s触发一次
                      expTicker := time.NewTicker(time.Second * 8)
                      //确认锁与锁续期打包原子化
                      script := redis.NewScript(`
                      if redis.call('get', KEYS[1]) == ARGV[1]
                      then
                      return redis.call('expire', KEYS[1], ARGV[2])
                      else
                      return 0
                      end
                      `)
                      for {
                      select {
                      case <-expTicker.C: //因为上边是用NewTicker创建的定时器,所以每隔8s都会触发
                      resp := script.Run(client, []string{lockKey}, goId, 10)
                            if result, err := resp.Result(); err != nil || result == int64(0) {
                      log.Println("expire lock failed", err)
                      }
                          case <-unlockCh: //任务完成后用户解锁通知看门狗退出
                      return
                      }
                      }
                      }

                      上述代码实现了一个简单看门狗,鉴于我们的锁的默认期限是10s,因此看门狗将每隔8s触发一次,这里我们使用了go语言标准库中的Ticker实现。在select 语句中,每隔8s就会触发一次 Expire 操作进行续期,将 my_lock 键的过期时间重置为10s。注意,这里使用Lua脚本封装了确认锁与锁续期的操作,以防止误续期了其他持有者的锁。因此需要将Goroutine id传入看门狗函数中,而且不可以在看门狗函数中获取Goroutine id,因为这将获取的是看门狗线程的Goroutine id。

                      我们只需要在加锁成功时,以启动看门狗线程即可,如下第8行所示:

                        func Lock() {
                        var resp *redis.BoolCmd
                        for {
                        goId := utils.GetCurrentGoroutineId()
                            resp = client.SetNX(lockKey, goId, time.Millisecond*60//返回执行结果
                        lockSuccess, err := resp.Result()
                        if err == nil && lockSuccess { //抢锁成功,开启看门狗 并跳出,否则失败继续自旋
                        go watchDog(goId)
                        return
                        } else {
                        fmt.Println("lock failed!", err)
                        }
                        }
                        }

                        另外,当任务完成后,进行解锁操作时需要通知看门狗退出,这里使用了一个unlockCh 通道,当解锁时会向 unlockCh 发送一个信号,让select 去选择执行,使得看门狗线程return退出。因此,我们只需要在删除锁成功时,发送信号通知看门狗退出即可,如下第15行所示:

                          func Unlock() {
                          script := redis.NewScript(`
                          if redis.call('get', KEYS[1]) == ARGV[1]
                          then
                          return redis.call('del', KEYS[1])
                          else
                          return 0
                          end
                          `)
                          resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
                          if result, err := resp.Result(); err != nil || result == 0 {
                          fmt.Println("unlock failed:", err)
                          } else {
                              //删锁成功后,通知看门狗退出
                          unlockCh <- struct{}{}
                          }
                          }

                          在分布式系统中,各个机器上的应用在自己的内存空间中独立维护自己的看门狗以及unlockCh 通道,因此看门狗不存在竞态问题,也不存在误通知其他应用的看门狗退出的现象发生。

                          最终

                          至此,我们简单完成了一个Redis分布式锁的实现,也尝试解决了一些可能存在的问题。当然,这只是分布式锁的一种粗略实现,在实际场景中,还需要结合具体业务,考虑锁的过期时间、续期时间、可重入性、公平性、阻塞方式等问题。最后,附上完整的最终代码以供参考,有疏漏错误之处,还望予以指出:

                            package main


                            import (
                            "fmt"
                            "github.com/go-redis/redis"
                            "go_coding/utils"
                            "log"
                            "sync"
                            "time"
                            )


                            var (
                            client *redis.Client
                            lockKey = "my_lock"
                            unlockCh = make(chan struct{}, 0) //用户解锁通知通道
                            )


                            func init() {
                            client = redis.NewClient(&redis.Options{
                            Addr: "127.0.0.1:6379",
                            Password: "",
                            DB: 0, //redis默认拥有16个db(0~15),且默认连接db0
                            })
                            }


                            func Lock() {
                            var resp *redis.BoolCmd
                            for {
                            goId := utils.GetCurrentGoroutineId()
                            resp = client.SetNX(lockKey, goId, time.Second*10) //返回执行结果
                            lockSuccess, err := resp.Result()
                                if err == nil && lockSuccess {
                                  //抢锁成功,开启看门狗 并跳出,否则失败继续自旋
                            go watchDog(goId)
                            return
                            }else {
                                  //time.Sleep(time.Millisecond*30) //可以适当休眠
                            }
                            }
                            }


                            //自动续期看门狗
                            func watchDog(goId int) {
                            // 创建一个定时器NewTicker, 每隔2秒触发一次,类似于闹钟
                            expTicker := time.NewTicker(time.Second * 8)
                            //确认锁与锁续期打包原子化
                            script := redis.NewScript(`
                            if redis.call('get', KEYS[1]) == ARGV[1]
                            then
                            return redis.call('expire', KEYS[1], ARGV[2])
                            else
                            return 0
                            end
                            `)
                            for {
                            select {
                            case <-expTicker.C: //因为上边是用NewTicker创建的定时器,所以每隔8s都会触发
                            resp := script.Run(client, []string{lockKey}, goId, 10)
                            if result, err := resp.Result(); err != nil || result == int64(0) {
                            //续期失败
                            log.Println("expire lock failed", err)
                            }
                            case <-unlockCh: //任务完成后用户解锁通知看门狗退出
                            return
                            }
                            }
                            }


                            func Unlock() {
                            script := redis.NewScript(`
                            if redis.call('get', KEYS[1]) == ARGV[1]
                            then
                            return redis.call('del', KEYS[1])
                            else
                            return 0
                            end
                            `)
                            resp := script.Run(client, []string{lockKey}, utils.GetCurrentGoroutineId())
                            if result, err := resp.Result(); err != nil || result == 0 {
                            fmt.Println("unlock failed:", err)
                            } else {
                            //删锁成功后,通知看门狗退出
                            unlockCh <- struct{}{}
                            }
                            }
                            文章转载自架构狂人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论