Zookeeper源碼分析1-分布式鎖

分布式鎖的基本場景

如果在多線程并行情況下去訪問某一個共享資源,比如說共享變量,那么勢必會造成線程安全問題。那么我們可以用很多種方法來解決,比如 synchronized、 比如 Lock 之類的鎖操作來解決線程安全問題,那么在分布式架構(gòu)下,涉及到多個進程訪問某一個共享資源的情況,比如說在電商平臺中商品庫存問題,在庫存只有 10 個的情況下進來100 個用戶,如何能夠避免超賣呢?所以這個時候我們需要一些互斥手段來防止彼此之間的干擾。然后在分布式情況下,synchronized 或者 Lock 之類的鎖只能控制單一進程的資源訪問,在多進程架構(gòu)下,這些 api就沒辦法解決我們的問題了。怎么辦呢?

用 zookeeper 來實現(xiàn)分布式鎖

我們可以利用 zookeeper 節(jié)點的特性來實現(xiàn)獨占鎖,就是同級節(jié)點的唯一性,多個進程往 zookeeper 的指定節(jié)點下創(chuàng)建一個相同名稱的節(jié)點,只有一個能成功,另外一個是創(chuàng)建失敗;
創(chuàng)建失敗的節(jié)點全部通過 zookeeper 的 watcher 機制來監(jiān)聽 zookeeper 這個子節(jié)點的變化,一旦監(jiān)聽到子節(jié)點的刪除事件,則再次觸發(fā)所有進程去寫鎖;


image.png

這種實現(xiàn)方式很簡單,但是會產(chǎn)生“驚群效應(yīng)”,簡單來說就是如果存在許多的客戶端在等待獲取鎖,當(dāng)成功獲取到鎖的進程釋放該節(jié)點后,所有處于等待狀態(tài)的客戶端都會被喚醒,這個時候 zookeeper 在短時間內(nèi)發(fā)送大量子節(jié)點變更事件給所有待獲取鎖的客戶端,然后實際情況是只會有一個客戶端獲得鎖。如果在集群規(guī)模比較大的情況下,會對 zookeeper 服務(wù)器的性能產(chǎn)生比較的影響。

利用有序節(jié)點來實現(xiàn)分布式鎖

我們可以通過有序節(jié)點來實現(xiàn)分布式鎖,每個客戶端都往指定的節(jié)點下注冊一個臨時有序節(jié)點,越早創(chuàng)建的節(jié)點,節(jié)點的順序編號就越小,那么我們可以判斷子節(jié)點中最小的節(jié)點設(shè)置為獲得鎖。
如果自己的節(jié)點不是所有子節(jié)點中最小的,意味著還沒有獲得鎖。這個的實現(xiàn)和前面單節(jié)點實現(xiàn)的差異性在于,每個節(jié)點只需要監(jiān)聽比自己小的節(jié)點,當(dāng)比自己小的節(jié)點刪除以后,客戶端會收到 watcher 事件,此時再次判斷自己的節(jié)點是不是所有子節(jié)點中最小的,如果是則獲得鎖,否則就不斷重復(fù)這個過程,這樣就不會導(dǎo)致羊群效應(yīng),因為每個客戶端只需要監(jiān)控一個節(jié)點。

curator 分布式鎖的基本使用

curator 對于鎖這塊做了一些封裝,curator 提供了InterProcessMutex 這樣一個 api。除了分布式鎖之外,還提供了 leader 選舉、分布式隊列等常用的功能。

  • InterProcessMutex:分布式可重入排它鎖
  • InterProcessSemaphoreMutex:分布式排它鎖
  • InterProcessReadWriteLock:分布式讀寫鎖

public class LockDemo {

    private static String CONNECTION_STR = "192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
                connectString(CONNECTION_STR).sessionTimeoutMs(5000).
                retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        curatorFramework.start();

        final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "->嘗試競爭鎖");
                try {
                    lock.acquire(); //阻塞競爭鎖

                    System.out.println(Thread.currentThread().getName() + "->成功獲得了鎖");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.release(); //釋放鎖
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-" + i).start();
        }
    }
}

Curator 實現(xiàn)分布式鎖的基本原理

