在如今服務(wù)器的開發(fā)與部署時(shí),往往考慮的不是單機(jī)服務(wù)的承載力了,而是更高一階,如何設(shè)計(jì)出高可用,高負(fù)載,高容量的服務(wù)架構(gòu)。并且業(yè)務(wù)開發(fā)不是簡(jiǎn)單的將所有業(yè)務(wù)糅合在單臺(tái)服務(wù)上,而是分模塊,分功能分而治理,才有了微服務(wù)架構(gòu)。然后微服務(wù)設(shè)計(jì)中的每一個(gè)模塊都是可以設(shè)計(jì)為一個(gè)集群,例如常用的,用戶模塊,權(quán)限模塊,商品模塊,訂單模塊,支付模塊,供應(yīng)鏈等等。服務(wù)器的設(shè)計(jì)越來越復(fù)雜,對(duì)于開發(fā)人員的技術(shù)能力提出更高要求。
先如今微服務(wù),分布式,集群的架構(gòu)思想中,zookeeper成為了大多首選并且非常重要的中間件,例如我們熟悉的Hadoop,dubbo中,都出現(xiàn)了zookeeper 。zookeeper有很多特性,例如有注冊(cè)訂閱,分布式鎖,隊(duì)列等等功能。但是zookeeper中提供的java api 并不是很友好,使用起來容易踩坑。例如創(chuàng)建path時(shí),需要判斷path的parent是否存在,必須先創(chuàng)建parent path才能創(chuàng)建子路徑。還有在添加watcher事件時(shí),一旦該事件觸發(fā)一次后,如果沒有主動(dòng)將事件重新設(shè)置,他不會(huì)收到第二次。還有其他一些不太友好的api開發(fā)就不在贅述。
所以才引入了curator工具,他實(shí)際是更高級(jí)的api,使用起來更加方便。但內(nèi)部核心也是使用zookeeper提供的api,只是在開發(fā)中不那么繁瑣而已。
舉個(gè)創(chuàng)建path例子:
public void createPath() {
String host = "127.0.0.1:2181";
String path = root + "/my_path";
CuratorFramework curator = CuratorClient.create(host);
try {
String last = curator.create().creatingParentsIfNeeded().forPath(path, "123".getBytes());
logger.info("創(chuàng)建路徑完成 " + last);
} catch (Exception e) {
e.printStackTrace();
logger.info("創(chuàng)建路徑失敗 異常類型:" + e.getClass().getName() + ", message:" + e.getMessage());
}
}
創(chuàng)建路徑,是不是很簡(jiǎn)單,不需要關(guān)心root是否已經(jīng)創(chuàng)建。curator自己會(huì)去做驗(yàn)證判斷是否需要?jiǎng)?chuàng)建root路徑。然而,我們?cè)偕A一下,既然zookeeper提供了很多特性,那么curator是否也能足夠支撐呢?在curator組件中,recipes模塊中,可以了解到很多有意思的地方:

