redisson分布式锁文章回顾
NO.1

回顾

NO.2

api使用

Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.1.17:6379");
RedissonClient client = Redisson.create(config);
RReadWriteLock lock = client.getReadWriteLock("anyLock");
RLock readLock = lock.readLock();
readLock.lock();
readLock.unlock();
复制
声明了一个名为anyLock的读写锁对象
通过readLock()方法来获取其对应的读锁对象
调用对象的lock/unlock来完成对应的操作
首先先来分析一下readLock是怎么获取读锁对象的
@Override
public RLock readLock() {
return new RedissonReadLock(commandExecutor, getRawName());
}
复制
readLock()方法直接创建了一个RedissonReadLock对象,并且这个对象继承了RedissonLock
这样我们就能猜到,读写锁又是通过RedissonLock框架来完成自己的对应的逻辑,通过使用不同的lua脚本来完成对应功能的开发
我们继续像之前一样来着重分析lua脚本来看一下读锁是怎么来实现的吧
核心的逻辑代码
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[2] .. ':1', 1); " +
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"local key = KEYS[2] .. ':' .. ind;" +
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
"local remainTime = redis.call('pttl', KEYS[1]); " +
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));
复制
熟悉的配方,依然是通过操作lua脚本来实现锁的逻辑
我们重点分析lua脚本
参数说明:
keys[1] : anyLock 就是我们声明的锁名字
keys[2] : {anyLock}:客户端唯一标识:rwlock_timeout
假设唯一标识为uuid01:threadId01,那么keys[2]就变成了{anyLock}:uuid01:threadId01:rwlock_timeout
argv[1] : 30000ms
argv[2] : 客户端唯一标识=>uuid01:threadId01
argv[3] : 客户端唯一标识:write=>uuid01:threadId01:write
local mode = redis.call('hget', KEYS[1], 'mode');
if (mode == false) then
redis.call('hset', KEYS[1], 'mode', 'read');
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('set', KEYS[2] .. ':1', 1);
redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then
local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1);
local key = KEYS[2] .. ':' .. ind;
redis.call('set', key, 1);
redis.call('pexpire', key, ARGV[1]);
local remainTime = redis.call('pttl', KEYS[1]);
redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1]));
return nil;
end;
return redis.call('pttl', KEYS[1]);
复制
我们通过不同的情景来分析加锁原理
情景1:客户端A加锁--客户端标识uuid01:thread01
获取锁anyLock的mode属性,第一次加锁,mode不存在,所哟mode==false成立,说明客户端A可以加锁
将锁的mode值设置为read,标识加读锁成功
设置持有锁的客户端标识,并设置加锁次数
set {anyLock}:uuid_01:threadId_01:rwlock_timeout:1 1设置一个标识位,标识同一个锁的加锁次数,以及加锁的超时时间
给锁和标识位设置过期时间
客户端A加锁成功,此时redis中的数据如下
{
"anyLock": {
"mode": "read",
"uuid01:threadId01": 1
}
}
{anyLock}:uuid01:threadId01:rwlock_timeout:1 1
复制情景2:客户端B加锁--客户端标识uuid02:thread02
获取锁对象的mode属性值,客户端A已经加读锁成功,那么mode肯定存在,就会执行第二个判断条件
判断是否为读锁或者说加解锁的客户端是自己本身,客户端A加的是读锁,那么判断成立
执行hincrby anyLock uuid02:threadId02 1 将客户端B加锁次数加1,并且获得ind=1
拼接key = {anyLock}:uuid02:threadId02:rwlock_timeout:1,并执行set {anyLock}:uuid02:threadId02:rwlock_timeout:1 1 设置客户端B加锁的数据 并设置过期时间为30s
比较当前锁的过期时间和30s的大小,取其中最大的值,然后设置锁的过期时间,保持锁的过期时间和最大的过期时间一致
客户端B则加读锁成功,此时redis中的数据如下
{
"anyLock": {
"mode": "read",
"uuid01:threadId01": 1,
"uuid02:threadId02": 1
}
}
{anyLock}:uuid01:threadId01:rwlock_timeout:1 1
{anyLock}:uuid02:threadId02:rwlock_timeout:1 1
复制情景3:客户端A再次加锁--客户端标识uuid01:thread01
判断anyLock的mode属性是否存在,应为当前客户端A已经加过读锁了,所以mode一定是存在的
判断mode是否为读锁,或者是写锁并且加锁的客户端是自己,这里客户端A加上的是读锁,所以条件成立
执行hincrby anyLock uuid01:threadId01 1 将加锁次数加1,并且获得ind=2
拼接key = {anyLock}:uuid01:threadId01:rwlock_timeout:2,并且执行set {anyLock}:uuid01:threadId01:rwlock_timeout:2 1 设置重复加锁的数据 并设置过期时间为30s
比较当前锁的过期时间和30s的大小,取其中最大的值,然后设置锁的过期时间,保持锁的过期时间和最大的过期时间一致
如果客户端A再次重复加锁 其实就是再次设置一个新的标志set {anyLock}:uuid_01:threadId_01:rwlock_timeout:3 1 然后将锁的过期时间设置为最大的过期时间,此时redis中的数如下所示
{
"anyLock": {
"mode": "read",
"uuid01:threadId01": 2,
"uuid02:threadId02": 1
}
}
{anyLock}:uuid01:threadId01:rwlock_timeout:1 1
{anyLock}:uuid01:threadId01:rwlock_timeout:2 1
{anyLock}:uuid01:threadId01:rwlock_timeout:3 1
{anyLock}:uuid02:threadId02:rwlock_timeout:1 1
复制
NO.3

总结


点个在看你最好看