初識(shí)分布式鎖(二):ZooKeeper分布式鎖原理淺析及實(shí)戰(zhàn)案例

寫(xiě)作不易,點(diǎn)贊收藏關(guān)注以便下次再看,感謝爸爸們的支持~

上回咱們說(shuō)到,用Mysql數(shù)據(jù)庫(kù)實(shí)現(xiàn)了分布式鎖。實(shí)現(xiàn)起來(lái)相對(duì)簡(jiǎn)單。

但是缺陷也相對(duì)比較明顯,一方面是SQL鎖沒(méi)有過(guò)期機(jī)制,如果不保持高可用的情況下,線程沒(méi)有釋放掉鎖就會(huì)出現(xiàn)死鎖。

另一方面是因?yàn)镾QL本身性能并不高,因此采用SQL加鎖的方式會(huì)極大拖累整個(gè)系統(tǒng)的性能。

基于以上各點(diǎn),本期咱們沿著Zookeeper展開(kāi),介紹如何使用Zookeeper實(shí)現(xiàn)相應(yīng)的分布式鎖。

Zookeeper簡(jiǎn)介

在開(kāi)始咱的文章前,先來(lái)介紹下Zookeeper是個(gè)什么東西。咱們先來(lái)看下百度百科對(duì)于Zookeeper的定義是什么。

ZooKeeper是一個(gè)分布式的,開(kāi)放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個(gè)開(kāi)源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。它是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。

ZooKeeper的目標(biāo)就是封裝好復(fù)雜易出錯(cuò)的關(guān)鍵服務(wù),將簡(jiǎn)單易用的接口和性能高效、功能穩(wěn)定的系統(tǒng)提供給用戶。

ZooKeeper包含一個(gè)簡(jiǎn)單的原語(yǔ)集,提供Java和C的接口。

ZooKeeper代碼版本中,提供了分布式獨(dú)享鎖、選舉、隊(duì)列的接口,代碼在$zookeeper_home\src\recipes。其中分布鎖和隊(duì)列有Java和C兩個(gè)版本,選舉只有Java版本。

換成比較通俗易懂的話來(lái)說(shuō),Zookeeper其實(shí)本質(zhì)上就像一個(gè)文件管理系統(tǒng)。其用類(lèi)似文件路徑的方式管理、監(jiān)聽(tīng)多個(gè)節(jié)點(diǎn)(Znode),同時(shí)判斷當(dāng)前每個(gè)節(jié)點(diǎn)上機(jī)器的狀態(tài)(是否宕機(jī)、是否斷開(kāi)連接等),從而達(dá)到分布式協(xié)同的操作。

如下是ZK管理功能的一個(gè)簡(jiǎn)要說(shuō)明。

四種節(jié)點(diǎn)

提到ZK,就不得不提一下ZK的四種基本節(jié)點(diǎn),他們分別是:

  • 持久化節(jié)點(diǎn)(PERSISTENT):該節(jié)點(diǎn)持久存在,不會(huì)因?yàn)榭蛻舳藬嚅_(kāi)連接而刪除。
  • 持久化順序節(jié)點(diǎn)(PERSISTENT_SEQUENTIAL):該節(jié)點(diǎn)會(huì)按照一定順序持久存在,亦不會(huì)因?yàn)榭蛻舳藬嚅_(kāi)連接而刪除。
  • 臨時(shí)節(jié)點(diǎn)(EPHEMERAL):客戶端斷開(kāi)連接后,該節(jié)點(diǎn)會(huì)被刪除。
  • 臨時(shí)順序節(jié)點(diǎn)(EPHEMERAL_SEQUENTIAL):客戶端斷開(kāi)連接后該節(jié)點(diǎn)會(huì)被刪除;會(huì)依照一定順序進(jìn)行排列。

這四種節(jié)點(diǎn)組成了最基本的ZK的功能。

事件監(jiān)聽(tīng)

除了四種節(jié)點(diǎn)以外,不得不提一下ZK本身實(shí)現(xiàn)的Watcher(事件監(jiān)聽(tīng)器),其是 ZooKeeper 中的一個(gè)很重要的特性。

ZooKeeper 允許用戶在指定節(jié)點(diǎn)上注冊(cè)一些 Watcher,并且在一些特定事件觸發(fā)的時(shí)候,ZooKeeper 服務(wù)端會(huì)將事件通知到感興趣的客戶端上去,該機(jī)制是 ZooKeeper 實(shí)現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性。