可以看到他提供了很多功能,例如原子計(jì)算,柵欄,緩存,選舉,鎖,隊(duì)列等,提供了很豐富的功能。 那么我們來分析一下 curator如何利用zookeeper的特性,實(shí)現(xiàn)這些功能的。
首先需要了解一些基本嘗試,例如zookeeper中Watcher有哪些事件
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
}
包含了節(jié)點(diǎn)創(chuàng)建,節(jié)點(diǎn)刪除,節(jié)點(diǎn)數(shù)據(jù)變革和子節(jié)點(diǎn)變更,這些是zookeeper自己的watcher事件類型。
那么curator組件還會(huì)提出哪些自己的事件呢?
public enum CuratorEventType
{
/**
* Corresponds to {@link CuratorFramework#create()}
*/
CREATE,
/**
* Corresponds to {@link CuratorFramework#delete()}
*/
DELETE,
/**
* Corresponds to {@link CuratorFramework#checkExists()}
*/
EXISTS,
/**
* Corresponds to {@link CuratorFramework#getData()}
*/
GET_DATA,
/**
* Corresponds to {@link CuratorFramework#setData()}
*/
SET_DATA,
/**
* Corresponds to {@link CuratorFramework#getChildren()}
*/
CHILDREN,
//....后面還有很多 事件
}
這些事件與zookeeper沒有直接關(guān)系,而是curator通過調(diào)用相應(yīng)api后,會(huì)觸發(fā)相應(yīng)的事件,例如調(diào)用create()方法,會(huì)觸發(fā)CREATE事件。如果調(diào)用checkExists方法,會(huì)觸發(fā)EXISTES事件。
cache包
該包內(nèi)主要熟悉NodeCache和PathChildrenCache
? NodeCache,是指可以從本地cache中得到節(jié)點(diǎn)數(shù)據(jù),并且該node可以增加watcher事件,例如節(jié)點(diǎn)的 更新/創(chuàng)建/刪除。然后重新拉取數(shù)據(jù),然后通過本地注冊(cè)的listeners,他們會(huì)得到變更通知。
private final CuratorFramework client;
private final String path;
private final boolean dataIsCompressed;
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
private final AtomicBoolean isConnected = new AtomicBoolean(true);
private ConnectionStateListener connectionStateListener = new ConnectionStateListener();
private Watcher watcher = new Watcher();
這些是NodeCache的基本屬性,
listeners是存儲(chǔ)了節(jié)點(diǎn)緩存變更的監(jiān)聽器。
data是當(dāng)前節(jié)點(diǎn)的存儲(chǔ)的數(shù)據(jù),從zookeeper節(jié)點(diǎn)上緩存在本地的數(shù)據(jù)
connectionStateListener是連接狀態(tài)變更監(jiān)聽器,例如重連,掉線等事件
watcher就是與zookeeper中的一樣,針對(duì)路徑進(jìn)行監(jiān)聽。
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
this.client = client;
this.path = PathUtils.validatePath(path);
this.dataIsCompressed = dataIsCompressed;
}
普通的構(gòu)造器,最重要的是一個(gè)path路徑和client,當(dāng)聲明對(duì)象后,就要啟動(dòng)該NodeCache。
public void start(boolean buildInitial) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
client.getConnectionStateListenable().addListener(connectionStateListener);
if ( buildInitial )
{
client.checkExists().creatingParentContainersIfNeeded().forPath(path);
internalRebuild();
}
reset();
}
首先會(huì)將connectionStateListener狀態(tài)監(jiān)聽器添加到client狀態(tài)監(jiān)聽列表中。如果buildInitial=true,需要初始化,那么嘗試創(chuàng)建parent,然后獲取zk上的節(jié)點(diǎn)數(shù)據(jù)。最終執(zhí)行reset方法。
private void reset() throws Exception
{
if ( (state.get() == State.STARTED) && isConnected.get() )
{
client.checkExists().creatingParentContainersIfNeeded()
.usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
reset方法其實(shí)將watcher添加path路徑中,并且針對(duì)checkExist方法增加回調(diào)方法backgroundCallback,那么該回調(diào)拿到的CuratorEvent事件肯定是EXIST事件。其實(shí)rest并沒有獲取節(jié)點(diǎn)數(shù)據(jù)。
看一下最終調(diào)用方法processBackgroundResult()方法:
private void processBackgroundResult(CuratorEvent event) throws Exception
{
switch ( event.getType() )
{
case GET_DATA:
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData childData = new ChildData(path, event.getStat(), event.getData());
setNewData(childData);
}
break;
}
case EXISTS:
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
setNewData(null);
}
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(watcher)
.inBackground(backgroundCallback).forPath(path);
}
else {
client.getData().usingWatcher(watcher)
.inBackground(backgroundCallback).forPath(path);
}
}
break;
}
}
}
當(dāng)限制任然是EXISTS事件時(shí),判斷是否有節(jié)點(diǎn),如果沒有則setNewData方法,即設(shè)置Node的data數(shù)據(jù)為空。那么如果存在節(jié)點(diǎn),他然后沒有去主動(dòng)獲取得到data數(shù)據(jù),怎么做的?看一下他執(zhí)行了getData()方法,并且添加了watcher事件,但是任然通過回調(diào)方法,那么此時(shí)回調(diào)方法是GET_DATA事件了。最終processBackgroundResult方法是執(zhí)行了case GET_DATA這塊代碼。因?yàn)榇藭r(shí)發(fā)起獲取數(shù)據(jù)時(shí),會(huì)將數(shù)據(jù)添加到CuratorEvent中,此時(shí)生成了ChildData對(duì)象,包括了path,stat和節(jié)點(diǎn)數(shù)據(jù)等信息。
考慮一下zookeeper中Watcher是什么時(shí)候才能觸發(fā)事件,當(dāng)然是節(jié)點(diǎn)刪除,更新,或者創(chuàng)建才會(huì)發(fā)起,那么怎么才能使用在NodeCache中。再來看一下NodeCache自帶的屬性watcher實(shí)現(xiàn)類
private Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
reset();
}
catch(Exception e)
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
其實(shí)他任然調(diào)用reset方法,就是這么簡(jiǎn)單。還是先是發(fā)起EXISTS事件,然后GET_DATA事件。但是考慮清楚,watcher事件是一次性觸發(fā)功能,不會(huì)執(zhí)行第二次,所以在reset中,都會(huì)對(duì)path添加watcher事件。
那么NodeCache節(jié)點(diǎn)變更,如何通知添加在監(jiān)聽容器內(nèi)的監(jiān)聽器的?
private void setNewData(ChildData newData) throws InterruptedException
{
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
listeners.forEach
(
new Function<NodeCacheListener, Void>()
{
@Override
public Void apply(NodeCacheListener listener)
{
try
{
listener.nodeChanged();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Calling listener", e);
}
return null;
}
}
);
//.....
}
在setNewData方法中,通過與原子應(yīng)用的data中之前存儲(chǔ)的previousData比較,如果不同。則那么需要遍歷容器內(nèi)的監(jiān)聽器了,最終執(zhí)行nodeChanged方法。
? PathChildrenCache 子路徑緩存
考慮一下,既然curator能在集群中使用,那么舉個(gè)最簡(jiǎn)單的例子,在集群中,增加或者減少服務(wù),需要及時(shí)發(fā)現(xiàn)才能防止繼續(xù)調(diào)用該服務(wù)。那么在curator如何使用?當(dāng)然在集群中同等服務(wù)功能中每臺(tái)服務(wù)都是作為一個(gè)節(jié)點(diǎn)角色使用的,那好,只要監(jiān)聽節(jié)點(diǎn)的變化例如節(jié)點(diǎn)移除,或者節(jié)點(diǎn)增加了。就能知道服務(wù)集群中的變更,那么節(jié)點(diǎn)該有哪些標(biāo)識(shí),可以用服務(wù)器的ip和端口組成唯一標(biāo)識(shí)。
所以,引申出來幾個(gè)概念,
1.需要監(jiān)聽的節(jié)點(diǎn)都是某個(gè)業(yè)務(wù)下parent的子節(jié)點(diǎn)children,2.針對(duì)添加子節(jié)點(diǎn),任然必須在parent下變更;
在curator中引入了子節(jié)點(diǎn)管理的幾個(gè)事件
PathChildrenCacheEvent下的Type類型:
public enum Type
{
/**
* A child was added to the path
*/
CHILD_ADDED,
/**
* A child's data was changed
*/
CHILD_UPDATED,
/**
* A child was removed from the path
*/
CHILD_REMOVED,
//還有其他事件
}
例如子節(jié)點(diǎn)新增,變更,刪除等其他事件。在curator采用了大量的異步調(diào)用線程,并且在PathChildrenCache中通過推送事件方式通知節(jié)點(diǎn)狀態(tài)變更的。
RefreshOperation 刷新事件,主要調(diào)用PathChildrenCache中refresh方法;
GetDataOperation 獲取節(jié)點(diǎn)數(shù)據(jù)事件,主要調(diào)用getDataAndStat方法(),異步方式得到節(jié)點(diǎn)數(shù)據(jù)
EventOperation 推送事件,推送給記錄在事件容器中的監(jiān)聽器,發(fā)起childEvent方法
那么在PathChildrenCache 分為2種形式的基本路徑,parentPath路徑,和childPath路徑,在zk中,已經(jīng)提過,當(dāng)對(duì)parentPath進(jìn)行監(jiān)聽,如果parentPath新增節(jié)點(diǎn),就會(huì)觸發(fā)children事件。所以PathChildrenCache也是利用了這點(diǎn)。
在refresh方法中
void refresh(final RefreshMode mode) throws Exception
{
ensurePath();
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
// This ship is closed, don't handle the callback
return;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
processChildren(event.getChildren(), mode);
}
}
};
client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
對(duì)client添加了childrenWatcher,然后內(nèi)部有個(gè)回調(diào)類callback,他接受的CuratorEventType類型肯定是CHILDREN事件。在processChildren方法中,
private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
for ( String child : children ) {
removedNodes.remove(ZKPaths.makePath(path, child));
}
for ( String fullPath : removedNodes )
{
remove(fullPath);
}
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
getDataAndStat(fullPath);
}
updateInitialSet(name, NULL_CHILD_DATA);
}
maybeOfferInitializedEvent(initialSet.get());
}
其中children 是當(dāng)前parent地下所有的子路徑的名字(不是完整的路徑)。與本地記錄的當(dāng)前數(shù)據(jù),比較出需要移除的節(jié)點(diǎn),發(fā)送EventOperation事件中的子節(jié)點(diǎn)移除事件。然后通過RefreshMode模式或者當(dāng)前currentData中沒有保護(hù)子節(jié)點(diǎn)的全路徑,那么需要獲取數(shù)據(jù)。
在getDataAndStat()方法中
void getDataAndStat(final String fullPath) throws Exception
{
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
}
};
if ( USE_EXISTS && !cacheData )
{
client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
// always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
if ( dataIsCompressed && cacheData )
{
client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
}
}
通過是否需要保持節(jié)點(diǎn)數(shù)據(jù)執(zhí)行相應(yīng)的方法,但是這里都有個(gè)共同的是,對(duì)該子節(jié)點(diǎn)全路徑添加dataWatcher事件,那么該路徑的刪除或者變更,都會(huì)通知到dataWatcher事件中。
在PathChildrenCache中存在2中Watcher事件對(duì)象
private volatile Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
private volatile Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
有針對(duì)parent路徑添加childrenWatcher事件,基本上子節(jié)點(diǎn)變更,都會(huì)觸發(fā),然后通過刷新事件,異步方式重新執(zhí)行refresh方法。
由這對(duì)具體的child路徑田間dataWatcher事件,主要是子節(jié)點(diǎn)路徑刪除,或者數(shù)據(jù)變更,做出相應(yīng)的移除事件或者獲取數(shù)據(jù)事件動(dòng)作。
熟悉了PathChildrenPath的工作原理,那么在工作中如何整合。首先我們要聲明自己的PathChildrenCacheListener 監(jiān)聽器實(shí)現(xiàn)類,有了他,才能知道節(jié)點(diǎn)的變更情況。 假設(shè)有個(gè)業(yè)務(wù)功能是多組服務(wù)器支撐提供,需要保證他能可動(dòng)態(tài)調(diào)整服務(wù)器資源。那么上游調(diào)用者就可以通過PathChildrenPath工具監(jiān)聽當(dāng)前提供服務(wù)器組有哪些,而不用實(shí)時(shí)關(guān)心,在發(fā)起調(diào)用時(shí),去判斷當(dāng)前存在的服務(wù)器信息了。
locks包
在分布式系統(tǒng)中,如果要使用公用某一資源時(shí)候,往往會(huì)申請(qǐng)一個(gè)分布式鎖。curator也提供了分布式鎖,利用了zk的特性。使用方式很簡(jiǎn)單:
public void testDistributeLock() throws Exception {
String host = "127.0.0.1:2181";
String path = root + "/lock_test";
CuratorFramework curator = CuratorClient.create(host);
InterProcessMutex mutex = new InterProcessMutex(curator, path);
if(mutex.acquire(10, TimeUnit.SECONDS)) {
try {
// 業(yè)務(wù)邏輯
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
}
}
}
path就是比作資源,鎖針對(duì)path就行資源獲取,然后執(zhí)行業(yè)務(wù)邏輯,最終都需要release鎖資源。觀察一下InterProcessMutex工作原理。
首先通過acquire方法了解internalLock方法
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
為什么通過當(dāng)前線程得知鎖的狀態(tài)數(shù)據(jù),加入從threadData拿到了lockData數(shù)據(jù),說明在這個(gè)線程之前就已經(jīng)獲取鎖資源了,如果重復(fù)獲取同一個(gè)鎖,那么只要記錄lockCount數(shù)量即可。當(dāng)前線程沒有保存的鎖資源,需要通過internals內(nèi)置鎖工具嘗試獲取鎖,最終得到一個(gè)lockPath,然后進(jìn)行封裝成LockData保存在threadData中,沒有返回路徑,說明獲取鎖失敗了。
看一下LockInternals 類attemptLock方法,是如何嘗試獲取鎖
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
{
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client;
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
在構(gòu)造器中,有兩個(gè)路徑,basePath基本路徑,還有path這個(gè)路徑是通過basePath與lockName(默認(rèn)名字為lock-)組合起來,說明這里的path是basePath的子節(jié)點(diǎn)路徑。還有一個(gè)參數(shù) driver,默認(rèn)通過StandardLockInternalsDriver類實(shí)現(xiàn)的,該類主要負(fù)責(zé)創(chuàng)建路徑,判斷是否能獲取鎖。看一下StandardLockInternalsDriver創(chuàng)建路徑代碼:
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
其中path路徑時(shí)基本路徑與lockName組合起來的最終路徑名稱,但是創(chuàng)建的時(shí)候,采用了EPHEMERAL_SEQUENTIAL模式得到的路徑,首先路徑是非永久狀態(tài)存儲(chǔ)的,如果連接端口,該ourPath就會(huì)刪除。然后還有特點(diǎn)是有序的,就是ourPath的路徑是path路徑與順序編號(hào)組合在一起的,并且是有序遞增編號(hào)的路徑,例如 /test/lock-000001,test/lock-000002。每次創(chuàng)建都會(huì)增加編號(hào),而且不會(huì)重復(fù),這是zookeeper中一個(gè)特性。
再來熟悉LockInternals中的attemptLock方法。
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try {
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e ) {
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
isDone = false;
}
else {
throw e;
}
}
}
if ( hasTheLock ) {
return ourPath;
}
return null;
}
為什么在一個(gè)循環(huán)體內(nèi),是為了容錯(cuò),在創(chuàng)建ourPath失敗時(shí),進(jìn)行重復(fù)嘗試。通過driver創(chuàng)建了一個(gè)非持久的并且有序編號(hào)的ourPath路徑,那么考慮一下,因?yàn)槁窂綍r(shí)的編號(hào)是遞增的,那么編號(hào)越小,那么他獲得鎖的概率應(yīng)該是最大的,因?yàn)樗亲钤鐒?chuàng)建路徑,也就分配的編號(hào)小了。當(dāng)定義完這個(gè)獲取鎖的規(guī)則后,后續(xù)就方便很多了。
在internalLockLoop方法中,如何與等待時(shí)間相結(jié)合,當(dāng)獲得鎖后,就可以成功呢?
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
haveTheLock = true;
}
else {
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ) {
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else {
wait();
}
}
catch ( KeeperException.NoNodeException e ) {
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
在while循環(huán)體內(nèi),獲取了所有basePath下的children節(jié)點(diǎn)名稱,并且進(jìn)行從小到大編號(hào)排序。然后通過driver中的getsTheLock方法得到我節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)名稱,如果不存在,說明我的節(jié)點(diǎn)是第一個(gè),那么我就能獲取鎖。
如果存在前一個(gè)節(jié)點(diǎn),構(gòu)建完整的路徑previousSequencePath,并對(duì)該路徑進(jìn)行監(jiān)聽,增加watcher事件。為什么要這么?還是那個(gè)問題,編號(hào)是有序遞增的,只有當(dāng)我前一個(gè)節(jié)點(diǎn)釋放鎖了,下一個(gè)是我,我就能得到鎖。那么前一個(gè)節(jié)點(diǎn)如何釋放鎖,可以主動(dòng)刪除節(jié)點(diǎn),或者掉線系統(tǒng)自動(dòng)刪除。在對(duì)previousSequencePath添加watcher事件后,進(jìn)入等待,那么當(dāng)前線程等待時(shí)通過誰來喚醒呢?當(dāng)然是通過watcher來喚醒,通過調(diào)用notifyAll方式喚醒線程,然后重新執(zhí)行循環(huán),知道超時(shí),或者得到鎖。這里有個(gè)需要考慮,在超時(shí)時(shí),將doDelete標(biāo)記為刪除,然后再finally方法中通過這個(gè)狀態(tài)去刪除ourPath節(jié)點(diǎn),為什么要這樣呢?因?yàn)槌瑫r(shí)情況下,認(rèn)定是沒有獲取鎖,但是路徑我已經(jīng)創(chuàng)建了,如果不去主動(dòng)刪除,那么他會(huì)一直占用,在ourPath后面的路徑就會(huì)一直等著他主動(dòng)刪除。在考慮一下,這里為什么會(huì)存在 KeeperException.NoNodeException異常呢?因?yàn)樵趯?duì)previousSequencePath進(jìn)行監(jiān)聽時(shí),假設(shè)這個(gè)鎖剛好釋放了,已經(jīng)刪除了previousSequencePath路徑,那么當(dāng)前去監(jiān)聽時(shí),路徑就會(huì)不存在,然后會(huì)拋出節(jié)點(diǎn)不存在的異常。
這是一個(gè)完整的獲取鎖的流程,也很嚴(yán)謹(jǐn)?shù)奶幚砀鞣N出現(xiàn)異常時(shí)的邏輯。當(dāng)然獲取鎖,用完就要進(jìn)行釋放。
在Mutex中的release方法中
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
首先釋放時(shí),從LockData中對(duì)lockCount進(jìn)行扣減,如果任然大于0,就return了,表示該鎖沒有用完。最終通過internals執(zhí)行releaseLock方法,然后移除掉threadData中的currentThread數(shù)據(jù),在LockInternal中釋放就很簡(jiǎn)單了,就是調(diào)用刪除路徑功能,達(dá)到釋放資源效果。那么監(jiān)聽此時(shí)的lockPath時(shí),就能監(jiān)聽到刪除事件,就會(huì)獲取鎖。
在java并發(fā)包中存在讀寫鎖,那么在curator中也存在這樣的讀寫分布式鎖-InterProcessReadWriteLock。
大致與InterProcessMutex思想一致的,但是內(nèi)部有兩個(gè)InterProcessMutex組成,一個(gè)是readMutex,一個(gè)是writeMutex。與Java中的ReentrantReadWriteLock一樣,如果當(dāng)前是read獲取到了資源,那么另外一個(gè)read線程也能獲取資源,都是read資源是不進(jìn)行互斥的,但是如果有write資源,那么就會(huì)互斥。寫與寫資源也是存在互斥的。所以InterProcessReadWriteLock是如何實(shí)現(xiàn)功能的?
無論read,write鎖,他們的basePath肯定是一致的,而且在zk中,創(chuàng)建子節(jié)點(diǎn)為序列化的時(shí)候,不會(huì)因?yàn)樽庸?jié)點(diǎn)的名稱不一樣,編號(hào)會(huì)重置。而且同等對(duì)待,編號(hào)永遠(yuǎn)是有序遞增的。那么好了,writeMutex就是互斥鎖,與什么請(qǐng)求資源無關(guān),readMutex只要判斷在之前的節(jié)點(diǎn)中存在write路徑,那么就需要等待。那么怎么判斷呢。在StandardLockInternalsDriver中有個(gè)getsTheLock方法,該方法返回的PredicateResults結(jié)果才能知道是否能拿到鎖,或者對(duì)前一個(gè)路徑進(jìn)行監(jiān)聽。
在readMutex鎖重新了該方法
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
if ( writeMutex.isOwnedByCurrentThread() )
{
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for ( String node : children )
{
if ( node.contains(WRITE_LOCK_NAME) )
{
firstWriteIndex = Math.min(index, firstWriteIndex);
}
else if ( node.startsWith(sequenceNodeName) )
{
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
首先判斷是否是重復(fù)鎖,然后查詢第一個(gè)firstWriteIndex寫路徑的位置,與ourIndex自己的位置,進(jìn)行比較。如果ourIndex小,那么就可以獲得鎖了,如果大,那么需要監(jiān)聽firstWriteIndex的對(duì)應(yīng)的路徑了。