基于Zookeeper的分布式共享鎖

基于Zookeeper的分布式共享鎖

實現(xiàn)原理

  1. 基于Zookeeper、Lock實現(xiàn)的分布共享式鎖
  2. 構(gòu)造初始化Zookeeper連接
  3. 在lock中嘗試獲取鎖(tryLock)
    1. 首先創(chuàng)建當(dāng)前連接的節(jié)點
    2. 獲取所有相關(guān)節(jié)點,并排序
    3. 若當(dāng)前為只有一個節(jié)點或為最小值,直接返回獲取鎖成功
    4. 否則獲取前一個節(jié)點,監(jiān)聽事件,讓當(dāng)前節(jié)點進(jìn)入等待狀態(tài)
  4. 如果監(jiān)聽到事件為上刪除事件,釋放鎖
  5. 刪除節(jié)點,釋放資源

代碼實現(xiàn)

  @Data
  @Slf4j
  public class MyZkDistributedLock implements Lock, Watcher {
  
      // 超時時間
      private static final int SESSION_TIMEOUT = 5000;
      // zookeeper server列表
      private String hosts;
      private String groupNode = "locks";
      private String subNode = "sub";
  
      private String lockName;
      private ZooKeeper zk;
      // 當(dāng)前client創(chuàng)建的子節(jié)點
      private String thisPath;
      // 當(dāng)前client等待的子節(jié)點
      private String waitPath;
      private List<Exception> exceptionList = new ArrayList<>();
      private CountDownLatch latch = new CountDownLatch(1);
  
      public MyZkDistributedLock(String hosts, String lockName) {
          this.hosts = hosts;
          this.lockName = lockName;
          try {
              // 連接zookeeper
              zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
              Stat stat = zk.exists(groupNode, false);
              if (stat == null) {
                  // 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
                  zk.create(groupNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                          CreateMode.PERSISTENT);
              }
          } catch (IOException e) {
              log.error("Zk連接異常", e);
              exceptionList.add(e);
          } catch (InterruptedException e) {
              log.error("Zk連接異常", e);
              exceptionList.add(e);
          } catch (KeeperException e) {
              log.error("Zk連接異常", e);
              exceptionList.add(e);
          }
      }
  
      @Override
      public void lock() {
          if (exceptionList.size() > 0) {
              throw new LockException(exceptionList.get(0));
          }
          try {
              if (this.tryLock()) {
                  log.info("------------>線程:{},鎖:{},獲得", Thread.currentThread().getName(), lockName);
                  return;
              } else {
                  // 等待鎖
                  this.latch.await();
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
  
      @Override
      public void lockInterruptibly() throws InterruptedException {
  
      }
  
      @Override
      public boolean tryLock() {
          try {
              String splitStr = "_lock_";
              if (lockName.contains(splitStr)) {
                  throw new MyZkDistributedLock.LockException("鎖名有誤");
              }
              // 創(chuàng)建子節(jié)點
              thisPath = zk
                      .create("/" + groupNode + "/" + lockName + subNode + splitStr, null,
                              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  
              // 注意, 沒有必要監(jiān)聽"/locks"的子節(jié)點的變化情況
              List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
  
              // 取出所有l(wèi)ockName的鎖
              List<String> lockObjects = new ArrayList<String>();
              for (String node : childrenNodes) {
                  String _node = node.split(splitStr)[0];
                  if (_node.equals(lockName)) {
                      lockObjects.add(node);
                  }
              }
  
              // 列表中只有一個子節(jié)點, 那肯定就是thisPath, 說明client獲得鎖
              if (lockObjects.size() == 1) {
                  return true;
              } else {
                  String thisNode = thisPath.substring(("/" + groupNode + "/").length());
                  // 排序
                  Collections.sort(lockObjects);
                  int index = lockObjects.indexOf(thisNode);
                  if (index == -1) {
                      // never happened
                  } else if (index == 0) {
                      // inddx == 0, 說明thisNode在列表中最小, 獲得鎖
                      return true;
                  } else {
                      // 獲得排名比thisPath前1位的節(jié)點
                      this.waitPath = "/" + groupNode + "/" + lockObjects.get(index - 1);
                      // 在waitPath上注冊監(jiān)聽器, 當(dāng)waitPath被刪除時, zookeeper會回調(diào)監(jiān)聽器的process方法
                      zk.getData(waitPath, true, new Stat());
                  }
              }
          } catch (Exception e) {
  
          }
          return false;
  
      }
  
      @Override
      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
          return tryLock();
      }
  
      @Override
      public void unlock() {
          try {
              log.info("釋放鎖 {}", thisPath);
              zk.delete(thisPath, -1);
              thisPath = null;
              zk.close();
          } catch (InterruptedException e) {
              e.printStackTrace();
          } catch (KeeperException e) {
              e.printStackTrace();
          }
      }
  
      @Override
      public Condition newCondition() {
          return null;
      }
  
      @Override
      public void process(WatchedEvent event) {
          try {
              // 發(fā)生了waitPath的刪除事件
              if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                  this.latch.countDown();
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
  
      }
  
      public class LockException extends RuntimeException {
  
          private static final long serialVersionUID = 1L;
  
          public LockException(String e) {
              super(e);
          }
  
          public LockException(Exception e) {
              super(e);
          }
      }
  }

總結(jié)

總體來說,實現(xiàn)并不難,我認(rèn)為主要就是排序號監(jiān)聽上一個節(jié)點的刪除事件,依此類推,最后實現(xiàn)所有節(jié)點的監(jiān)聽

代碼地址

源碼GitHub地址

參考

分布式鎖與實現(xiàn)(二)——基于ZooKeeper實現(xiàn)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容