一、基于zk的分布式锁
在单体应用中我们可以通过synchronized或ReentrantLock来实现锁,但是在分布式系统中就不好使了,因为synchronized和ReentrantLock只能作用于一个JVM,这时候就只能借助第三方组件,比如Redisson、数据库,当然,zk同样可以。
1.1、实现原理
类型 | 描述 |
PERSISTENT | 持久节点 |
PERSISTENT_SEQUENTIAL | 持久序号节点 |
EPHEMERAL | 临时节点 |
EPHEMERAL_SEQUENTIAL | 临时序号节点 |
节点的生命周期和客户端的会话绑定,一旦客户端和zk的会话失效,这个节点就会被删除
父节点会维护子节点的创建顺序,为每个子节点分配一个递增的序号,以后缀的形式体现在子节点的名称中
1.2、实现思路
客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点
客户端调用zk的getChildren()方法获取/lock下所有的子节点,同时添加对/lock的节点监听
客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁
如果在第3步中发现不是最小的,那么等待下一次子节点变更通知的时候,再进行第3步
如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁
客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可
客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点
客户端调用zk的getChildren()方法获取/lock下所有的子节点
客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁
如果在第3步中发现不是最小的,那么找到比自己小的相邻节点并对其添加监听
如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁
客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可
1.3、实现代码
package com.ayo.zklock;
import com.ayo.zk.ZooKeeperClient;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Distributed_ZKLock implements Lock, Watcher {
private ZooKeeper zk = null;
// 根节点
private String ROOT_LOCK = "/lock";
// 竞争的资源
private String lockName;
// 等待的前一个锁
private String WAIT_LOCK;
// 当前锁
private String CURRENT_LOCK;
// 计数器
private CountDownLatch countDownLatch;
private int sessionTimeout = 10000;
public Distributed_ZKLock(String lockName) {
this.lockName = lockName;
try {
zk = new ZooKeeper("192.168.209.128:2181,192.168.209.129:2181,192.168.209.130:2181", sessionTimeout, this);
Stat stat = zk.exists(ROOT_LOCK, false);
if (stat == null) {
//如果根节点不存在,就创建根节点,注意,根节点是持久节点
zk.create(ROOT_LOCK, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void lock() {
try {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() + "-" + lockName + "获得了锁");
return;
} else {
waitForLock(WAIT_LOCK, sessionTimeout);
}
}catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
@Override
public boolean tryLock() {
try {
//之所以定义这个变量,是因为不同业务之间的锁都在一起放,但是lockname不同
String spiltFlag = "_lock_";
//创建的是临时序号节点
CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + spiltFlag, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//取出来/lock节点下的所有子节点
List<String> childrenNodes = zk.getChildren(ROOT_LOCK, false);
//根据lockname筛选出指定业务的锁
List<String> locks = new ArrayList<>();
childrenNodes.forEach(node -> {
//根据spiltFlag分隔开后,就是lockName和节点的序号了
String _node = node.split(spiltFlag)[0];
if (_node.equals(lockName)) {
locks.add(node);
}
});
//排序
Collections.sort(locks);
//判断当前节点是否是最小的临时序号节点,是的话,返回获取锁成功
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + locks.get(0))) {
return true;
}
//如果当前节点不是最小的节点,就找到自己的前一个节点
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = locks.get(Collections.binarySearch(locks, prevNode) - 1);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
try {
if (this.tryLock()) {
return true;
}
return waitForLock(WAIT_LOCK, time);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
if (stat != null) {
this.countDownLatch = new CountDownLatch(1);
//计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
}
return true;
}
@Override
public void unlock() {
try {
zk.delete(CURRENT_LOCK, -1);
CURRENT_LOCK = null;
zk.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (this.countDownLatch != null) {
this.countDownLatch.countDown();
}
}
}
复制
package com.ayo.zklock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 测试类,模拟河南人抢票
*/
public class TestZkLock {
private static final Logger LOG = LoggerFactory.getLogger(TestZkLock.class);
private Integer ticketNum = 10;
class TicketThread implements Runnable {
@Override
public void run() {
Distributed_ZKLock lock = new Distributed_ZKLock("train");
lock.lock();
try {
if (ticketNum > 0) {
LOG.info(Thread.currentThread().getName() + "正在抢票,余票为:" + --ticketNum);
}else {
LOG.info(Thread.currentThread().getName() + "没抢住票");
}
}finally {
lock.unlock();
}
}
}
public void ticketStart() {
TicketThread thread = new TicketThread();
for (int i = 1; i < 16; i++) {
Thread t = new Thread(thread, "河南第" + i + "人");
t.start();
}
}
public static void main(String[] args) {
new TestZkLock().ticketStart();
}
}
复制