同時(shí),該機(jī)制也是分布式鎖實(shí)現(xiàn)的重要依賴(lài)特性之一。

原理淺析

加鎖原理:

ZK實(shí)現(xiàn)分布式鎖主要依賴(lài)于上述的兩個(gè)機(jī)制:

1、臨時(shí)順序節(jié)點(diǎn)。

2、事件監(jiān)聽(tīng)。

首先,每個(gè)程序需要加鎖的時(shí)候,會(huì)需要一個(gè)相應(yīng)的加鎖路徑(這里我們假設(shè)為“/curatorLock”),在ZK中根據(jù)這個(gè)加鎖路徑去生成一個(gè)新的臨時(shí)節(jié)點(diǎn)node1。

假設(shè)當(dāng)前新生成的臨時(shí)節(jié)點(diǎn)a,為第一個(gè)臨時(shí)節(jié)點(diǎn)。節(jié)點(diǎn)node1做為第一個(gè)申請(qǐng)鎖的程序,自然是有權(quán)利進(jìn)行上鎖的,那么自然就是加鎖成功了。

但是如果當(dāng)前節(jié)點(diǎn)node1前面已經(jīng)有了別的節(jié)點(diǎn)加了鎖。那么這個(gè)時(shí)候顯然我們是不能獲取鎖的,因此只能采用事件監(jiān)聽(tīng)的機(jī)制,對(duì)前一個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng),直到前一個(gè)節(jié)點(diǎn)釋放了鎖。

三個(gè)乃至更多個(gè)節(jié)點(diǎn)的情況則相似。整個(gè)加鎖的邏輯并不復(fù)雜。

解鎖原理

解鎖的主要操作跟加鎖相反,首先需要將當(dāng)前監(jiān)聽(tīng)自己的監(jiān)聽(tīng)器都刪除,從而告訴別的機(jī)器,“我用完鎖啦~”。以便其余機(jī)器重新獲取,或者重新設(shè)置監(jiān)聽(tīng)對(duì)象和監(jiān)聽(tīng)狀態(tài)。

緊接著,獲取著鎖的節(jié)點(diǎn)(node0)會(huì)將自己進(jìn)行刪除,從而使得別的節(jié)點(diǎn)可以成為首節(jié)點(diǎn),并進(jìn)行加鎖的操作。

由此一來(lái),整個(gè)解鎖的過(guò)程就實(shí)現(xiàn)了。

Zookeeper分布式鎖實(shí)戰(zhàn)

代碼實(shí)現(xiàn)

這里我們借助CuratorFramework框架以及框架自帶的InterProcessMutex互斥鎖實(shí)現(xiàn)相應(yīng)的邏輯。

@Component
@Slf4j
public class ZkClientUtil {

    //zk連接ip
    private final String zkServers = "你的zk服務(wù)器Ip";

    private CuratorFramework curatorFramework;

    // zk自增存儲(chǔ)node
    private String lockPath = "/curatorLock";

    InterProcessMutex lock;

    @PostConstruct
    public void initZKClient(){
        //如果等待時(shí)間 小于最大自旋時(shí)間則進(jìn)行自旋
        LOGGER.info(">>>>Zk連接中....");
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(zkServers) //zk 服務(wù)地址
                .sessionTimeoutMs(5000)  // 會(huì)話超時(shí)時(shí)間
                .connectionTimeoutMs(5000) // 連接超時(shí)時(shí)間
                .retryPolicy(retryPolicy)
                .build();
        curatorFramework.start();
        lock = new InterProcessMutex(curatorFramework, lockPath);
        LOGGER.info(">>>>Zk連接成功!");
    }

    /**
     * 獲取對(duì)應(yīng)的節(jié)點(diǎn)鎖
     */
    @SneakyThrows
    public void getLock(){
        //設(shè)置超時(shí)時(shí)間
        boolean acquire = lock.acquire(50, TimeUnit.SECONDS);
        if (acquire){
            LOGGER.info("ZK加鎖成功:"+Thread.currentThread().getId());
        }else {
            LOGGER.info("ZK加鎖失?。?+Thread.currentThread().getId());
        }
    }

    /**
     * 對(duì)應(yīng)節(jié)點(diǎn)進(jìn)行解鎖
     */
    @SneakyThrows
    public void unlock(){
        lock.release();
        LOGGER.info("ZK解鎖成功"+Thread.currentThread().getId());
    }
}

然后只需要對(duì)咱們?cè)鹊纳弦黄诖a做一點(diǎn)小小的改動(dòng)~

    @SneakyThrows
    public synchronized Boolean deductProduct(ProductPO productPO){
        CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{
            try{
                zkClientUtil.getLock(); // 替換關(guān)鍵的加鎖代碼
                                ....
            }finally {
                zkClientUtil.unlock(); // 替換關(guān)鍵的解鎖代碼
            }
        });
        Exception exception = subThread.get();
        if (exception !=null){
            throw exception;
        }
        return true;
    }

