Zookeeper實現(xiàn)分布式鎖

利用Zookeeper來實現(xiàn)分布式鎖,主要基于其臨時(或臨時有序)節(jié)點和watch機(jī)制.

為什么是臨時節(jié)點?
臨時節(jié)點的特性,在連接斷開的情況下節(jié)點能被刪除,所以即使客戶端發(fā)生故障鎖也能被釋放,避免死鎖的發(fā)生.

為什么是有序節(jié)點?
當(dāng)然不用有序節(jié)點也是可以實現(xiàn)的.每個客戶端嘗試創(chuàng)建同一個臨時節(jié)點,創(chuàng)建者獲得鎖,創(chuàng)建失敗的客戶端監(jiān)聽這個鎖節(jié)點.
但是當(dāng)客戶端太多的時候,會形成羊群效應(yīng),因為只有一個客戶端能獲取鎖,其他客戶端都因失敗而需要監(jiān)聽這個鎖節(jié)點的刪除事件,當(dāng)獲得鎖的客戶端完成業(yè)務(wù)后釋放鎖即刪除這個鎖節(jié)點時,zk要給所有監(jiān)視的客戶端發(fā)送通知,這樣大量的消息通知可能會造成ZK的阻塞.
在這種場景下,更優(yōu)化的方式是使用有序節(jié)點.
每個未獲得鎖的客戶端只需要監(jiān)聽排在他前面的那個節(jié)點,每次節(jié)點刪除也只需要通知一個客戶端即可.

Curator(Zookeeper的Java客戶端)就是用的臨時有序節(jié)點和watch機(jī)制來實現(xiàn)分布式鎖的.
步驟如下(這里本來想畫一張圖的,但畫圖能力有限,還沒有畫出滿意的圖來)

  1. 每個客戶端基于節(jié)點/mylock創(chuàng)建臨時有序子節(jié)點/mylock/lock-,比如第一個創(chuàng)建的/mylock/lock-0000000000,第二個/mylock/lock-0000000001......
  2. 客戶端獲取/mylock節(jié)點的子節(jié)點列表并按升序排序,判斷自己創(chuàng)建的節(jié)點是否排在第一個.如果排在第一個則表示獲得鎖,否則監(jiān)聽前一個節(jié)點的刪除事件.
  3. 獲得鎖的客戶端進(jìn)行業(yè)務(wù)處理.完成后刪除子節(jié)點,釋放鎖.監(jiān)聽該子節(jié)點的客戶端收到通知,嘗試獲取鎖.

針對上述步驟考慮幾個場景
場景1. 比如當(dāng)前獲得鎖的節(jié)點是/mylock/lock-0000000000,而節(jié)點/mylock/lock-0000000001還沒有對/mylock/lock-0000000000設(shè)置好監(jiān)聽事件的時候/mylock/lock-0000000000節(jié)點刪除了
/mylock/lock-0000000001對應(yīng)的客戶端對/mylock/lock-0000000000設(shè)置監(jiān)聽的時候,如果該節(jié)點刪除了會拋出一個NoNodeException異常;這個時候可以生吞這個異常重新嘗試獲取鎖.
場景2. 比如當(dāng)前獲得鎖的節(jié)點是/mylock/lock-0000000000,而節(jié)點/mylock/lock-0000000005對應(yīng)的客戶端突然宕機(jī)了,該節(jié)點被刪除;
這個時候創(chuàng)建/mylock/lock-0000000006節(jié)點的客戶端會收到節(jié)點刪除的通知,然后嘗試獲取鎖,發(fā)現(xiàn)自己獲取不到鎖,則監(jiān)聽/mylock/lock-0000000004子節(jié)點的刪除事件.

編碼實現(xiàn)分布式鎖

看到這才是實現(xiàn)分布式鎖的正確姿勢!這篇文章才知道原來Spring早就為我們提供了分布式鎖的實現(xiàn)了.不過其實也是用的Curator來實現(xiàn)的啦.看下依賴關(guān)系就知道啦.
重點看下類org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry
Spring封裝之后使用也非常簡單,大概步驟就是這樣:

Lock lock = zookeeperLockRegistry.obtain("lock-xh");
//tryLock()zk內(nèi)部實現(xiàn)的是一個超時接口
if (lock.tryLock()) {
    //業(yè)務(wù)邏輯
    lock.unlock();
}

我這里寫了一個Demo,代碼在github上 sb-learn-distributedlock-zk
模擬2個線程爭搶這個鎖lock-xh,讓線程B先啟動,線程A休眠一段時間再啟動.然后可以得到線程B搶到了鎖.

通過zkCli.sh查看鎖節(jié)點,發(fā)現(xiàn)在/SpringIntegration-LockRegistry/lock-xh節(jié)點下創(chuàng)建了2個臨時有序節(jié)點.

[zk: 10.45.82.76(CONNECTED) 2] ls /SpringIntegration-LockRegistry/lock-xh
[_c_413a0764-4abe-4476-b241-a33f9a4af228-lock-0000000009, _c_11e2b7fb-ca26-4a4c-8832-b3d8e8b741de-lock-0000000008]

其實從這里也能看出來,他是利用了zk的臨時有序節(jié)點來實現(xiàn)的.兩個線程都到這個鎖節(jié)點下創(chuàng)建子節(jié)點.然后按照順序誰排前面誰就獲得了鎖.

注意
我開始編碼的時候,spring-boot-starter-parent用了2.2.0.BUILD-SNAPSHOT版本,其依賴的curator4.0.1版本,而我連的zk版本是3.4.11.所以我測試的時候報錯了:

