[zookeeper 系列] curator 分布式鎖InterProcessMutex

寫這篇文章的目的主要是為了記錄下自己在zookeeper 鎖上踩過(guò)的坑,以及踩坑之后自己的一點(diǎn)認(rèn)識(shí);


從zk分布式鎖原理說(shuō)起,原理很簡(jiǎn)單,大家也應(yīng)該都知道,簡(jiǎn)單的說(shuō)就是zookeeper實(shí)現(xiàn)分布式鎖是通過(guò)在zk集群上的路徑實(shí)現(xiàn)的,在獲取分布式鎖的時(shí)候在zk服務(wù)器集群節(jié)點(diǎn)上創(chuàng)建臨時(shí)順序節(jié)點(diǎn),釋放鎖的時(shí)候刪除該臨時(shí)節(jié)點(diǎn).
多么簡(jiǎn)單的一句話,但是當(dāng)你實(shí)現(xiàn)起來(lái),想去做點(diǎn)優(yōu)化的時(shí)候往往會(huì)變得很難,難的我們后續(xù)說(shuō);


再?gòu)男枨笳f(shuō)起,需求就是加鎖,但是由于原來(lái)吞吐量不是很大,只是配置了一個(gè)固定的鎖路徑,但是卻不是每次都會(huì)去根據(jù)這個(gè)鎖路徑創(chuàng)建鎖,而是將這個(gè)鎖路徑存放在一個(gè)本地的HashMap中,這樣的話,我就沒(méi)有必要每次都去重復(fù)的創(chuàng)建這個(gè)鎖對(duì)象,簡(jiǎn)單高效的利用;


變更后的需求是這樣的,為了降低鎖的力度,每次我要?jiǎng)討B(tài)的生成一個(gè)path去zk上進(jìn)行創(chuàng)建,然后再根據(jù)這個(gè)path生成鎖對(duì)象,但是,一開(kāi)始我依舊是沿用老的思維,想避免重復(fù)創(chuàng)建這個(gè)path的鎖對(duì)象,于是,我想弄個(gè)三方緩存來(lái)存儲(chǔ)這個(gè)鎖對(duì)象,這時(shí)候坑就來(lái)了;

接下來(lái),我們開(kāi)始分析我的踩坑之旅:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
...
}

這是curator里面重入鎖對(duì)象的結(jié)構(gòu),InterProcessLock這個(gè)是curator通用的鎖接口,定義的跟jdk本身的也差不多,也是curator留給開(kāi)發(fā)者自己去定制實(shí)現(xiàn)符合自己業(yè)務(wù)需求的鎖對(duì)象的;Revocable接口是用來(lái)執(zhí)行取消動(dòng)作時(shí)觸發(fā)動(dòng)作用到的,如果你自定義鎖對(duì)象的時(shí)候在釋放鎖對(duì)象時(shí)想觸發(fā)一些動(dòng)作,你可以實(shí)現(xiàn)它的方法,以上便是InterProcessLock結(jié)構(gòu)的介紹;

看到這個(gè)代碼結(jié)構(gòu)我們還看出什么東西沒(méi)?它并沒(méi)有實(shí)現(xiàn)Serializable,導(dǎo)致其無(wú)法被序列化,也就是上面我自己想改進(jìn)我業(yè)務(wù)中鎖的場(chǎng)景就不支持了,因?yàn)轭愃朴趓edis這種緩存,沒(méi)法去存放一個(gè)對(duì)象,它頂多支持字符串以及byte[],所以我的想法就被loss掉了;

即使與業(yè)務(wù)無(wú)關(guān)了,但是我們作為可愛(ài)的程序員還是有必要去研究一下這個(gè)玩意的內(nèi)部實(shí)現(xiàn),因?yàn)槲覀儾恢老麓挝覀冞€會(huì)遇到什么場(chǎng)景,所以有必要讓自己刻骨銘心一次;

接下來(lái),我們看其內(nèi)部實(shí)現(xiàn),也就是我們高大上的源碼之旅:

    private final LockInternals internals;
    private final String basePath;

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

internals:這個(gè)是所有申請(qǐng)鎖與釋放鎖的核心實(shí)現(xiàn),待會(huì)我們?cè)賮?lái)講內(nèi)部實(shí)現(xiàn);
basePath:鎖定的路徑;
threadData:內(nèi)部緩存鎖的容器;