然后自豪的運(yùn)行代碼,就得到運(yùn)行的結(jié)果如下:

可以看到結(jié)果確實(shí)是符合預(yù)期~

源碼淺析

然而作為全宇宙最靚的崽,光學(xué)會(huì)用怎么能滿足我呢,大家肯定也都好奇curatorFramework底層原理是咋實(shí)現(xiàn)的吧~

首先我們看看加鎖部分,關(guān)鍵代碼主要是acquire部分:

public boolean acquire(long time, TimeUnit unit) throws Exception {
  return this.internalLock(time, unit);
}

acquire部分代碼緊接著深入到internalLock方法中查看具體的邏輯。

private boolean internalLock(long time, TimeUnit unit) throws Exception {
  Thread currentThread = Thread.currentThread();
  // 從記錄表中嘗試獲取線程的鎖數(shù)據(jù)
  InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
  if (lockData != null) {
    // 數(shù)據(jù)不為空,實(shí)現(xiàn)重入,計(jì)數(shù)+1且返回加鎖成功
    lockData.lockCount.incrementAndGet();
    return true;
  } else {
    // 數(shù)據(jù)為空,進(jìn)行加鎖操作 (關(guān)鍵代碼,深入查看)
    String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
    if (lockPath != null) {
      //將鎖的記錄保存到ThreadData中方便存儲(chǔ)
      InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
      this.threadData.put(currentThread, newLockData);
      return true;
    } else {
      return false;
    }
  }
}

再追入嘗試加鎖的模塊代碼中,其中最關(guān)鍵的代碼是createTheLock方法和internalLockLoop方法。

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
        final long      startMillis = System.currentTimeMillis(); // 獲取當(dāng)前的系統(tǒng)時(shí)間
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null; // 單位轉(zhuǎn)換相同
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;
        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone ){
            isDone = true;
            try{
              /*關(guān)鍵方法>>>>> 根據(jù)path創(chuàng)建臨時(shí)順序節(jié)點(diǎn)并獲取到節(jié)點(diǎn)相應(yīng)路徑*/
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
              /*關(guān)鍵方法>>>>> 這里根據(jù)對(duì)應(yīng)的鎖的子節(jié)點(diǎn),去判斷對(duì)應(yīng)要監(jiān)視的對(duì)象*/
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }catch ( KeeperException.NoNodeException e ){
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){
                  //如果重試策略允許重試,則進(jìn)行重試。
                    isDone = false;
                }else{
                    throw e;
                }
            }
        }
        if ( hasTheLock ){
          //如果持有鎖了,則返回加鎖加點(diǎn)的路徑
            return ourPath;
        }
        return null;
    }

createTheLock方法,會(huì)創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn),以供后續(xù)的加鎖使用。

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{
        String ourPath;
        if ( lockNodeBytes != null ) {
            ourPath = client
              .create()
              .creatingParentContainersIfNeeded()
              .withProtection()
              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
              .forPath(path, lockNodeBytes);
        }else{
            ourPath = client
              .create()
              .creatingParentContainersIfNeeded()
              .withProtection()
              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
              .forPath(path);
        }
        return ourPath;
    }

internalLockLoop方法,會(huì)首先根據(jù)當(dāng)前鎖的路徑獲取對(duì)應(yīng)子節(jié)點(diǎn)(即已經(jīng)上鎖的節(jié)點(diǎn)),緊接著會(huì)根據(jù)一個(gè)關(guān)鍵變量maxLeases(默認(rèn)為1,大概率可以通過(guò)修改maxLeases來(lái)控制一把鎖是否可以多人同時(shí)獲取),來(lái)判斷當(dāng)前的節(jié)點(diǎn)能否獲取分布式鎖。