我們先打開InterProcessMutex構(gòu)造函數(shù)看看源碼

   // 最常用
    public InterProcessMutex(CuratorFramework client,
                             String path){
        // Zookeeper 利用 path 創(chuàng)建臨時順序節(jié)點,實現(xiàn)公平鎖的核心
        this(client, path, new StandardLockInternalsDriver());
    }
    public InterProcessMutex(CuratorFramework client,
                             String path, LockInternalsDriver driver){
        // maxLeases=1,表示可以獲得分布式鎖的線程數(shù)量(跨 JVM)為 1,即為互斥鎖
        this(client, path, LOCK_NAME, 1, driver);
    }

    // protected 構(gòu)造函數(shù)
    InterProcessMutex(CuratorFramework client, String
            path, String lockName, int maxLeases,
                      LockInternalsDriver driver){
        basePath = PathUtils.validatePath(path);
        // internals 的類型為 LockInternals ,
        InterProcessMutex 將分布式鎖的申請和釋放操作委托給internals 執(zhí)行
        internals = new LockInternals(client, driver, path,
                lockName, maxLeases);
    }

再跟進InterProcessMutex.acquire方法

    // 無限等待
    public void acquire() throws Exception {
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock:" + basePath);
        }
    }

    // 限時等待
    public boolean acquire(long time, TimeUnit unit)
            throws Exception {
        return internalLock(time, unit);
    }

InterProcessMutex.internalLock

 private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread =
                Thread.currentThread();
        LockData lockData =
                threadData.get(currentThread);
        if (lockData != null) {
            // 實現(xiàn)可重入
            // 同一線程再次 acquire,首先判斷當(dāng)前的映射表內(nèi)(threadData)是否有該線程的鎖信息,如果有則原子 + 1,然后返回
            lockData.lockCount.incrementAndGet();
            return true;
        }
        // 映射表內(nèi)沒有對應(yīng)的鎖信息,嘗試通過LockInternals 獲取鎖
        String lockPath = internals.attemptLock(time,unit, getLockNodeBytes());
        if (lockPath != null) {
            // 成功獲取鎖,記錄信息到映射表
            LockData newLockData = new
                    LockData(currentThread, lockPath);
            threadData.put(currentThread,
                    newLockData);
            return true;
        }
        return false;
    }

    // 映射表
    // 記錄線程與鎖信息的映射關(guān)系
    private final ConcurrentMap<Thread, LockData>  threadData = Maps.newConcurrentMap();
    // 鎖信息
    // Zookeeper 中一個臨時順序節(jié)點對應(yīng)一個“鎖”,但讓鎖生效激活需要排隊(公平鎖),下面會繼續(xù)分析

    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new
                AtomicInteger(1); // 分布式鎖重入次數(shù)

        private LockData(Thread owningThread,
                         String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

LockInternals.attemptLock

 // 嘗試獲取鎖,并返回鎖對應(yīng)的 Zookeeper 臨時順序節(jié)點的路徑
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        final long startMillis = System.currentTimeMillis();
        // 無限等待時,millisToWait 為 null
        final Long millisToWait = (unit != null) ?
                unit.toMillis(time) : null;
        // 創(chuàng)建 ZNode 節(jié)點時的數(shù)據(jù)內(nèi)容,無關(guān)緊要,這里為 null,采用默認值(IP 地址)
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        // 當(dāng)前已經(jīng)重試次數(shù),與CuratorFramework的重試策略有關(guān)
        int retryCount = 0;
        // 在 Zookeeper 中創(chuàng)建的臨時順序節(jié)點的路徑,相當(dāng)于一把待激活的分布式鎖
        // 激活條件:同級目錄子節(jié)點,名稱排序最?。ㄅ抨?,公平鎖),后續(xù)繼續(xù)分析
        String ourPath = null;
        // 是否已經(jīng)持有分布式鎖
        boolean hasTheLock = false;
        // 是否已經(jīng)完成嘗試獲取分布式鎖的操作
        boolean isDone = false;
        while (!isDone) {
            isDone = true;
            try {
                // 從 InterProcessMutex 的構(gòu)造函數(shù)可知實際 driver 為 StandardLockInternalsDriver 的實例
                // 在Zookeeper中創(chuàng)建臨時順序節(jié)點
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 循環(huán)等待來激活分布式鎖,實現(xiàn)鎖的公平性
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            } catch
            (KeeperException.NoNodeException e) {
                // 容錯處理,不影響主邏輯的理解,可跳過
                // 因 為 會 話 過 期 等 原 因 ,StandardLockInternalsDriver 因為無法找到創(chuàng)建的臨時 順序節(jié)點而拋出 NoNodeException 異常
                if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
                        System.currentTimeMillis() -
                                startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    // 滿足重試策略嘗試重新獲取鎖
                    isDone = false;
                } else {
                    // 不滿足重試策略則繼續(xù)拋出NoNodeException
                    throw e;
                }
            }
        }
        if (hasTheLock) {
            // 成功獲得分布式鎖,返回臨時順序節(jié)點的路徑,上層將其封裝成鎖信息記錄在映射表,方便鎖重入
            return ourPath;
        }
        // 獲取分布式鎖失敗,返回 null
        return null;
    }

