分布式鎖的基本場景
如果在多線程并行情況下去訪問某一個共享資源,比如說共享變量,那么勢必會造成線程安全問題。那么我們可以用很多種方法來解決,比如 synchronized、 比如 Lock 之類的鎖操作來解決線程安全問題,那么在分布式架構(gòu)下,涉及到多個進程訪問某一個共享資源的情況,比如說在電商平臺中商品庫存問題,在庫存只有 10 個的情況下進來100 個用戶,如何能夠避免超賣呢?所以這個時候我們需要一些互斥手段來防止彼此之間的干擾。然后在分布式情況下,synchronized 或者 Lock 之類的鎖只能控制單一進程的資源訪問,在多進程架構(gòu)下,這些 api就沒辦法解決我們的問題了。怎么辦呢?
用 zookeeper 來實現(xiàn)分布式鎖
我們可以利用 zookeeper 節(jié)點的特性來實現(xiàn)獨占鎖,就是同級節(jié)點的唯一性,多個進程往 zookeeper 的指定節(jié)點下創(chuàng)建一個相同名稱的節(jié)點,只有一個能成功,另外一個是創(chuàng)建失敗;
創(chuàng)建失敗的節(jié)點全部通過 zookeeper 的 watcher 機制來監(jiān)聽 zookeeper 這個子節(jié)點的變化,一旦監(jiān)聽到子節(jié)點的刪除事件,則再次觸發(fā)所有進程去寫鎖;

