Zookeeper客戶端Curator實(shí)現(xiàn)分布式鎖及源碼分析

API說明

InterProcessMutex有兩個構(gòu)造方法

public InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}

public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
    this(client, path, LOCK_NAME, 1, driver);
}

參數(shù)說明

參數(shù) 說明
client curator中zk客戶端對象
path 搶鎖路徑,同一個鎖path需一致
driver 可自定義lock驅(qū)動實(shí)現(xiàn)分布式鎖

主要方法

//獲取鎖,若失敗則阻塞等待直到成功,支持重入
public void acquire() throws Exception
//超時獲取鎖,超時失敗
public boolean acquire(long time, TimeUnit unit) throws Exception
//釋放鎖
public void release() throws Exception

注意:調(diào)用acquire()方法后需相應(yīng)調(diào)用release()來釋放鎖

使用簡介

下面的例子模擬了100個線程同時搶鎖,搶鎖成功的線程睡眠1秒鐘后釋放鎖,通知其他等待的線程重新?lián)屾i,比較簡單,不多說

public class InterprocessLock {
    static CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args)  {
        CuratorFramework zkClient = getZkClient();
        String lockPath = "/lock";
        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
        //模擬100個線程搶鎖
        for (int i = 0; i < 100; i++) {
            new Thread(new TestThread(i, lock)).start();
        }
    }

    static class TestThread implements Runnable {
        private Integer threadFlag;
        private InterProcessMutex lock;

        public TestThread(Integer threadFlag, InterProcessMutex lock) {
            this.threadFlag = threadFlag;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.acquire();
                System.out.println("第"+threadFlag+"線程獲取到了鎖");
                //等到1秒后釋放鎖
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static CuratorFramework getZkClient() {
        String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }
}

源碼分析

從獲取鎖acquire()方法入手

public void acquire() throws Exception {
    if ( !internalLock(-1, null) ) {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

看到調(diào)用了internalLock方法,進(jìn)到internalLock方法中

private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();
        //先判斷當(dāng)前線程是否持有了鎖,如果是,則加鎖次數(shù)count+1,返回成功
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //調(diào)用LockInternals的attemptLock()方法進(jìn)行加鎖
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        //加鎖成功,則將當(dāng)前線程對應(yīng)加鎖數(shù)據(jù)加到map中
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

進(jìn)到LockInternals的attemptLock()中,看下代碼

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        //開始時間,后面用做超時判斷
        final long      startMillis = System.currentTimeMillis();
        //超時時間,轉(zhuǎn)換為毫秒
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //節(jié)點(diǎn)數(shù)據(jù)
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        //重試次數(shù)
        int             retryCount = 0;
        //lockPath
        String          ourPath = null;
        //是否持有鎖
        boolean         hasTheLock = false;
        //是否處理完成
        boolean         isDone = false;
        //循環(huán)處理
        while ( !isDone )
        {
            isDone = true;

            try
            {
                //在path下創(chuàng)建一個EPHEMERAL_SEQUENTIAL(臨時順序型)類型節(jié)點(diǎn)
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //搶鎖并判斷是否擁有鎖
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // 重試范圍內(nèi)時進(jìn)行重試
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

創(chuàng)建臨時有序節(jié)點(diǎn)createsTheLock方法如下,比較簡單

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

判斷是否擁有鎖的方法internalLockLoop才是核心,下面注意了

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            //自旋
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //獲取path下對應(yīng)臨時有序節(jié)點(diǎn),并按節(jié)點(diǎn)編號從小到大排序
                List<String>        children = getSortedChildren();
                //獲取當(dāng)前線程創(chuàng)建的臨時節(jié)點(diǎn)名稱
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                //判斷當(dāng)前節(jié)點(diǎn)編號是否<maxLease,若是,則搶到了鎖,maxLease這里為1,所以只有index為0時才搶到鎖,標(biāo)識只有1個線程能搶到鎖
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //前一個節(jié)點(diǎn)編號較小的節(jié)點(diǎn)的路徑
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                             //如果沒搶到鎖,監(jiān)聽前一個節(jié)點(diǎn)事件
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                判斷是否超時
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    //超時 直接退出,并標(biāo)記 刪除節(jié)點(diǎn)doDelete標(biāo)記=true
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                //調(diào)用Object.wait(),等待線程被notify喚醒
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            //如果標(biāo)記了刪除,刪除節(jié)點(diǎn)數(shù)據(jù)
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

可以看到邏輯比較清晰,N個線程同時在path下創(chuàng)建臨時順序節(jié)點(diǎn),編號最小的獲取鎖,沒搶到鎖的會調(diào)用wait()方法等待被喚醒
那么是在哪里調(diào)用了notify()方法來喚醒其他節(jié)點(diǎn)的呢?
答案是在監(jiān)聽器wacher里,該監(jiān)聽器會在前一個(節(jié)點(diǎn)編號較小)的節(jié)點(diǎn)被刪除后觸發(fā)
先分析下釋放鎖的方法release
看下源碼

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //如果鎖被當(dāng)前線程獲取了超過1次,將count-1,直接返回
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            //釋放鎖
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

最終調(diào)用releaseLock方法中的deleteOurPath中

void releaseLock(String lockPath) throws Exception
    {
        revocable.set(null);
        deleteOurPath(lockPath);
    }
    
    private void deleteOurPath(String ourPath) throws Exception
    {
        try
        {
        //直接調(diào)用client刪除節(jié)點(diǎn)
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

節(jié)點(diǎn)被刪除后,會觸發(fā)搶鎖過程中的wather監(jiān)聽器,看下監(jiān)聽器中內(nèi)容

private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        notifyFromWatcher();
    }
};
private synchronized void notifyFromWatcher() {
        notifyAll();
}

可以看到節(jié)點(diǎn)path被刪除后,會通知后面一個節(jié)點(diǎn)進(jìn)行notify操作,notify操作后,重新進(jìn)入while自旋中,重新判斷是否搶到了鎖

最后看下getTheLock

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
            throws Exception {
        // 之前創(chuàng)建的臨時順序節(jié)點(diǎn)在排序后的子節(jié)點(diǎn)列表中的索引
        int ourIndex =
                children.indexOf(sequenceNodeName);
        // 校驗(yàn)之前創(chuàng)建的臨時順序節(jié)點(diǎn)是否有效
        validateOurIndex(sequenceNodeName,
                ourIndex);
        // 鎖公平性的核心邏輯
        // 由 InterProcessMutex 的構(gòu)造函數(shù)可知, maxLeases 為 1,即只有 ourIndex 為 0 時,線程才能持有鎖,或者說該線程創(chuàng)建的臨時順序節(jié)點(diǎn)激活了鎖
        // Zookeeper 的臨時順序節(jié)點(diǎn)特性能保證跨多個 JVM 的線程并發(fā)創(chuàng)建節(jié)點(diǎn)時的順序性,越早創(chuàng)建臨時順序節(jié)點(diǎn)成功的線程會更早地激活鎖或獲得鎖
        boolean getsTheLock = ourIndex <
                maxLeases;
        // 如果已經(jīng)獲得了鎖,則無需監(jiān)聽任何節(jié)點(diǎn),否則需要監(jiān)聽上一順序節(jié)點(diǎn)(ourIndex - 1)
        // 因 為 鎖 是 公 平 的 , 因 此 無 需 監(jiān) 聽 除 了(ourIndex - 1)以外的所有節(jié)點(diǎn),這是為了減少羊群效應(yīng), 非常巧妙的設(shè)計?。?        String pathToWatch = getsTheLock ? null :
                children.get(ourIndex - maxLeases);
        // 返回獲取鎖的結(jié)果,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
        return new PredicateResults(pathToWatch,
                getsTheLock);
    }

    static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
        if (ourIndex < 0) {
            // 容錯處理,可跳過
            // 由于會話過期或連接丟失等原因,該線程創(chuàng)建的臨時順序節(jié)點(diǎn)被 Zookeeper 服務(wù)端刪除,往外拋出 NoNodeException
            // 如果在重試策略允許范圍內(nèi),則進(jìn)行重新嘗試獲取鎖,這會重新重新生成臨時順序節(jié)點(diǎn)
            // 佩服 Curator 的作者將邊界條件考慮得 如此周到!
            throw new KeeperException.NoNodeException("Sequential path  not found:" + sequenceNodeName);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容