The main purpose of this post is to record the pitfalls I ran into with ZooKeeper locks, and what I learned from them.
Let's start with how a ZooKeeper distributed lock works. The principle is simple and probably familiar: the lock is built on a path in the ZooKeeper cluster. To acquire the lock, a client creates an ephemeral sequential node under that path; to release it, the client deletes that node.
It fits in one sentence, but once you implement it and try to optimize, things get hard quickly. More on the hard parts later.
Now the requirement, which was simply locking. Throughput used to be low, so only one fixed lock path was configured. The lock was not rebuilt from this path on every call; instead it was cached in a local HashMap, so there was no need to create the lock object again each time. Simple and efficient.
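The local cache described above can be sketched roughly as follows. This is a minimal sketch, not the original code: `LocalLockCache`, `lockFor` and the path `/locks/order` are hypothetical names, the lock type is left generic so the example runs without Curator, and `ConcurrentHashMap` is swapped in for the plain `HashMap` so that concurrent callers never build two lock objects for the same path.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper: keeps one lock object per lock path inside this JVM.
final class LocalLockCache<L> {
    private final ConcurrentMap<String, L> locks = new ConcurrentHashMap<>();
    private final Function<String, L> factory;

    // The factory builds the lock for a path; with Curator it would construct the real lock object.
    LocalLockCache(Function<String, L> factory) {
        this.factory = factory;
    }

    // Returns the cached lock for the path, creating it on first use only.
    L lockFor(String path) {
        return locks.computeIfAbsent(path, factory);
    }

    int size() {
        return locks.size();
    }
}

final class LocalLockCacheDemo {
    public static void main(String[] args) {
        // A plain Object stands in for the lock object in this sketch.
        LocalLockCache<Object> cache = new LocalLockCache<>(path -> new Object());
        Object first = cache.lockFor("/locks/order");
        Object second = cache.lockFor("/locks/order");
        System.out.println("same instance reused: " + (first == second));
    }
}
```

A cache like this only works inside one process, and with dynamically generated paths it grows without bound unless entries are evicted.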
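The new requirement: to make the lock finer-grained, each call dynamically generates a path, creates it in ZooKeeper, and then builds a lock object from that path. At first I stuck to the old way of thinking and wanted to avoid re-creating the lock object for a path, so I planned to keep the lock objects in a third-party cache. That is where the trouble started.
Here is how it went: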
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
...
}
This is the declaration of Curator's reentrant lock. InterProcessLock is Curator's common lock interface; its definition is close to the JDK's own lock interface, and developers can implement it to build locks tailored to their own business needs. The Revocable interface makes the lock revocable: you register a listener through its makeRevocable method, and the listener is called when another client asks the holder to give up the lock. That covers the structure of InterProcessMutex.
What else does this declaration tell us? It does not implement Serializable, so it cannot be serialized, which means my plan for improving the locking in my application was not workable: a cache such as Redis cannot hold a live object, only strings and byte[] at most. So I dropped the idea.
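To make the serialization point concrete, here is a small sketch using only the JDK. `FakeLock` is a hypothetical stand-in for a lock class that, like InterProcessMutex, does not implement Serializable; `SerializationCheck.canSerialize` is likewise an illustrative helper, not part of Curator.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a lock class that does not implement Serializable.
final class FakeLock {
    final String path;

    FakeLock(String path) {
        this.path = path;
    }
}

final class SerializationCheck {
    // Returns true if Java serialization can turn the value into bytes.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false; // the class (or one of its fields) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("String: " + canSerialize("/locks/order"));
        System.out.println("FakeLock: " + canSerialize(new FakeLock("/locks/order")));
    }
}
```

Even if such a class were made serializable, the bytes would not carry the live ZooKeeper connection or the per-thread state, so only the path string is a sensible thing to share through an external cache.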
Even though this no longer matters for the business requirement, it is still worth studying the internals, because we never know what scenario we will meet next. Better to learn it thoroughly once.
Now for the implementation, a tour of the source code:
private final LockInternals internals;
private final String basePath;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
internals: the core implementation behind every lock acquisition and release; its details are covered below.
basePath: the path being locked.
threadData: an in-memory map from each thread to the data of the lock it holds (LockData).
The flow is as follows: every time an InterProcessMutex is constructed, it also creates a StandardLockInternalsDriver, whose use is covered later, and a LockInternals object.
Next, the code that acquires the lock:
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
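The logic is as follows:
Each acquire first looks up the current thread's lock data in the local map. If it exists, the counter is incremented by 1 and the call returns immediately.
Otherwise it tries to acquire the lock through attemptLock: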
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long startMillis = System.currentTimeMillis();
        // maximum time to wait, or null to wait indefinitely
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        while ( !isDone )
        {
            isDone = true;
            try
            {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
        if ( hasTheLock )
        {
            return ourPath;
        }
        return null;
    }
First a flag recording whether we hold the lock is initialized: hasTheLock = false. Then
ourPath = driver.createsTheLock(client, path, localLockNodeBytes); uses StandardLockInternalsDriver to create an EPHEMERAL_SEQUENTIAL node under the lock directory.
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); is the loop that waits for the lock; its code follows. It first checks whether a revocation listener has been registered (revocable.get() != null) and, if so, sets a watch on our own path. It then asks StandardLockInternalsDriver for a PredicateResults, which carries two properties: whether we got the lock, and which path to watch.
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String> children = getSortedChildren();
                String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true; // timed out - delete our node
                                    break;
                                }
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
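Inside driver.getsTheLock, the position of our node among the sorted children decides the outcome: if its index is below maxLeases (1 for a mutex, so our node must be the smallest), getsTheLock is true and we hold the lock. Otherwise pathToWatch is set to the node just ahead of ours in the sequence, and that node is watched. The loop then checks whether the wait has timed out: if so, our node is deleted and we drop out of the competition; if not, the thread blocks in wait() until it is notified. Finally, back in internalLock, the lock path and related data are wrapped in a LockData and stored in the local map.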
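The decision described above can be sketched with plain collections. This is an illustration under stated assumptions, not Curator's code: `LockPredicateSketch`, `Result`, `sequenceOf` and the node names such as `lock-0000000002` are hypothetical, and the sequence number is assumed to follow the last `-` in the node name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class LockPredicateSketch {
    // Outcome of the check: do we hold the lock, and if not, which sibling should be watched.
    static final class Result {
        final boolean getsTheLock;
        final String pathToWatch; // null when the lock is held

        Result(boolean getsTheLock, String pathToWatch) {
            this.getsTheLock = getsTheLock;
            this.pathToWatch = pathToWatch;
        }
    }

    // Assumption for this sketch: the sequence number follows the last '-' in the node name.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    static Result check(List<String> children, String ourNode, int maxLeases) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LockPredicateSketch::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // Our node vanished, e.g. after a session expiry; the caller would have to start over.
            throw new IllegalStateException("Our node is gone: " + ourNode);
        }
        boolean getsTheLock = ourIndex < maxLeases;
        // When we lose, watch the node that has to disappear before our turn comes.
        String pathToWatch = getsTheLock ? null : sorted.get(ourIndex - maxLeases);
        return new Result(getsTheLock, pathToWatch);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("lock-0000000003", "lock-0000000001", "lock-0000000002");
        Result result = check(children, "lock-0000000003", 1);
        System.out.println(result.getsTheLock + " " + result.pathToWatch);
    }
}
```

Watching only the immediate predecessor, rather than the parent directory, means a release wakes a single waiter instead of all of them.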
That is the main logic of acquiring the lock; if you are interested, set breakpoints and step through it.
Next, releasing the lock:
    public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }
The code is simple: fetch the lock data from the local map and decrement the counter. Only when the counter reaches 0 is internals.releaseLock(lockData.lockPath) executed:
    final void releaseLock(String lockPath) throws Exception
    {
        client.removeWatchers();
        revocable.set(null);
        deleteOurPath(lockPath);
    }
The main logic is what the names suggest: first remove the watchers, which may have been created in the acquire loop; then clear the revocation listener; finally delete the path.
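The per-thread counting that acquire and release share can be sketched in isolation. This only models the bookkeeping, not mutual exclusion between threads: `ReentrantCountSketch` is a hypothetical class, and two counters stand in for creating and deleting the ZooKeeper node.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ReentrantCountSketch {
    // One hold counter per thread, mirroring the role of threadData.
    private final ConcurrentMap<Thread, AtomicInteger> holds = new ConcurrentHashMap<>();
    // Stand-ins for "node created in ZooKeeper" and "node deleted from ZooKeeper".
    private final AtomicInteger remoteAcquires = new AtomicInteger();
    private final AtomicInteger remoteReleases = new AtomicInteger();

    void acquire() {
        Thread current = Thread.currentThread();
        AtomicInteger count = holds.get(current);
        if (count != null) {
            count.incrementAndGet(); // re-entering: no remote call needed
            return;
        }
        remoteAcquires.incrementAndGet(); // first acquire by this thread goes to the server
        holds.put(current, new AtomicInteger(1));
    }

    void release() {
        Thread current = Thread.currentThread();
        AtomicInteger count = holds.get(current);
        if (count == null) {
            throw new IllegalMonitorStateException("You do not own the lock");
        }
        if (count.decrementAndGet() > 0) {
            return; // still held by an outer acquire
        }
        try {
            remoteReleases.incrementAndGet(); // last release: give the lock back
        } finally {
            holds.remove(current);
        }
    }

    int remoteAcquires() {
        return remoteAcquires.get();
    }

    int remoteReleases() {
        return remoteReleases.get();
    }

    public static void main(String[] args) {
        ReentrantCountSketch lock = new ReentrantCountSketch();
        lock.acquire();
        lock.acquire();
        lock.release();
        lock.release();
        System.out.println(lock.remoteAcquires() + " acquire(s), " + lock.remoteReleases() + " release(s)");
    }
}
```

Because the map is keyed by thread, the counter is tied to the thread that acquired the lock; a release from any other thread fails with IllegalMonitorStateException, which is also what the Curator code above does.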
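To wrap up:
- Curator offers several lock types behind the InterProcessLock interface, including mutexes and read-write locks, and acquisition can be bounded by a timeout (choose according to your actual problem).
- Every lock request creates an ephemeral sequential node, so every requester gets its turn at the lock.
- Internally it uses the thread wait() and notifyAll() mechanism, so a waiting thread is woken promptly when the lock ahead of it is released, avoiding the wasted polling intervals of a Thread.sleep() loop.
- With a Redis-based lock you generally have to set an expiry time on the lock. Curator instead relies on ZooKeeper's ephemeral sequential nodes: once the client's session ends, the node is removed automatically.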
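The wait()/notifyAll() point in the summary can be illustrated with a small timed-wait sketch. It is a simplification under stated assumptions: `WatchSignalSketch` and its methods are hypothetical, and a boolean flag guards the wait here, whereas the Curator loop shown earlier re-reads the children after each wake-up.

```java
final class WatchSignalSketch {
    private boolean released = false;

    // Called by the (simulated) watcher when the watched node is deleted.
    synchronized void onPreviousNodeDeleted() {
        released = true;
        notifyAll(); // wake every thread blocked in awaitRelease
    }

    // Blocks until signalled or until timeoutMillis has elapsed; returns true if signalled.
    synchronized boolean awaitRelease(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!released) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false; // timed out; the real code would delete its own node here
            }
            try {
                wait(remainingMillis); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        WatchSignalSketch signal = new WatchSignalSketch();
        Thread watcher = new Thread(() -> {
            try {
                Thread.sleep(50); // pretend the lock holder needs some time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            signal.onPreviousNodeDeleted();
        });
        watcher.start();
        System.out.println("signalled: " + signal.awaitRelease(2_000));
        watcher.join();
    }
}
```

Recomputing the remaining time on every pass keeps the total wait bounded even if the thread wakes up early, which is the same reason the Curator loop subtracts the elapsed time from millisToWait.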
It's getting late, so I'll stop here; I'll add more as I learn more.