1、连锁
到此,关于 Redisson 的可重入锁、公平锁、读写锁的加锁和释放锁的基本原理都分析完毕了,如果感兴趣的同学,可以到下面的文章看一看:
Redisson分布式锁学习总结:可重入锁 RedissonLock#lock 获取锁源码分析
Redisson分布式锁学习总结:可重入锁 RedissonLock#unlock 释放锁源码分析
Redisson分布式锁学习总结:公平锁 RedissonFairLock#lock 获取锁源码分析
Redisson分布式锁学习总结:公平锁 RedissonFairLock#unLock 释放锁源码分析
Redisson分布式锁学习总结:读锁 RedissonReadLock#lock 获取锁源码分析
Redisson分布式锁学习总结:读锁 RedissonReadLock#unLock 释放锁源码分析
Redisson分布式锁学习总结:写锁 RedissonWriteLock#lock 获取锁源码分析
Redisson分布式锁学习总结:写锁 RedissonWriteLock#unLock 释放锁源码分析
但我们都知道,关于上面的锁,其实锁的都是一个资源;但如果我们需要同时锁定N个资源呢:例如下单的时候,我们需要同时锁定订单、库存、商品等,那 Redisson 有提供对应的分布式锁么?
答案:有
Redisson 提供了一种叫 RedissonMultiLock 的分布式锁,我们这里就叫它连锁吧,就是同时需要连续给指定的N个锁加锁成功,我们才算成功持有锁。
我们看看使用demo:
public class RedissonMultiLockDemo {public static void main(String[] args) {RedissonClient redissonClient = RedissonClientUtil.getClient("");RLock rLock1 = redissonClient.getLock("lock1");RLock rLock2 = redissonClient.getLock("lock2");RLock rLock3 = redissonClient.getLock("lock3");RLock multiLock = redissonClient.getMultiLock(rLock1, rLock2, rLock3);multiLock.lock();multiLock.unlock();}}
关于 RedissonMultiLock 的原理,其实非常的简单。我们在分析源码之前,可以先简单说一下:
配置N个 RedissonLock,加锁就是循环调用 RedissonLock 获取锁的方法,如果三个 RedissonLock 都能成功获取锁,那么 RedissonMultiLock 就成功获取锁;
释放锁的话,也是同样的道理,就是遍历调用 RedissonLock 释放锁的方法即可。
到这里,我们可以明白,分析 RedissonMultiLock 不再需要分析 lua 脚本了,因为它是等同于在 RedissonLock 上面做的扩展了。
这里还可以提到一个点,Redisson 实现 RedLock 算法又是基于 RedissonMultiLock 上面改动的,并且改动点少得可怜,等我们分析完 RedissonMultiLock,接着就是 RedissonRedLock 了。
2、源码分析
2.1、同步获取连锁源码分析:
关于源码分析,加锁我们只需要研究 lock() 即可,其他的带等待时间和超时时间的加锁或者是异步获取连锁,其实原理基本都是一致的,底层还是调用 RedissonLock 的加锁方法,只是对于 waitTime、leaseTime 的计算和异步结果的处理会不太一样。
1、lock() 方法
调用无参数的 lockInterruptibly() 方法。
@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
2、lockInterruptibly() 方法
调用 lockInterruptibly(long leaseTime, TimeUnit unit) 方法,指定 leaseTime 为 -1,即无限持有锁。
@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}
3、lockInterruptibly(long leaseTime, TimeUnit unit) 方法
主要是计算 waitTime,毕竟这两个是一对的,计算后,进入死循环尝试获取锁,下面我们直接在源码处加注释进行分析。
@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 连锁的锁数量 * 1500,假设我们这里是3个锁,那么就是3*1500 = 4500 毫秒long baseWaitTime = locks.size() * 1500;long waitTime = -1;// leaseTime 等于 -1,waitTime 被赋值为 baseWaitTimeif (leaseTime == -1) {waitTime = baseWaitTime;} else {// leaseTime 转毫秒数leaseTime = unit.toMillis(leaseTime);// waitTime 初始值等于 leaseTimewaitTime = leaseTime;// 如果 waitTime 小于2000毫秒,直接赋值为2000毫秒if (waitTime <= 2000) {waitTime = 2000;// 如果 waitTIme 小于等于 baseWaitTime,重新赋值为 [waitTime/2,waitTime] 之间的随机数} else if (waitTime <= baseWaitTime) {waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);// 如果 waitTime 大于 baseWaitTIme,重新赋值为 [baseWaitTIme,waitTime] 之间的随机数} else {waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);}}// 死循环,尝试获取锁,直到成功while (true) {// 最后,waitTime 是 [2250,4500] 之间的随机数// leaseTime 还是 -1if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return;}}}
4、tryLock(long waitTime, long leaseTime, TimeUnit unit)
这个方法是本次加锁源码分析的重头戏,连锁的核心实现原理就在这了,我们接着在源码中添加注释进行分析。
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// try {// return tryLockAsync(waitTime, leaseTime, unit).get();// } catch (ExecutionException e) {// throw new IllegalStateException(e);// }long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;// waiteTime 不是-1,remainTime 赋值为 waitTimeif (waitTime != -1) {remainTime = unit.toMillis(waitTime);}// 这里计算锁等待时间,RedissonMultiLock 是直接返回传入的 remainTimelong lockWaitTime = calcLockWaitTime(remainTime);// 允许获取锁失败次数,RedissonMultiLock 限制为0,即任何锁都不能获取失败int failedLocksLimit = failedLocksLimit();// 记录获取成功的锁List<RLock> acquiredLocks = new ArrayList<>(locks.size());// 遍历for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {// 获取等待时间,因为 RedissonMultiLock 的处理是 lockWaitTime = remainTime,所以 awaitTime 就等于 remainTime,也就是调用方法时传入的 waitTimelong awaitTime = Math.min(lockWaitTime, remainTime);// 调用 RedissonLock 的 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法// 这个方法就不做详细介绍了,其实就是尝试获取锁,如果获取失败,最长等待时间为 awaitTime,如果获取成功了,持有锁时间最长为 newLeaseTime,我们这里 newLeaseTime 虽然 -1,但并不是锁没有设置过期时间,大家要记得 RedissonLock 的 watchdog 机制哦!lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果redis返回响应超时,释放当前锁,为了兼容成功获取锁,但是redis响应超时的情况。unlockInner(Arrays.asList(lock));// 加锁结果置为falselockAcquired = false;} catch (Exception e) {// 捕获异常,加锁结果置为falselockAcquired = false;}// 如果获取锁成功,将当前锁加入成功列表中if (lockAcquired) {acquiredLocks.add(lock);} else {// 假设 锁数量-成功锁数量等于失败上限,则跳出循环。在 RedissonMultiLock 中不会出现这种情况,主要是为了 RedissonRedLock 服务的,因为 RedLock 算法中只要一半以上的锁获取成功,就算成功持有锁。if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}// 如果失败上限为0,证明已经没有机会再失败了,执行下面的操作if (failedLocksLimit == 0) {// 释放成功获取的锁记录unlockInner(acquiredLocks);// 如果等待时间为-1,直接返回false表示获取连锁失败if (waitTime == -1) {return false;}// 重置失败上限failedLocksLimit = failedLocksLimit();// 清理成功成功记录acquiredLocks.clear();// reset iterator// 重置锁列表,后续所有锁都需要重新获取while (iterator.hasPrevious()) {iterator.previous();}} else {// 如果失败上限不为0,递减1failedLocksLimit--;}}// 如果remainTime不等于-1,即传入的waitTime不为-1if (remainTime != -1) {// remainTime 减去当前时间减去尝试获取锁时的当前时间remainTime -= System.currentTimeMillis() - time;// 重置timetime = System.currentTimeMillis();// 如果remainTime小于等于0,即等待时间已经消耗完了,释放所有成功获取的锁记录,返回false表示尝试获取锁失败if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}// 到这里,表示已经成功获取指定数量的锁,判断 leaseTime 是否不为-1,如果不是,即锁需要设定持有时间if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());// 遍历所有成功获取的锁记录,异步调用 pexpire 为锁key 设置超时时间for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}// 遍历获取异步调用结果for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}// 返回true表示获取连锁成功return true;}
到这里,我们已经将 RedissonMultiLock 加锁的核心源码都分析了一遍了,下面我做一下简单的总结:
RedissonMultiLock 是利用N个 RedissonLock 来完成连锁的功能,同步获取连锁最后都是调用 RedissonLock 的
tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法。RedissonMultiLock 获取锁即使不传入waitTime,也会根据 leaseTime 和 baseWaitTime 来计算,而 baseWaitTime 等于锁个数乘以1500。
在等待时间内,只要出现一个锁获取失败,就会释放所有成功获取的锁记录,重置锁列表,重新遍历然后尝试获取锁。
如果超过等待时间,跳出获取锁的循环,但是外面的死循环会使得再次进入获取锁的代码中。
在循环获取锁中,只要遇到 redis 响应超时异常,都会释放之前所有成功获取的锁记录,当作获取连锁失败处理。
因为 RedissonMultiLock 底层是利用 RedissonLock 完成的,所以即使 leaseTime 等于-1,也不会出现死锁,因为 RedissonLock会利用 watchdog 为锁续命。
2.2、同步释放锁源码分析:
同步释放锁的源码就更不用分析了,相当的简单,直接循环调用 RedissonLock 释放锁的方法就ok,当然了,异步释放的也同样的简单,也不会单独进行分析了。
@Overridepublic void unlock() {List<RFuture<Void>> futures = new ArrayList<>(locks.size());// 遍历锁列表,调用RedissonLock异步释放锁的方法for (RLock lock : locks) {futures.add(lock.unlockAsync());}// 遍历异步调用返回的 future,同步等待获取结果for (RFuture<Void> future : futures) {future.syncUninterruptibly();}}
3、最后
到此,关于 RedissonMultiLock 的基本原理已经分析得差不多了,至于带参数的同步获取锁、异步获取锁、异步释放锁等就不需要再详细分析了,基本原理都是差不多的,大家自己看一下源码就阔以了~