這種實現(xiàn)方式很簡單,但是會產(chǎn)生“驚群效應(yīng)”,簡單來說就是如果存在許多的客戶端在等待獲取鎖,當(dāng)成功獲取到鎖的進程釋放該節(jié)點后,所有處于等待狀態(tài)的客戶端都會被喚醒,這個時候 zookeeper 在短時間內(nèi)發(fā)送大量子節(jié)點變更事件給所有待獲取鎖的客戶端,然后實際情況是只會有一個客戶端獲得鎖。如果在集群規(guī)模比較大的情況下,會對 zookeeper 服務(wù)器的性能產(chǎn)生比較的影響。
利用有序節(jié)點來實現(xiàn)分布式鎖
我們可以通過有序節(jié)點來實現(xiàn)分布式鎖,每個客戶端都往指定的節(jié)點下注冊一個臨時有序節(jié)點,越早創(chuàng)建的節(jié)點,節(jié)點的順序編號就越小,那么我們可以判斷子節(jié)點中最小的節(jié)點設(shè)置為獲得鎖。
如果自己的節(jié)點不是所有子節(jié)點中最小的,意味著還沒有獲得鎖。這個的實現(xiàn)和前面單節(jié)點實現(xiàn)的差異性在于,每個節(jié)點只需要監(jiān)聽比自己小的節(jié)點,當(dāng)比自己小的節(jié)點刪除以后,客戶端會收到 watcher 事件,此時再次判斷自己的節(jié)點是不是所有子節(jié)點中最小的,如果是則獲得鎖,否則就不斷重復(fù)這個過程,這樣就不會導(dǎo)致羊群效應(yīng),因為每個客戶端只需要監(jiān)控一個節(jié)點。
curator 分布式鎖的基本使用
curator 對于鎖這塊做了一些封裝,curator 提供了InterProcessMutex 這樣一個 api。除了分布式鎖之外,還提供了 leader 選舉、分布式隊列等常用的功能。
- InterProcessMutex:分布式可重入排它鎖
- InterProcessSemaphoreMutex:分布式排它鎖
- InterProcessReadWriteLock:分布式讀寫鎖
public class LockDemo {
private static String CONNECTION_STR = "192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "->嘗試競爭鎖");
try {
lock.acquire(); //阻塞競爭鎖
System.out.println(Thread.currentThread().getName() + "->成功獲得了鎖");
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
lock.release(); //釋放鎖
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-" + i).start();
}
}
}
Curator 實現(xiàn)分布式鎖的基本原理
我們先打開InterProcessMutex構(gòu)造函數(shù)看看源碼
// 最常用
public InterProcessMutex(CuratorFramework client,
String path){
// Zookeeper 利用 path 創(chuàng)建臨時順序節(jié)點,實現(xiàn)公平鎖的核心
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client,
String path, LockInternalsDriver driver){
// maxLeases=1,表示可以獲得分布式鎖的線程數(shù)量(跨 JVM)為 1,即為互斥鎖
this(client, path, LOCK_NAME, 1, driver);
}
// protected 構(gòu)造函數(shù)
InterProcessMutex(CuratorFramework client, String
path, String lockName, int maxLeases,
LockInternalsDriver driver){
basePath = PathUtils.validatePath(path);
// internals 的類型為 LockInternals ,
InterProcessMutex 將分布式鎖的申請和釋放操作委托給internals 執(zhí)行
internals = new LockInternals(client, driver, path,
lockName, maxLeases);
}
再跟進InterProcessMutex.acquire方法
// 無限等待
public void acquire() throws Exception {
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock:" + basePath);
}
}
// 限時等待
public boolean acquire(long time, TimeUnit unit)
throws Exception {
return internalLock(time, unit);
}
InterProcessMutex.internalLock
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread =
Thread.currentThread();
LockData lockData =
threadData.get(currentThread);
if (lockData != null) {
// 實現(xiàn)可重入
// 同一線程再次 acquire,首先判斷當(dāng)前的映射表內(nèi)(threadData)是否有該線程的鎖信息,如果有則原子 + 1,然后返回
lockData.lockCount.incrementAndGet();
return true;
}
// 映射表內(nèi)沒有對應(yīng)的鎖信息,嘗試通過LockInternals 獲取鎖
String lockPath = internals.attemptLock(time,unit, getLockNodeBytes());
if (lockPath != null) {
// 成功獲取鎖,記錄信息到映射表
LockData newLockData = new
LockData(currentThread, lockPath);
threadData.put(currentThread,
newLockData);
return true;
}
return false;
}
// 映射表
// 記錄線程與鎖信息的映射關(guān)系
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
// 鎖信息
// Zookeeper 中一個臨時順序節(jié)點對應(yīng)一個“鎖”,但讓鎖生效激活需要排隊(公平鎖),下面會繼續(xù)分析
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new
AtomicInteger(1); // 分布式鎖重入次數(shù)
private LockData(Thread owningThread,
String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
LockInternals.attemptLock
// 嘗試獲取鎖,并返回鎖對應(yīng)的 Zookeeper 臨時順序節(jié)點的路徑
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
final long startMillis = System.currentTimeMillis();
// 無限等待時,millisToWait 為 null
final Long millisToWait = (unit != null) ?
unit.toMillis(time) : null;
// 創(chuàng)建 ZNode 節(jié)點時的數(shù)據(jù)內(nèi)容,無關(guān)緊要,這里為 null,采用默認值(IP 地址)
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
// 當(dāng)前已經(jīng)重試次數(shù),與CuratorFramework的重試策略有關(guān)
int retryCount = 0;
// 在 Zookeeper 中創(chuàng)建的臨時順序節(jié)點的路徑,相當(dāng)于一把待激活的分布式鎖
// 激活條件:同級目錄子節(jié)點,名稱排序最?。ㄅ抨?,公平鎖),后續(xù)繼續(xù)分析
String ourPath = null;
// 是否已經(jīng)持有分布式鎖
boolean hasTheLock = false;
// 是否已經(jīng)完成嘗試獲取分布式鎖的操作
boolean isDone = false;
while (!isDone) {
isDone = true;
try {
// 從 InterProcessMutex 的構(gòu)造函數(shù)可知實際 driver 為 StandardLockInternalsDriver 的實例
// 在Zookeeper中創(chuàng)建臨時順序節(jié)點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 循環(huán)等待來激活分布式鎖,實現(xiàn)鎖的公平性
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch
(KeeperException.NoNodeException e) {
// 容錯處理,不影響主邏輯的理解,可跳過
// 因 為 會 話 過 期 等 原 因 ,StandardLockInternalsDriver 因為無法找到創(chuàng)建的臨時 順序節(jié)點而拋出 NoNodeException 異常
if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
System.currentTimeMillis() -
startMillis, RetryLoop.getDefaultRetrySleeper())) {
// 滿足重試策略嘗試重新獲取鎖
isDone = false;
} else {
// 不滿足重試策略則繼續(xù)拋出NoNodeException
throw e;
}
}
}
if (hasTheLock) {
// 成功獲得分布式鎖,返回臨時順序節(jié)點的路徑,上層將其封裝成鎖信息記錄在映射表,方便鎖重入
return ourPath;
}
// 獲取分布式鎖失敗,返回 null
return null;
}
createsTheLock
// From StandardLockInternalsDriver
// 在 Zookeeper 中創(chuàng)建臨時順序節(jié)點
public String createsTheLock(CuratorFramework
client, String path, byte[] lockNodeBytes) throws
Exception {
String ourPath;
// lockNodeBytes 不為 null 則作為數(shù)據(jù)節(jié)點內(nèi)容,否則采用默認內(nèi)容(IP 地址)
if (lockNodeBytes != null) {
// 下面對 CuratorFramework 的一些細節(jié)做解釋,不影響對分布式鎖主邏輯的解釋,可跳過
// creatingParentContainersIfNeeded:用于創(chuàng)建父節(jié)點,如果不支持 CreateMode.CONTAINER
// 那么將采用 CreateMode.PERSISTENT
// withProtection:臨時子節(jié)點會添加GUID前綴
ourPath = client.create().creatingParentContainersIfNeeded()
//CreateMode.EPHEMERAL_SEQUENTIAL:臨時順序節(jié)點,Zookeeper 能保證在節(jié)點產(chǎn)生的順序性
// 依據(jù)順序來激活分布式鎖,從而也實現(xiàn)了分布式鎖的公平性,后續(xù)繼續(xù)分析
.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
ourPath =
client.create().creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
LockInternals.internalLockLoop
// 循環(huán)等待來激活分布式鎖,實現(xiàn)鎖的公平性
private boolean internalLockLoop(long startMillis,
Long millisToWait, String ourPath) throws Exception {
// 是否已經(jīng)持有分布式鎖
boolean haveTheLock = false;
// 是否需要刪除子節(jié)點
boolean doDelete = false;
try {
if (revocable.get() != null) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ((client.getState() ==
CuratorFrameworkState.STARTED) && !haveTheLock) {
// 獲取排序后的子節(jié)點列表
List<String> children = getSortedChildren();
// 獲取前面自己創(chuàng)建的臨時順序子節(jié)點的名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 實現(xiàn)鎖的公平性的核心邏輯,看下面的分析
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {
// 獲得了鎖,中斷循環(huán),繼續(xù)返回上層
haveTheLock = true;
} else {
// 沒有獲得到鎖,監(jiān)聽上一臨時順序節(jié)點
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
try {
// exists()會導(dǎo)致導(dǎo)致資源泄漏,因此 exists () 可以監(jiān)聽不存在的 ZNode,因此采用 getData ()
// 上一臨時順序節(jié)點如果被刪除,會喚醒當(dāng)前線程繼續(xù)競爭鎖,正常情況下能直接獲得鎖,因為鎖是公平的
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if (millisToWait != null) {
millisToWait -=
(System.currentTimeMillis() - startMillis);
startMillis =
System.currentTimeMillis();
if (millisToWait <=
0) {
doDelete =
true; // 獲取鎖超時,標記刪除之前創(chuàng)建的臨時順序節(jié)點
break;
}
wait(millisToWait);
// 等待被喚醒,限時等待
} else {
wait(); // 等待被喚醒,無限等待
}
} catch
(KeeperException.NoNodeException e) {
// 容錯處理,邏輯稍微有點繞,可跳過,不影響主邏輯的理解
// client.getData()可能調(diào)用時拋出 NoNodeException,原因可能是鎖被釋放或會話過期(連接丟失)等
// 這里并沒有做任何處理,因為外層是 while 循環(huán),再次執(zhí)行 driver.getsTheLock 時會調(diào)用 validateOurIndex
// 此 時 會 拋 出NoNodeException,從而進入下面的 catch 和 finally 邏輯,重新拋出上層嘗試重試獲取鎖并刪除臨時順序節(jié)點
}
}
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
// 標記刪除,在 finally 刪除之前創(chuàng)建的臨時順序節(jié)點(后臺不斷嘗試)
doDelete = true;
// 重新拋出,嘗試重新獲取鎖
throw e;
} finally {
if (doDelete) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
getTheLock
// From StandardLockInternalsDriver
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)
throws Exception {
// 之前創(chuàng)建的臨時順序節(jié)點在排序后的子節(jié)點列表中的索引
int ourIndex =
children.indexOf(sequenceNodeName);
// 校驗之前創(chuàng)建的臨時順序節(jié)點是否有效
validateOurIndex(sequenceNodeName,
ourIndex);
// 鎖公平性的核心邏輯
// 由 InterProcessMutex 的構(gòu)造函數(shù)可知, maxLeases 為 1,即只有 ourIndex 為 0 時,線程才能持有鎖,或者說該線程創(chuàng)建的臨時順序節(jié)點激活了鎖
// Zookeeper 的臨時順序節(jié)點特性能保證跨多個 JVM 的線程并發(fā)創(chuàng)建節(jié)點時的順序性,越早創(chuàng)建臨時順序節(jié)點成功的線程會更早地激活鎖或獲得鎖
boolean getsTheLock = ourIndex <
maxLeases;
// 如果已經(jīng)獲得了鎖,則無需監(jiān)聽任何節(jié)點,否則需要監(jiān)聽上一順序節(jié)點(ourIndex - 1)
// 因 為 鎖 是 公 平 的 , 因 此 無 需 監(jiān) 聽 除 了(ourIndex - 1)以外的所有節(jié)點,這是為了減少羊群效應(yīng), 非常巧妙的設(shè)計??!
String pathToWatch = getsTheLock ? null :
children.get(ourIndex - maxLeases);
// 返回獲取鎖的結(jié)果,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
return new PredicateResults(pathToWatch,
getsTheLock);
}
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
if (ourIndex < 0) {
// 容錯處理,可跳過
// 由于會話過期或連接丟失等原因,該線程創(chuàng)建的臨時順序節(jié)點被 Zookeeper 服務(wù)端刪除,往外拋出 NoNodeException
// 如果在重試策略允許范圍內(nèi),則進行重新嘗試獲取鎖,這會重新重新生成臨時順序節(jié)點
// 佩服 Curator 的作者將邊界條件考慮得 如此周到!
throw new KeeperException.NoNodeException("Sequential path not found:" + sequenceNodeName);
}
}
釋放鎖的邏輯
InterProcessMutex.release
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
// 無法從映射表中獲取鎖信息,不持有鎖
throw new IllegalMonitorStateException("You do not own the lock:" + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
// 鎖是可重入的,初始值為 1,原子-1 到0,鎖才釋放
return;
}
if (newLockCount < 0) {
// 理論上無法執(zhí)行該路徑
throw new IllegalMonitorStateException("Lock count has gonenegative for lock:" + basePath);
}
try {
// lockData != null && newLockCount == 0,釋放鎖資源
internals.releaseLock(lockData.lockPath);
} finally {
// 最后從映射表中移除當(dāng)前線程的鎖信息
threadData.remove(currentThread);
}
}
LockInternals.releaseLock
void releaseLock(String lockPath) throws Exception {
revocable.set(null);
// 刪除臨時順序節(jié)點,只會觸發(fā)后一順序節(jié)點去 獲取鎖,理論上不存在競爭,只排隊,非搶占,公平鎖,先到先得
deleteOurPath(lockPath);
}
// Class:LockInternals
private void deleteOurPath(String ourPath) throws Exception {
try {
// 后臺不斷嘗試刪除
client.delete().guaranteed().forPath(ourPath);
} catch (KeeperException.NoNodeException e) {
// 已經(jīng)刪除(可能會話過期導(dǎo)致),不做處理
// 實際使用 Curator-2.12.0 時,并不會拋出該異常
}
}
```s
——學(xué)自咕泡學(xué)院