Caused by: org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /SpringIntegration-LockRegistry/lock-xh/_c_48572564-d1d3-4134-9491-b359d756acc2-lock-
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:103)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1525)
    at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1181)
    at org.apache.curator.framework.imps.CreateBuilderImpl$17.call(CreateBuilderImpl.java:1158)
    at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.framework.imps.CreateBuilderImpl.pathInForeground(CreateBuilderImpl.java:1155)
    at org.apache.curator.framework.imps.CreateBuilderImpl.protectedPathInForeground(CreateBuilderImpl.java:605)
    at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:595)
    at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:573)
    at org.apache.curator.framework.imps.CreateBuilderImpl.forPath(CreateBuilderImpl.java:49)
    at org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver.createsTheLock(StandardLockInternalsDriver.java:54)
    at org.apache.curator.framework.recipes.locks.LockInternals.attemptLock(LockInternals.java:225)
    at org.apache.curator.framework.recipes.locks.InterProcessMutex.internalLock(InterProcessMutex.java:237)
    at org.apache.curator.framework.recipes.locks.InterProcessMutex.acquire(InterProcessMutex.java:108)
    at org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry$ZkLock.tryLock(ZookeeperLockRegistry.java:300)
    ... 3 more

是的!要注意服務(wù)端zk和客戶端Curator版本的兼容性,具體請看這里ZooKeeper Version Compatibility

關(guān)于獲取鎖的邏輯,重點看下Curatororg.apache.curator.framework.recipes.locks.LockInternals,我這里用的Curator版本4.0.1

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    //省略其他代碼
    //創(chuàng)建臨時有序節(jié)點 如果父節(jié)點沒有也同時創(chuàng)建
    ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
    //阻塞直到獲得鎖,或者等待時間過了退出或者線程中斷退出
    hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
    //省略其他代碼
    }

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if(this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }

            //不斷循環(huán)嘗試獲得鎖
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                //獲取節(jié)點下的所有子節(jié)點并排序
                List<String> children = this.getSortedChildren();
                //ourPath如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
                //basePath如/SpringIntegration-LockRegistry/lock-xh
                //得到當(dāng)前線程創(chuàng)建的有序節(jié)點名稱 比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                //檢查當(dāng)前節(jié)點是否排在第一個,如果排在第一個則獲得鎖,如果沒有獲得鎖,則尋找需要監(jiān)視的節(jié)點(即有序節(jié)點列表中排在當(dāng)前節(jié)點前面的那個節(jié)點)
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                //獲得鎖啦
                if(predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                   //未獲得鎖,需要對前一個節(jié)點進(jìn)行監(jiān)視
                   //得到前一個有序節(jié)點的qu路徑
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            //設(shè)置監(jiān)視器; 這里有一種場景,即設(shè)置監(jiān)視器的時候可能上一個節(jié)點已經(jīng)被刪除了.對于這種情況,會拋出NoNodeException異常;  
                            //下面直接生吞了這種異常.繼續(xù)循環(huán)嘗試獲得鎖. 
                            //這里使用getData()接口而不是checkExists()是因為,如果前一個子節(jié)點已經(jīng)被刪除了那么會拋出異常而且不會設(shè)置事件監(jiān)聽器,
                            //而checkExists雖然也可以獲取到節(jié)點是否存在的信息但是同時設(shè)置了監(jiān)聽器,這個監(jiān)聽器其實永遠(yuǎn)不會觸發(fā),對于zookeeper來說屬于資源泄露
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if(millisToWait == null) {
                                this.wait();
                            } else {
                                millisToWait = Long.valueOf(millisToWait.longValue() - (System.currentTimeMillis() - startMillis));
                                startMillis = System.currentTimeMillis();
                                if(millisToWait.longValue() > 0L) {
                                    this.wait(millisToWait.longValue());
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                            ;
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if(doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }

注意
上面關(guān)于鎖節(jié)點,可能會有點迷糊,為什么中間會有一串隨機(jī)數(shù)?
org.apache.curator.framework.imps.CreateBuilderImpl#adjustPath
比如/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000
是在/SpringIntegration-LockRegistry/lock-xh/lock-的基礎(chǔ)上調(diào)整而來的,分成路徑/SpringIntegration-LockRegistry/lock-xh和節(jié)點lock-
然后節(jié)點lock-前面拼接上"_c_" + protectedId + "-"
比如這里的protectedId=35957bd7-a9e9-4f6f-a9f3-c131b9c3734c
最后拼成的節(jié)點全路徑即:/SpringIntegration-LockRegistry/lock-xh/_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-
最后再創(chuàng)建有序節(jié)點的時候尾巴上補(bǔ)上了有序序列號0000000000

那么我們現(xiàn)在知道了這個節(jié)點的名稱的創(chuàng)建邏輯,那么既然這是串隨機(jī)數(shù),我們怎么能保證先創(chuàng)建的節(jié)點就能排在前面呢?
其實原因就在上面源碼中g(shù)etSortedChildren方法,里面排序的時候并不是按照整個節(jié)點名稱比如_c_35957bd7-a9e9-4f6f-a9f3-c131b9c3734c-lock-0000000000來排序的,而是按照后面的有序序列號比如0000000000來排序的!

//獲取節(jié)點下的所有子節(jié)點并排序
List<String> children = this.getSortedChildren();

關(guān)于zk實現(xiàn)分布式鎖的學(xué)習(xí)資料
7 張圖講清楚ZooKeeper分布式鎖實現(xiàn)原理
這才是實現(xiàn)分布式鎖的正確姿勢!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容