實(shí)現(xiàn)流程主要是這樣的:每次初始化InterProcessMutex對(duì)象的時(shí)候都會(huì)初始化一個(gè)StandardLockInternalsDriver對(duì)象,這個(gè)對(duì)象我們后面再講它的使用,同時(shí)也會(huì)初始化一個(gè)LockInternals對(duì)象,

接下來(lái),我們來(lái)看獲取鎖的代碼:

public void acquire() throws Exception{
        if ( !internalLock(-1, null) ) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

邏輯如下:
每次獲取鎖時(shí)會(huì)直接從本地緩存中先獲取鎖的元數(shù)據(jù),如果存在,則在原有的計(jì)數(shù)器基礎(chǔ)上+1,直接返回;
否則,嘗試去獲取鎖,邏輯如下,

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
         
        final long  startMillis = System.currentTimeMillis();
         //等待時(shí)間
        final Long   millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;

            try
            {
              
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

首先設(shè)置一個(gè)是否有鎖的標(biāo)志hasTheLock = false,然后
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);這個(gè)地方主要是通過(guò)StandardLockInternalsDriver在鎖目錄下創(chuàng)建EPHEMERAL_SEQUENTIAL節(jié)點(diǎn),
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);這里主要是循環(huán)獲取鎖的過(guò)程,代碼看下面,首先是判斷是否實(shí)現(xiàn)了revocable接口,如果實(shí)現(xiàn)了那么就對(duì)這個(gè)path設(shè)置監(jiān)聽(tīng),否則的話通過(guò)StandardLockInternalsDriver嘗試得到PredicateResults(主要是否得到鎖及需要監(jiān)視的目錄的兩個(gè)屬性);

private boolean internalLockLoop(long startMillis,Long millisToWait, String ourPath) throws Exception{
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try{
            if ( revocable.get() != null ){   client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

然后判斷PredicateResults中的pathToWatch(主要保存sequenceNode)是否是最小的節(jié)點(diǎn),如果是,則得到鎖,getsTheLock為true,否則得到該序列的前一個(gè)節(jié)點(diǎn),設(shè)為pathToWatch,并監(jiān)控起來(lái);再判斷獲取鎖的時(shí)間是否超時(shí),超時(shí)則刪除節(jié)點(diǎn),不競(jìng)爭(zhēng)下次鎖,否則,睡眠等待獲取鎖;最后把獲取的鎖對(duì)象的鎖路徑等信息封裝成LockData存儲(chǔ)在本地緩存中.

獲取鎖的邏輯主要就是這些,有興趣的同學(xué)可以打斷點(diǎn)跟蹤學(xué)習(xí)下,


下面是釋放鎖的過(guò)程;

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

代碼很簡(jiǎn)單,從本地緩存中拿到鎖對(duì)象,計(jì)數(shù)器-1,只有到那個(gè)計(jì)數(shù)器=0的時(shí)候才會(huì)去執(zhí)internals.releaseLock(lockData.lockPath);

final void releaseLock(String lockPath) throws Exception
    {
        client.removeWatchers();
        revocable.set(null);
        deleteOurPath(lockPath);
    }

只要邏輯見(jiàn)名知意,首先移除watcher監(jiān)聽(tīng),這個(gè)監(jiān)聽(tīng)可能是在循環(huán)獲取鎖的時(shí)候創(chuàng)建的,然后取消動(dòng)作時(shí)觸發(fā)動(dòng)作時(shí)間置空,最后就是刪除path;

最后做個(gè)小總結(jié)吧

    1. curator的InterProcessLock接口提供了多種鎖機(jī)制,互斥鎖,讀寫鎖,以及可定時(shí)數(shù)的互斥鎖的機(jī)制(這個(gè)大家具體問(wèn)題具體分析).
    1. 所有申請(qǐng)鎖都會(huì)創(chuàng)建臨時(shí)順序節(jié)點(diǎn),保證了都能夠有機(jī)會(huì)去獲取鎖.
    1. 內(nèi)部用了線程的wait()和notifyAll()這種等待機(jī)制,可以及時(shí)的喚醒最渴望得到鎖的線程.避免常規(guī)利用Thread.sleep()這種無(wú)用的間隔等待機(jī)制.
    1. 利用redis做鎖的時(shí)候,一般都需要做鎖的有效時(shí)間限定。而curator則利用了zookeeper的臨時(shí)順序節(jié)點(diǎn)特性,一旦客戶端失去連接后,則就會(huì)自動(dòng)清除該節(jié)點(diǎn).

天色已晚,寫到這里,后續(xù)有新的認(rèn)識(shí)待補(bǔ)充;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容