暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于zookeeper的分布式锁

Java Miraculous 2021-08-24
271
之前写过两篇关于zookeeper(以下简称zk)的基础文章,对基础不熟的同学可以看下:初识zookeeperzookeeper的客户端操作以及集群介绍。今天我们来点高端货,基于zk的分布式锁,以及zk的一致性是怎么实现的。
  • 一、基于zk的分布式锁

在单体应用中我们可以通过synchronized或ReentrantLock来实现锁,但是在分布式系统中就不好使了,因为synchronized和ReentrantLock只能作用于一个JVM,这时候就只能借助第三方组件,比如Redisson、数据库,当然,zk同样可以。

  • 1.1、实现原理

初识zookeeper这篇文章中介绍了zk中的节点类型,一共有四种

类型

描述

PERSISTENT

持久节点

PERSISTENT_SEQUENTIAL

持久序号节点

EPHEMERAL

临时节点

EPHEMERAL_SEQUENTIAL

临时序号节点

zk的分布式锁基于临时序号节点实现,它有以下特点:
  • 节点的生命周期和客户端的会话绑定,一旦客户端和zk的会话失效,这个节点就会被删除

  • 父节点会维护子节点的创建顺序,为每个子节点分配一个递增的序号,以后缀的形式体现在子节点的名称中

  • 1.2、实现思路

  1. 客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点

  2. 客户端调用zk的getChildren()方法获取/lock下所有的子节点,同时添加对/lock的节点监听

  3. 客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁

  4. 如果在第3步中发现不是最小的,那么等待下一次子节点变更通知的时候,再进行第3步

  5. 如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁

  6. 客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可

其实这种思路是有问题的,试想一下,假如有10个客户端过来创建了10个节点,现在客户端1执行完毕,删除了节点1,这时候由于其余9个客户端监听的都是/lock节点,它们都能收到通知,那试想一下,如果是100个节点,其中10个断开,是不是剩下的90个节点每个都会收到zk服务器发的10个通知,一共就是900个通知,节点越多,发送的通知也就越多,这就是羊群效应
改进:
  1. 客户端调用zk的create()方法,创建一个名字为/lock的临时序号节点

  2. 客户端调用zk的getChildren()方法获取/lock下所有的子节点

  3. 客户端获取到所有的子节点path后,如果发现自己在第1步中创建的节点是所有节点中最小的,就认为自己获得了锁

  4. 如果在第3步中发现不是最小的,那么找到比自己小的相邻节点并对其添加监听

  5. 如果某个节点被删除,监听它的客户端就会收到通知,然后执行第3步获得锁

  6. 客户端释放锁很简单,就是在执行完业务逻辑或者发生异常后删除自己创建的节点即可

  • 1.3、实现代码

用代码模拟抢火车票,15个人抢10张票,最后剩下的应该是5张,不能出现超卖。
分布式锁
    package com.ayo.zklock;


    import com.ayo.zk.ZooKeeperClient;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;


    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;


    public class Distributed_ZKLock implements Lock, Watcher {


    private ZooKeeper zk = null;
    // 根节点
    private String ROOT_LOCK = "/lock";
    // 竞争的资源
    private String lockName;
    // 等待的前一个锁
    private String WAIT_LOCK;
    // 当前锁
    private String CURRENT_LOCK;
    // 计数器
    private CountDownLatch countDownLatch;


    private int sessionTimeout = 10000;


    public Distributed_ZKLock(String lockName) {
    this.lockName = lockName;
    try {
    zk = new ZooKeeper("192.168.209.128:2181,192.168.209.129:2181,192.168.209.130:2181", sessionTimeout, this);
    Stat stat = zk.exists(ROOT_LOCK, false);
    if (stat == null) {
    //如果根节点不存在,就创建根节点,注意,根节点是持久节点
    zk.create(ROOT_LOCK, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    }catch (Exception e) {
    e.printStackTrace();
    }
    }


    @Override
    public void lock() {
    try {
    if (this.tryLock()) {
    System.out.println(Thread.currentThread().getName() + "-" + lockName + "获得了锁");
    return;
    } else {
    waitForLock(WAIT_LOCK, sessionTimeout);
    }
    }catch (Exception e) {
    e.printStackTrace();
    }
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {
    this.lock();
    }


    @Override
    public boolean tryLock() {
    try {
    //之所以定义这个变量,是因为不同业务之间的锁都在一起放,但是lockname不同
    String spiltFlag = "_lock_";
    //创建的是临时序号节点
    CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + spiltFlag, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    //取出来/lock节点下的所有子节点
    List<String> childrenNodes = zk.getChildren(ROOT_LOCK, false);
    //根据lockname筛选出指定业务的锁
    List<String> locks = new ArrayList<>();
    childrenNodes.forEach(node -> {
    //根据spiltFlag分隔开后,就是lockName和节点的序号了
    String _node = node.split(spiltFlag)[0];
    if (_node.equals(lockName)) {
    locks.add(node);
    }
    });
    //排序
    Collections.sort(locks);
    //判断当前节点是否是最小的临时序号节点,是的话,返回获取锁成功
    if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + locks.get(0))) {
    return true;
    }
    //如果当前节点不是最小的节点,就找到自己的前一个节点
    String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
    WAIT_LOCK = locks.get(Collections.binarySearch(locks, prevNode) - 1);
    } catch (Exception e) {
    e.printStackTrace();
    }
    return false;
    }


    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    try {
    if (this.tryLock()) {
    return true;
    }
    return waitForLock(WAIT_LOCK, time);
    } catch (Exception e) {
    e.printStackTrace();
    }
    return false;
    }


    private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
    Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
    if (stat != null) {
    this.countDownLatch = new CountDownLatch(1);
    //计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁
    this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
    this.countDownLatch = null;
    }
    return true;
    }


    @Override
    public void unlock() {
    try {
    zk.delete(CURRENT_LOCK, -1);
    CURRENT_LOCK = null;
    zk.close();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    @Override
    public Condition newCondition() {
    return null;
    }


    @Override
    public void process(WatchedEvent watchedEvent) {
    if (this.countDownLatch != null) {
    this.countDownLatch.countDown();
    }
    }
    }


    复制
    测试类:
      package com.ayo.zklock;




      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;


      /**
      * 测试类,模拟河南人抢票
      */
      public class TestZkLock {


      private static final Logger LOG = LoggerFactory.getLogger(TestZkLock.class);


      private Integer ticketNum = 10;


      class TicketThread implements Runnable {


      @Override
      public void run() {
      Distributed_ZKLock lock = new Distributed_ZKLock("train");
      lock.lock();
      try {
      if (ticketNum > 0) {
      LOG.info(Thread.currentThread().getName() + "正在抢票,余票为:" + --ticketNum);
      }else {
      LOG.info(Thread.currentThread().getName() + "没抢住票");
      }
      }finally {
      lock.unlock();
      }
      }
      }


      public void ticketStart() {
      TicketThread thread = new TicketThread();
      for (int i = 1; i < 16; i++) {
      Thread t = new Thread(thread, "河南第" + i + "人");
      t.start();
      }
      }


      public static void main(String[] args) {
      new TestZkLock().ticketStart();
      }
      }


      复制
      可以自己跑着试试,观察下。思路出来了,写代码只是一个体力活!
      文章转载自Java Miraculous,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论