如果這個(gè)時(shí)候,子節(jié)點(diǎn)數(shù)組的長(zhǎng)度超過(guò)了maxLeases,那么我當(dāng)前節(jié)點(diǎn)沒(méi)法獲取到鎖,也就需要對(duì)數(shù)組長(zhǎng)度length-maxLeases的節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng),以期待獲取相應(yīng)的鎖。同時(shí),該組件還對(duì)超時(shí)的情況做了特殊的處理,以避免死鎖或不斷等待的情況出現(xiàn)。

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try{
            if ( revocable.get() != null ){
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() ){
                    haveTheLock = true;
                } else{
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this){
                        try{
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null ){
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 ){
                                    doDelete = true;    // 監(jiān)聽(tīng)超時(shí)了,節(jié)點(diǎn)會(huì)自動(dòng)釋放,避免死鎖
                                    break;
                                }
                                wait(millisToWait);
                            }
                            else{
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ){
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }catch ( Exception e ){
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }finally{
            if ( doDelete ){ //超時(shí)or報(bào)錯(cuò)了,會(huì)將節(jié)點(diǎn)刪除
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

由此一來(lái),整個(gè)加鎖的邏輯就比較清晰了。

解鎖:

解鎖部分的代碼基本類(lèi)似。源代碼如下:

    public void release() throws Exception{
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null ){
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 ){
            return;
        }
        if ( newLockCount < 0 ){
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try{
            internals.releaseLock(lockData.lockPath);
        }finally{
            threadData.remove(currentThread);
        }
    }

首先是會(huì)根據(jù)當(dāng)前線程從記錄表中去獲取其對(duì)應(yīng)的鎖信息,如果鎖信息不存在,拋出異常。

如果鎖信息存在,首先判斷其是否重入了,如果是重入鎖,則計(jì)數(shù)-1。

否則的話,執(zhí)行釋放鎖的操作,這里就是先刪除節(jié)點(diǎn)下對(duì)應(yīng)的所有觀察者,然后將臨時(shí)節(jié)點(diǎn)刪除點(diǎn),完成鎖的釋放。

    final void releaseLock(String lockPath) throws Exception
    {
        client.removeWatchers(); // 移除觀察者
        revocable.set(null); 
        deleteOurPath(lockPath); // 刪除對(duì)應(yīng)路徑的鎖
    }

由此,整個(gè)加鎖解鎖的流程就全部解析完啦~

優(yōu)劣性分析

優(yōu)點(diǎn):

  1. ZK現(xiàn)成的框架支持相對(duì)完善,使用起來(lái)較為方便,而且支持了超時(shí)刪除鎖的機(jī)制,避免了可能出現(xiàn)的死鎖。
  2. curatorFramework本質(zhì)是一種按照創(chuàng)建順序排隊(duì)的實(shí)現(xiàn)。這種方案效率高,避免了“驚群”效應(yīng),當(dāng)鎖釋放時(shí)只有一個(gè)客戶端會(huì)被喚醒。
  3. ZK天生設(shè)計(jì)就是分布式協(xié)調(diào),強(qiáng)一致性。鎖的模型健壯、簡(jiǎn)單易用、適合做分布式鎖。
  4. ZK實(shí)現(xiàn)分布式鎖時(shí),如果節(jié)點(diǎn)獲取不到鎖,只需添加監(jiān)聽(tīng)器即可,不用一直輪詢(xún),性能消耗較小。

缺點(diǎn):

  1. ZK為了保持高一致性,會(huì)導(dǎo)致在集群leader掛掉的情況下,重新選舉的算法相對(duì)耗時(shí)較久,因此可能導(dǎo)致在較長(zhǎng)的一段時(shí)間內(nèi),加鎖、解鎖的邏輯是不可用的。
  2. 如果有較多的客戶端頻繁的申請(qǐng)加鎖、釋放鎖,對(duì)于zk集群壓力較大。

參考文獻(xiàn)

分布式鎖之Zk(zookeeper)實(shí)現(xiàn)

你還在使用復(fù)雜的 zkclient 開(kāi)發(fā) zookeeper 么?是時(shí)候用 Curator 了 !

肝一下ZooKeeper實(shí)現(xiàn)分布式鎖的方案,附帶實(shí)例!

七張圖徹底講清楚ZooKeeper分布式鎖的實(shí)現(xiàn)原理【石杉的架構(gòu)筆記】

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容