API說明
InterProcessMutex有兩個構(gòu)造方法
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
參數(shù)說明
| 參數(shù) | 說明 |
|---|---|
| client | curator中zk客戶端對象 |
| path | 搶鎖路徑,同一個鎖path需一致 |
| driver | 可自定義lock驅(qū)動實(shí)現(xiàn)分布式鎖 |
主要方法
//獲取鎖,若失敗則阻塞等待直到成功,支持重入
public void acquire() throws Exception
//超時獲取鎖,超時失敗
public boolean acquire(long time, TimeUnit unit) throws Exception
//釋放鎖
public void release() throws Exception
注意:調(diào)用acquire()方法后需相應(yīng)調(diào)用release()來釋放鎖
使用簡介
下面的例子模擬了100個線程同時搶鎖,搶鎖成功的線程睡眠1秒鐘后釋放鎖,通知其他等待的線程重新?lián)屾i,比較簡單,不多說
public class InterprocessLock {
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
//模擬100個線程搶鎖
for (int i = 0; i < 100; i++) {
new Thread(new TestThread(i, lock)).start();
}
}
static class TestThread implements Runnable {
private Integer threadFlag;
private InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
System.out.println("第"+threadFlag+"線程獲取到了鎖");
//等到1秒后釋放鎖
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
源碼分析
從獲取鎖acquire()方法入手
public void acquire() throws Exception {
if ( !internalLock(-1, null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
看到調(diào)用了internalLock方法,進(jìn)到internalLock方法中
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
//先判斷當(dāng)前線程是否持有了鎖,如果是,則加鎖次數(shù)count+1,返回成功
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//調(diào)用LockInternals的attemptLock()方法進(jìn)行加鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
//加鎖成功,則將當(dāng)前線程對應(yīng)加鎖數(shù)據(jù)加到map中
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
進(jìn)到LockInternals的attemptLock()中,看下代碼
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//開始時間,后面用做超時判斷
final long startMillis = System.currentTimeMillis();
//超時時間,轉(zhuǎn)換為毫秒
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//節(jié)點(diǎn)數(shù)據(jù)
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//重試次數(shù)
int retryCount = 0;
//lockPath
String ourPath = null;
//是否持有鎖
boolean hasTheLock = false;
//是否處理完成
boolean isDone = false;
//循環(huán)處理
while ( !isDone )
{
isDone = true;
try
{
//在path下創(chuàng)建一個EPHEMERAL_SEQUENTIAL(臨時順序型)類型節(jié)點(diǎn)
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//搶鎖并判斷是否擁有鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// 重試范圍內(nèi)時進(jìn)行重試
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
創(chuàng)建臨時有序節(jié)點(diǎn)createsTheLock方法如下,比較簡單
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
判斷是否擁有鎖的方法internalLockLoop才是核心,下面注意了
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//獲取path下對應(yīng)臨時有序節(jié)點(diǎn),并按節(jié)點(diǎn)編號從小到大排序
List<String> children = getSortedChildren();
//獲取當(dāng)前線程創(chuàng)建的臨時節(jié)點(diǎn)名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//判斷當(dāng)前節(jié)點(diǎn)編號是否<maxLease,若是,則搶到了鎖,maxLease這里為1,所以只有index為0時才搶到鎖,標(biāo)識只有1個線程能搶到鎖
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//前一個節(jié)點(diǎn)編號較小的節(jié)點(diǎn)的路徑
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//如果沒搶到鎖,監(jiān)聽前一個節(jié)點(diǎn)事件
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
判斷是否超時
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
//超時 直接退出,并標(biāo)記 刪除節(jié)點(diǎn)doDelete標(biāo)記=true
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
//調(diào)用Object.wait(),等待線程被notify喚醒
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
//如果標(biāo)記了刪除,刪除節(jié)點(diǎn)數(shù)據(jù)
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
可以看到邏輯比較清晰,N個線程同時在path下創(chuàng)建臨時順序節(jié)點(diǎn),編號最小的獲取鎖,沒搶到鎖的會調(diào)用wait()方法等待被喚醒
那么是在哪里調(diào)用了notify()方法來喚醒其他節(jié)點(diǎn)的呢?
答案是在監(jiān)聽器wacher里,該監(jiān)聽器會在前一個(節(jié)點(diǎn)編號較小)的節(jié)點(diǎn)被刪除后觸發(fā)
先分析下釋放鎖的方法release
看下源碼
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//如果鎖被當(dāng)前線程獲取了超過1次,將count-1,直接返回
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
//釋放鎖
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
最終調(diào)用releaseLock方法中的deleteOurPath中
void releaseLock(String lockPath) throws Exception
{
revocable.set(null);
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
//直接調(diào)用client刪除節(jié)點(diǎn)
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
節(jié)點(diǎn)被刪除后,會觸發(fā)搶鎖過程中的wather監(jiān)聽器,看下監(jiān)聽器中內(nèi)容
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher() {
notifyAll();
}
可以看到節(jié)點(diǎn)path被刪除后,會通知后面一個節(jié)點(diǎn)進(jìn)行notify操作,notify操作后,重新進(jìn)入while自旋中,重新判斷是否搶到了鎖
最后看下getTheLock
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
throws Exception {
// 之前創(chuàng)建的臨時順序節(jié)點(diǎn)在排序后的子節(jié)點(diǎn)列表中的索引
int ourIndex =
children.indexOf(sequenceNodeName);
// 校驗(yàn)之前創(chuàng)建的臨時順序節(jié)點(diǎn)是否有效
validateOurIndex(sequenceNodeName,
ourIndex);
// 鎖公平性的核心邏輯
// 由 InterProcessMutex 的構(gòu)造函數(shù)可知, maxLeases 為 1,即只有 ourIndex 為 0 時,線程才能持有鎖,或者說該線程創(chuàng)建的臨時順序節(jié)點(diǎn)激活了鎖
// Zookeeper 的臨時順序節(jié)點(diǎn)特性能保證跨多個 JVM 的線程并發(fā)創(chuàng)建節(jié)點(diǎn)時的順序性,越早創(chuàng)建臨時順序節(jié)點(diǎn)成功的線程會更早地激活鎖或獲得鎖
boolean getsTheLock = ourIndex <
maxLeases;
// 如果已經(jīng)獲得了鎖,則無需監(jiān)聽任何節(jié)點(diǎn),否則需要監(jiān)聽上一順序節(jié)點(diǎn)(ourIndex - 1)
// 因 為 鎖 是 公 平 的 , 因 此 無 需 監(jiān) 聽 除 了(ourIndex - 1)以外的所有節(jié)點(diǎn),這是為了減少羊群效應(yīng), 非常巧妙的設(shè)計?。? String pathToWatch = getsTheLock ? null :
children.get(ourIndex - maxLeases);
// 返回獲取鎖的結(jié)果,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
return new PredicateResults(pathToWatch,
getsTheLock);
}
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
if (ourIndex < 0) {
// 容錯處理,可跳過
// 由于會話過期或連接丟失等原因,該線程創(chuàng)建的臨時順序節(jié)點(diǎn)被 Zookeeper 服務(wù)端刪除,往外拋出 NoNodeException
// 如果在重試策略允許范圍內(nèi),則進(jìn)行重新嘗試獲取鎖,這會重新重新生成臨時順序節(jié)點(diǎn)
// 佩服 Curator 的作者將邊界條件考慮得 如此周到!
throw new KeeperException.NoNodeException("Sequential path not found:" + sequenceNodeName);
}
}