暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

redisson分布式锁-Multi锁原理

程序猿西蒙 2021-11-23
2524


NO.1


背景


对于程序员来说,或多或少都会听过死锁这个概念,它是多个线程并发的竞争系统资源产生相互等待导致的,本质原因还是因为系统资源有限以及进程推进顺序不合理.
死锁的产生一般都会4个必要条件:
  1. 互斥条件: 某种资源一次只允许一个进程访问,即该资源一旦分配给一个进程使用,其他进程就不允许获取该资源,直到进程访问结束释放该资源

  2. 占有且等待条件: 一个进程本身占有一个或多个资源,同时还再等待其他线程释放需要的资源

  3. 不可抢占条件: 别的进程占有了一个资源,你不能直接将资源抢过来,不能强行剥夺

  4. 循环等待条件: 若干进程之间形成一种头尾相接的循环等待资源关系

当上述四个条件均满足,就会造成死锁,导致程序无法执行下去,那么怎么来解决死锁的问题呢?


当然实际解决方案有很多,我们这里只说说预防死锁的方案:

  1. 破坏“互斥”条件

    1. 就是在系统里取消互斥。若资源不被一个进程独占使用,那么死锁是肯定不会发生的。但一般来说在所列的四个条件中,“互斥”条件是无法破坏的。因此,在死锁预防里主要是破坏其他几个必要条件,而不去涉及破坏“互斥”条件

  2. 破坏"占有且等待"条件

    1. 破坏“占有并等待”条件,就是在系统中不允许进程在已获得某种资源的情况下,申请其他资源。即要想出一个办法,阻止进程在持有资源的同时申请其他资源,比如:所有的进程在开始运行之前,必须一次性地申请其在整个运行过程中所需要的全部资源

  3. 破坏“不可抢占”条件

    1. 破坏“不可抢占”条件就是允许对资源实行抢夺,如果占有某些资源的一个进程进行进一步资源请求被拒绝,则该进程必须释放它最初占有的资源

  4. 破坏“循环等待”条件

    1. 破坏“循环等待”条件的一种方法,是将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出。这样做就能保证系统不出现死锁

上面描述了一下,死锁的产生条件,以及一种预防方案,当我们在分布式系统中,使用分布式锁进行某些操作的时候,可能也会因为操作不当,导致系统间产生死锁,我们为了解决这个问题,可能会选择一次性的申请多个资源,这样更简单一点,当我们使用redisson来实现分布式锁的时候,能否支持呢?


NO.2


redisson使用

查阅redisson的官网,我们看到redisson支持一种Multi锁,这个锁能支持我们同时申请多个锁资源,我们看看怎么使用
Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.1.17:6379");
RedissonClient client = Redisson.create(config);
RLock lock1 = client.getLock("lock1");
RLock lock2 = client.getLock("lock2");
RLock lock3 = client.getLock("lock3");


RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
lock.unlock();
复制
  1. 使用还是比较简单,同时声明多个锁然后交给RedissonMultiLock对象来管理

  2. 通过RedissonMultiLock的lock()/unlock()来进行操作


方法表面上使用很简单,但是实现原理呢?怎么来同时申请多个锁资源的呢?接下来我们来一起分析下它的实现原理


@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}


@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}


@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}

while (true) {
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
复制
  1. 根据锁的个数先计算一个基础的等待时间,如果调用者没有自己指定则就使用计算好的等待时间 默认为lockNum*1500ms

  2. 执行循环逻辑来尝试加锁,直到加锁成功跳出循环

  3. 看到这里我们发现加锁的核心逻辑就是tryLock方法,这里才是我们重点分析的地方


我们重点分析一下tryLock()方法,来一探究竟

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}

long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);

int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}

if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}


if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}

if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}


if (leaseTime != -1) {
acquiredLocks.stream()
.map(l -> (RedissonLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.syncUninterruptibly());
}

return true;
}


protected long calcLockWaitTime(long remainTime) {
return remainTime;
}
protected int failedLocksLimit() {
return 0;
}
复制
  1. 计算一个加锁剩余的时间remainTime和尝试加锁需要等待的时间lockWaitTime(默认就等于reaminTime)

  2. 查看允许加锁失败的个数,即对所有的锁对象,允许有几个锁对象加锁失败

  3. 创建一个集合来保存加锁成功的锁对象

  4. 遍历所有的锁对象,依次调用带超时参数的tryLock()来尝试进行加锁

  5. 如果在规定时间内加锁成功,则保存到acquiredLocks中

  6. 如果加锁失败则需要判断是否允许有加锁失败,这里failedLocksLimit为0,即只要有一个锁对象尝试加锁失败,那么就要释放掉之前成功的所有锁

    1. 为什么要这样呢?这就是上面预防死锁的方案中提到的破坏不可抢占条件,这样可以预防客户端加锁的时候产生死锁

  7. 判断加锁的时间是否超时,如果超时了则释放掉所有的锁资源,这样做也会防止死锁

  8. 如果tryLock加锁失败,那么lockInterruptibly()方法的while逻辑就不会跳出循环,他会继续尝试不断的进行加锁,直到所有的锁资源都成功获取


我们发现其实原理很简单,就是在规定时间内尝试获取声明的每一把锁,只要有一个锁获取失败,那么就放弃之前获取的所有的锁资源,然后从头再来,继续尝试加锁,直到所有的锁资源都加锁成功



NO.3


释放锁原理


锁资源既然获取成功了,在我们执行完对应的操作之后,理应释放掉获取的锁资源,那么针对多个锁资源,RedissonMultiLock又是怎么来释放的呢?
下面我们来分析下其原理
public void unlock() {
    List<RFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
    }
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
复制
  1. 我们发现这个释放锁更简单,就是遍历所有的锁资源,然后调用其自身的释放锁方法,进行释放

  2. 同步等待所有的资源释放完毕,结束运行


释放锁的原理何其简单,只是一些循环逻辑,调用不同资源本身的释放逻辑来完成锁资源释放

点个在看你最好看




文章转载自程序猿西蒙,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论