NO.1

背景

互斥条件: 某种资源一次只允许一个进程访问,即该资源一旦分配给一个进程使用,其他进程就不允许获取该资源,直到进程访问结束释放该资源
占有且等待条件: 一个进程本身占有一个或多个资源,同时还再等待其他线程释放需要的资源
不可抢占条件: 别的进程占有了一个资源,你不能直接将资源抢过来,不能强行剥夺
循环等待条件: 若干进程之间形成一种头尾相接的循环等待资源关系


当上述四个条件均满足,就会造成死锁,导致程序无法执行下去,那么怎么来解决死锁的问题呢?

当然实际解决方案有很多,我们这里只说说预防死锁的方案:
破坏“互斥”条件
就是在系统里取消互斥。若资源不被一个进程独占使用,那么死锁是肯定不会发生的。但一般来说在所列的四个条件中,“互斥”条件是无法破坏的。因此,在死锁预防里主要是破坏其他几个必要条件,而不去涉及破坏“互斥”条件
破坏"占有且等待"条件
破坏“占有并等待”条件,就是在系统中不允许进程在已获得某种资源的情况下,申请其他资源。即要想出一个办法,阻止进程在持有资源的同时申请其他资源,比如:所有的进程在开始运行之前,必须一次性地申请其在整个运行过程中所需要的全部资源
破坏“不可抢占”条件
破坏“不可抢占”条件就是允许对资源实行抢夺,如果占有某些资源的一个进程进行进一步资源请求被拒绝,则该进程必须释放它最初占有的资源
破坏“循环等待”条件
破坏“循环等待”条件的一种方法,是将系统中的所有资源统一编号,进程可在任何时刻提出资源申请,但所有申请必须按照资源的编号顺序(升序)提出。这样做就能保证系统不出现死锁


上面描述了一下,死锁的产生条件,以及一种预防方案,当我们在分布式系统中,使用分布式锁进行某些操作的时候,可能也会因为操作不当,导致系统间产生死锁,我们为了解决这个问题,可能会选择一次性的申请多个资源,这样更简单一点,当我们使用redisson来实现分布式锁的时候,能否支持呢?

NO.2

redisson使用

Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.1.17:6379");
RedissonClient client = Redisson.create(config);
RLock lock1 = client.getLock("lock1");
RLock lock2 = client.getLock("lock2");
RLock lock3 = client.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
lock.unlock();
复制
使用还是比较简单,同时声明多个锁然后交给RedissonMultiLock对象来管理
通过RedissonMultiLock的lock()/unlock()来进行操作
方法表面上使用很简单,但是实现原理呢?怎么来同时申请多个锁资源的呢?接下来我们来一起分析下它的实现原理
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
while (true) {
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
复制
根据锁的个数先计算一个基础的等待时间,如果调用者没有自己指定则就使用计算好的等待时间 默认为lockNum*1500ms
执行循环逻辑来尝试加锁,直到加锁成功跳出循环
看到这里我们发现加锁的核心逻辑就是tryLock方法,这里才是我们重点分析的地方
我们重点分析一下tryLock()方法,来一探究竟
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime)*2;
}
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
acquiredLocks.stream()
.map(l -> (RedissonLock) l)
.map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
.forEach(f -> f.syncUninterruptibly());
}
return true;
}
protected long calcLockWaitTime(long remainTime) {
return remainTime;
}
protected int failedLocksLimit() {
return 0;
}
复制
计算一个加锁剩余的时间remainTime和尝试加锁需要等待的时间lockWaitTime(默认就等于reaminTime)
查看允许加锁失败的个数,即对所有的锁对象,允许有几个锁对象加锁失败
创建一个集合来保存加锁成功的锁对象
遍历所有的锁对象,依次调用带超时参数的tryLock()来尝试进行加锁
如果在规定时间内加锁成功,则保存到acquiredLocks中
如果加锁失败则需要判断是否允许有加锁失败,这里failedLocksLimit为0,即只要有一个锁对象尝试加锁失败,那么就要释放掉之前成功的所有锁
为什么要这样呢?这就是上面预防死锁的方案中提到的破坏不可抢占条件,这样可以预防客户端加锁的时候产生死锁
判断加锁的时间是否超时,如果超时了则释放掉所有的锁资源,这样做也会防止死锁
如果tryLock加锁失败,那么lockInterruptibly()方法的while逻辑就不会跳出循环,他会继续尝试不断的进行加锁,直到所有的锁资源都成功获取
NO.3

释放锁原理

public void unlock() {
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
复制
我们发现这个释放锁更简单,就是遍历所有的锁资源,然后调用其自身的释放锁方法,进行释放
同步等待所有的资源释放完毕,结束运行
释放锁的原理何其简单,只是一些循环逻辑,调用不同资源本身的释放逻辑来完成锁资源释放

点个在看你最好看