createsTheLock

  // From StandardLockInternalsDriver
    // 在 Zookeeper 中創(chuàng)建臨時順序節(jié)點
    public String createsTheLock(CuratorFramework
                                         client, String path, byte[] lockNodeBytes) throws
            Exception {
        String ourPath;
        // lockNodeBytes 不為 null 則作為數(shù)據(jù)節(jié)點內(nèi)容,否則采用默認內(nèi)容(IP 地址)
        if (lockNodeBytes != null) {
            // 下面對 CuratorFramework 的一些細節(jié)做解釋,不影響對分布式鎖主邏輯的解釋,可跳過
            // creatingParentContainersIfNeeded:用于創(chuàng)建父節(jié)點,如果不支持 CreateMode.CONTAINER
            // 那么將采用 CreateMode.PERSISTENT
            // withProtection:臨時子節(jié)點會添加GUID前綴
            ourPath = client.create().creatingParentContainersIfNeeded()
                    //CreateMode.EPHEMERAL_SEQUENTIAL:臨時順序節(jié)點,Zookeeper 能保證在節(jié)點產(chǎn)生的順序性
                    // 依據(jù)順序來激活分布式鎖,從而也實現(xiàn)了分布式鎖的公平性,后續(xù)繼續(xù)分析
                    .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            ourPath =
                    client.create().creatingParentContainersIfNeeded()
                            .withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

LockInternals.internalLockLoop

 // 循環(huán)等待來激活分布式鎖,實現(xiàn)鎖的公平性
    private boolean internalLockLoop(long startMillis,
                                     Long millisToWait, String ourPath) throws Exception {
        // 是否已經(jīng)持有分布式鎖
        boolean haveTheLock = false;
        // 是否需要刪除子節(jié)點
        boolean doDelete = false;
        try {
            if (revocable.get() != null) {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            while ((client.getState() ==
                    CuratorFrameworkState.STARTED) && !haveTheLock) {
                // 獲取排序后的子節(jié)點列表
                List<String> children = getSortedChildren();
                // 獲取前面自己創(chuàng)建的臨時順序子節(jié)點的名稱
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);
                // 實現(xiàn)鎖的公平性的核心邏輯,看下面的分析
                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if (predicateResults.getsTheLock()) {
                    // 獲得了鎖,中斷循環(huán),繼續(xù)返回上層
                    haveTheLock = true;
                } else {
                    // 沒有獲得到鎖,監(jiān)聽上一臨時順序節(jié)點
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    synchronized (this) {
                        try {
                            // exists()會導(dǎo)致導(dǎo)致資源泄漏,因此 exists () 可以監(jiān)聽不存在的 ZNode,因此采用 getData ()
                            // 上一臨時順序節(jié)點如果被刪除,會喚醒當(dāng)前線程繼續(xù)競爭鎖,正常情況下能直接獲得鎖,因為鎖是公平的

                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if (millisToWait != null) {
                                millisToWait -=
                                        (System.currentTimeMillis() - startMillis);
                                startMillis =
                                        System.currentTimeMillis();
                                if (millisToWait <=
                                        0) {
                                    doDelete =
                                            true; // 獲取鎖超時,標記刪除之前創(chuàng)建的臨時順序節(jié)點
                                    break;
                                }
                                wait(millisToWait);
                                // 等待被喚醒,限時等待
                            } else {
                                wait(); // 等待被喚醒,無限等待
                            }
                        } catch
                        (KeeperException.NoNodeException e) {
                            // 容錯處理,邏輯稍微有點繞,可跳過,不影響主邏輯的理解
                            // client.getData()可能調(diào)用時拋出 NoNodeException,原因可能是鎖被釋放或會話過期(連接丟失)等
                            // 這里并沒有做任何處理,因為外層是 while 循環(huán),再次執(zhí)行 driver.getsTheLock 時會調(diào)用 validateOurIndex
                            // 此 時 會 拋 出NoNodeException,從而進入下面的 catch 和 finally 邏輯,重新拋出上層嘗試重試獲取鎖并刪除臨時順序節(jié)點
                        }
                    }
                }
            }
        } catch (Exception e) {
            ThreadUtils.checkInterrupted(e);
            // 標記刪除,在 finally 刪除之前創(chuàng)建的臨時順序節(jié)點(后臺不斷嘗試)
            doDelete = true;
            // 重新拋出,嘗試重新獲取鎖
            throw e;
        } finally {
            if (doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

getTheLock

// From StandardLockInternalsDriver
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
            throws Exception {
        // 之前創(chuàng)建的臨時順序節(jié)點在排序后的子節(jié)點列表中的索引
        int ourIndex =
                children.indexOf(sequenceNodeName);
        // 校驗之前創(chuàng)建的臨時順序節(jié)點是否有效
        validateOurIndex(sequenceNodeName,
                ourIndex);
        // 鎖公平性的核心邏輯
        // 由 InterProcessMutex 的構(gòu)造函數(shù)可知, maxLeases 為 1,即只有 ourIndex 為 0 時,線程才能持有鎖,或者說該線程創(chuàng)建的臨時順序節(jié)點激活了鎖
        // Zookeeper 的臨時順序節(jié)點特性能保證跨多個 JVM 的線程并發(fā)創(chuàng)建節(jié)點時的順序性,越早創(chuàng)建臨時順序節(jié)點成功的線程會更早地激活鎖或獲得鎖
        boolean getsTheLock = ourIndex <
                maxLeases;
        // 如果已經(jīng)獲得了鎖,則無需監(jiān)聽任何節(jié)點,否則需要監(jiān)聽上一順序節(jié)點(ourIndex - 1)
        // 因 為 鎖 是 公 平 的 , 因 此 無 需 監(jiān) 聽 除 了(ourIndex - 1)以外的所有節(jié)點,這是為了減少羊群效應(yīng), 非常巧妙的設(shè)計??!
        String pathToWatch = getsTheLock ? null :
                children.get(ourIndex - maxLeases);
        // 返回獲取鎖的結(jié)果,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
        return new PredicateResults(pathToWatch,
                getsTheLock);
    }

    static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
        if (ourIndex < 0) {
            // 容錯處理,可跳過
            // 由于會話過期或連接丟失等原因,該線程創(chuàng)建的臨時順序節(jié)點被 Zookeeper 服務(wù)端刪除,往外拋出 NoNodeException
            // 如果在重試策略允許范圍內(nèi),則進行重新嘗試獲取鎖,這會重新重新生成臨時順序節(jié)點
            // 佩服 Curator 的作者將邊界條件考慮得 如此周到!
            throw new KeeperException.NoNodeException("Sequential path  not found:" + sequenceNodeName);
        }
    }

釋放鎖的邏輯

InterProcessMutex.release

 public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            // 無法從映射表中獲取鎖信息,不持有鎖
            throw new IllegalMonitorStateException("You do not own the lock:" + basePath);
        }
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            // 鎖是可重入的,初始值為 1,原子-1 到0,鎖才釋放
            return;
        }
        if (newLockCount < 0) {
            // 理論上無法執(zhí)行該路徑
            throw new IllegalMonitorStateException("Lock count has gonenegative for lock:" + basePath);
        }
        try {
            // lockData != null && newLockCount == 0,釋放鎖資源
            internals.releaseLock(lockData.lockPath);
        } finally {
            // 最后從映射表中移除當(dāng)前線程的鎖信息
            threadData.remove(currentThread);
        }
    }

LockInternals.releaseLock

void releaseLock(String lockPath) throws Exception {
        revocable.set(null);
        // 刪除臨時順序節(jié)點,只會觸發(fā)后一順序節(jié)點去 獲取鎖,理論上不存在競爭,只排隊,非搶占,公平鎖,先到先得
        deleteOurPath(lockPath);
    }

    // Class:LockInternals
    private void deleteOurPath(String ourPath) throws Exception {
        try {
            // 后臺不斷嘗試刪除
            client.delete().guaranteed().forPath(ourPath);
        } catch (KeeperException.NoNodeException e) {
            // 已經(jīng)刪除(可能會話過期導(dǎo)致),不做處理
            // 實際使用 Curator-2.12.0 時,并不會拋出該異常
        }
    }
```s


  ——學(xué)自咕泡學(xué)院
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容