寫(xiě)作不易,點(diǎn)贊收藏關(guān)注以便下次再看,感謝爸爸們的支持~
上回咱們說(shuō)到,用Mysql數(shù)據(jù)庫(kù)實(shí)現(xiàn)了分布式鎖。實(shí)現(xiàn)起來(lái)相對(duì)簡(jiǎn)單。
但是缺陷也相對(duì)比較明顯,一方面是SQL鎖沒(méi)有過(guò)期機(jī)制,如果不保持高可用的情況下,線程沒(méi)有釋放掉鎖就會(huì)出現(xiàn)死鎖。
另一方面是因?yàn)镾QL本身性能并不高,因此采用SQL加鎖的方式會(huì)極大拖累整個(gè)系統(tǒng)的性能。
基于以上各點(diǎn),本期咱們沿著Zookeeper展開(kāi),介紹如何使用Zookeeper實(shí)現(xiàn)相應(yīng)的分布式鎖。

Zookeeper簡(jiǎn)介
在開(kāi)始咱的文章前,先來(lái)介紹下Zookeeper是個(gè)什么東西。咱們先來(lái)看下百度百科對(duì)于Zookeeper的定義是什么。
ZooKeeper是一個(gè)分布式的,開(kāi)放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個(gè)開(kāi)源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。它是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。
ZooKeeper的目標(biāo)就是封裝好復(fù)雜易出錯(cuò)的關(guān)鍵服務(wù),將簡(jiǎn)單易用的接口和性能高效、功能穩(wěn)定的系統(tǒng)提供給用戶。
ZooKeeper包含一個(gè)簡(jiǎn)單的原語(yǔ)集,提供Java和C的接口。
ZooKeeper代碼版本中,提供了分布式獨(dú)享鎖、選舉、隊(duì)列的接口,代碼在$zookeeper_home\src\recipes。其中分布鎖和隊(duì)列有Java和C兩個(gè)版本,選舉只有Java版本。

換成比較通俗易懂的話來(lái)說(shuō),Zookeeper其實(shí)本質(zhì)上就像一個(gè)文件管理系統(tǒng)。其用類(lèi)似文件路徑的方式管理、監(jiān)聽(tīng)多個(gè)節(jié)點(diǎn)(Znode),同時(shí)判斷當(dāng)前每個(gè)節(jié)點(diǎn)上機(jī)器的狀態(tài)(是否宕機(jī)、是否斷開(kāi)連接等),從而達(dá)到分布式協(xié)同的操作。
如下是ZK管理功能的一個(gè)簡(jiǎn)要說(shuō)明。

四種節(jié)點(diǎn)
提到ZK,就不得不提一下ZK的四種基本節(jié)點(diǎn),他們分別是:
- 持久化節(jié)點(diǎn)(PERSISTENT):該節(jié)點(diǎn)持久存在,不會(huì)因?yàn)榭蛻舳藬嚅_(kāi)連接而刪除。
- 持久化順序節(jié)點(diǎn)(PERSISTENT_SEQUENTIAL):該節(jié)點(diǎn)會(huì)按照一定順序持久存在,亦不會(huì)因?yàn)榭蛻舳藬嚅_(kāi)連接而刪除。
- 臨時(shí)節(jié)點(diǎn)(EPHEMERAL):客戶端斷開(kāi)連接后,該節(jié)點(diǎn)會(huì)被刪除。
- 臨時(shí)順序節(jié)點(diǎn)(EPHEMERAL_SEQUENTIAL):客戶端斷開(kāi)連接后該節(jié)點(diǎn)會(huì)被刪除;會(huì)依照一定順序進(jìn)行排列。
這四種節(jié)點(diǎn)組成了最基本的ZK的功能。
事件監(jiān)聽(tīng)
除了四種節(jié)點(diǎn)以外,不得不提一下ZK本身實(shí)現(xiàn)的Watcher(事件監(jiān)聽(tīng)器),其是 ZooKeeper 中的一個(gè)很重要的特性。
ZooKeeper 允許用戶在指定節(jié)點(diǎn)上注冊(cè)一些 Watcher,并且在一些特定事件觸發(fā)的時(shí)候,ZooKeeper 服務(wù)端會(huì)將事件通知到感興趣的客戶端上去,該機(jī)制是 ZooKeeper 實(shí)現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性。
同時(shí),該機(jī)制也是分布式鎖實(shí)現(xiàn)的重要依賴(lài)特性之一。
原理淺析
加鎖原理:
ZK實(shí)現(xiàn)分布式鎖主要依賴(lài)于上述的兩個(gè)機(jī)制:
1、臨時(shí)順序節(jié)點(diǎn)。
2、事件監(jiān)聽(tīng)。
首先,每個(gè)程序需要加鎖的時(shí)候,會(huì)需要一個(gè)相應(yīng)的加鎖路徑(這里我們假設(shè)為“/curatorLock”),在ZK中根據(jù)這個(gè)加鎖路徑去生成一個(gè)新的臨時(shí)節(jié)點(diǎn)node1。
假設(shè)當(dāng)前新生成的臨時(shí)節(jié)點(diǎn)a,為第一個(gè)臨時(shí)節(jié)點(diǎn)。節(jié)點(diǎn)node1做為第一個(gè)申請(qǐng)鎖的程序,自然是有權(quán)利進(jìn)行上鎖的,那么自然就是加鎖成功了。

但是如果當(dāng)前節(jié)點(diǎn)node1前面已經(jīng)有了別的節(jié)點(diǎn)加了鎖。那么這個(gè)時(shí)候顯然我們是不能獲取鎖的,因此只能采用事件監(jiān)聽(tīng)的機(jī)制,對(duì)前一個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng),直到前一個(gè)節(jié)點(diǎn)釋放了鎖。

三個(gè)乃至更多個(gè)節(jié)點(diǎn)的情況則相似。整個(gè)加鎖的邏輯并不復(fù)雜。
解鎖原理:
解鎖的主要操作跟加鎖相反,首先需要將當(dāng)前監(jiān)聽(tīng)自己的監(jiān)聽(tīng)器都刪除,從而告訴別的機(jī)器,“我用完鎖啦~”。以便其余機(jī)器重新獲取,或者重新設(shè)置監(jiān)聽(tīng)對(duì)象和監(jiān)聽(tīng)狀態(tài)。

緊接著,獲取著鎖的節(jié)點(diǎn)(node0)會(huì)將自己進(jìn)行刪除,從而使得別的節(jié)點(diǎn)可以成為首節(jié)點(diǎn),并進(jìn)行加鎖的操作。

由此一來(lái),整個(gè)解鎖的過(guò)程就實(shí)現(xiàn)了。

Zookeeper分布式鎖實(shí)戰(zhàn)
代碼實(shí)現(xiàn)
這里我們借助CuratorFramework框架以及框架自帶的InterProcessMutex互斥鎖實(shí)現(xiàn)相應(yīng)的邏輯。
@Component
@Slf4j
public class ZkClientUtil {
//zk連接ip
private final String zkServers = "你的zk服務(wù)器Ip";
private CuratorFramework curatorFramework;
// zk自增存儲(chǔ)node
private String lockPath = "/curatorLock";
InterProcessMutex lock;
@PostConstruct
public void initZKClient(){
//如果等待時(shí)間 小于最大自旋時(shí)間則進(jìn)行自旋
LOGGER.info(">>>>Zk連接中....");
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zkServers) //zk 服務(wù)地址
.sessionTimeoutMs(5000) // 會(huì)話超時(shí)時(shí)間
.connectionTimeoutMs(5000) // 連接超時(shí)時(shí)間
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
lock = new InterProcessMutex(curatorFramework, lockPath);
LOGGER.info(">>>>Zk連接成功!");
}
/**
* 獲取對(duì)應(yīng)的節(jié)點(diǎn)鎖
*/
@SneakyThrows
public void getLock(){
//設(shè)置超時(shí)時(shí)間
boolean acquire = lock.acquire(50, TimeUnit.SECONDS);
if (acquire){
LOGGER.info("ZK加鎖成功:"+Thread.currentThread().getId());
}else {
LOGGER.info("ZK加鎖失?。?+Thread.currentThread().getId());
}
}
/**
* 對(duì)應(yīng)節(jié)點(diǎn)進(jìn)行解鎖
*/
@SneakyThrows
public void unlock(){
lock.release();
LOGGER.info("ZK解鎖成功"+Thread.currentThread().getId());
}
}
然后只需要對(duì)咱們?cè)鹊纳弦黄诖a做一點(diǎn)小小的改動(dòng)~
@SneakyThrows
public synchronized Boolean deductProduct(ProductPO productPO){
CompletableFuture<Exception> subThread = CompletableFuture.supplyAsync(()->{
try{
zkClientUtil.getLock(); // 替換關(guān)鍵的加鎖代碼
....
}finally {
zkClientUtil.unlock(); // 替換關(guān)鍵的解鎖代碼
}
});
Exception exception = subThread.get();
if (exception !=null){
throw exception;
}
return true;
}
然后自豪的運(yùn)行代碼,就得到運(yùn)行的結(jié)果如下:


可以看到結(jié)果確實(shí)是符合預(yù)期~

源碼淺析
然而作為全宇宙最靚的崽,光學(xué)會(huì)用怎么能滿足我呢,大家肯定也都好奇curatorFramework底層原理是咋實(shí)現(xiàn)的吧~
首先我們看看加鎖部分,關(guān)鍵代碼主要是acquire部分:
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
acquire部分代碼緊接著深入到internalLock方法中查看具體的邏輯。
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
// 從記錄表中嘗試獲取線程的鎖數(shù)據(jù)
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
// 數(shù)據(jù)不為空,實(shí)現(xiàn)重入,計(jì)數(shù)+1且返回加鎖成功
lockData.lockCount.incrementAndGet();
return true;
} else {
// 數(shù)據(jù)為空,進(jìn)行加鎖操作 (關(guān)鍵代碼,深入查看)
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
//將鎖的記錄保存到ThreadData中方便存儲(chǔ)
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
再追入嘗試加鎖的模塊代碼中,其中最關(guān)鍵的代碼是createTheLock方法和internalLockLoop方法。
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
final long startMillis = System.currentTimeMillis(); // 獲取當(dāng)前的系統(tǒng)時(shí)間
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; // 單位轉(zhuǎn)換相同
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone ){
isDone = true;
try{
/*關(guān)鍵方法>>>>> 根據(jù)path創(chuàng)建臨時(shí)順序節(jié)點(diǎn)并獲取到節(jié)點(diǎn)相應(yīng)路徑*/
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
/*關(guān)鍵方法>>>>> 這里根據(jù)對(duì)應(yīng)的鎖的子節(jié)點(diǎn),去判斷對(duì)應(yīng)要監(jiān)視的對(duì)象*/
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}catch ( KeeperException.NoNodeException e ){
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){
//如果重試策略允許重試,則進(jìn)行重試。
isDone = false;
}else{
throw e;
}
}
}
if ( hasTheLock ){
//如果持有鎖了,則返回加鎖加點(diǎn)的路徑
return ourPath;
}
return null;
}
createTheLock方法,會(huì)創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn),以供后續(xù)的加鎖使用。
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{
String ourPath;
if ( lockNodeBytes != null ) {
ourPath = client
.create()
.creatingParentContainersIfNeeded()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path, lockNodeBytes);
}else{
ourPath = client
.create()
.creatingParentContainersIfNeeded()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path);
}
return ourPath;
}
internalLockLoop方法,會(huì)首先根據(jù)當(dāng)前鎖的路徑獲取對(duì)應(yīng)子節(jié)點(diǎn)(即已經(jīng)上鎖的節(jié)點(diǎn)),緊接著會(huì)根據(jù)一個(gè)關(guān)鍵變量maxLeases(默認(rèn)為1,大概率可以通過(guò)修改maxLeases來(lái)控制一把鎖是否可以多人同時(shí)獲取),來(lái)判斷當(dāng)前的節(jié)點(diǎn)能否獲取分布式鎖。
如果這個(gè)時(shí)候,子節(jié)點(diǎn)數(shù)組的長(zhǎng)度超過(guò)了maxLeases,那么我當(dāng)前節(jié)點(diǎn)沒(méi)法獲取到鎖,也就需要對(duì)數(shù)組長(zhǎng)度length-maxLeases的節(jié)點(diǎn)進(jìn)行監(jiān)聽(tīng),以期待獲取相應(yīng)的鎖。同時(shí),該組件還對(duì)超時(shí)的情況做了特殊的處理,以避免死鎖或不斷等待的情況出現(xiàn)。
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{
boolean haveTheLock = false;
boolean doDelete = false;
try{
if ( revocable.get() != null ){
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ){
haveTheLock = true;
} else{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this){
try{
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // 監(jiān)聽(tīng)超時(shí)了,節(jié)點(diǎn)會(huì)自動(dòng)釋放,避免死鎖
break;
}
wait(millisToWait);
}
else{
wait();
}
}
catch ( KeeperException.NoNodeException e ){
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}catch ( Exception e ){
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}finally{
if ( doDelete ){ //超時(shí)or報(bào)錯(cuò)了,會(huì)將節(jié)點(diǎn)刪除
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
由此一來(lái),整個(gè)加鎖的邏輯就比較清晰了。
解鎖:
解鎖部分的代碼基本類(lèi)似。源代碼如下:
public void release() throws Exception{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null ){
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ){
return;
}
if ( newLockCount < 0 ){
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try{
internals.releaseLock(lockData.lockPath);
}finally{
threadData.remove(currentThread);
}
}
首先是會(huì)根據(jù)當(dāng)前線程從記錄表中去獲取其對(duì)應(yīng)的鎖信息,如果鎖信息不存在,拋出異常。
如果鎖信息存在,首先判斷其是否重入了,如果是重入鎖,則計(jì)數(shù)-1。
否則的話,執(zhí)行釋放鎖的操作,這里就是先刪除節(jié)點(diǎn)下對(duì)應(yīng)的所有觀察者,然后將臨時(shí)節(jié)點(diǎn)刪除點(diǎn),完成鎖的釋放。
final void releaseLock(String lockPath) throws Exception
{
client.removeWatchers(); // 移除觀察者
revocable.set(null);
deleteOurPath(lockPath); // 刪除對(duì)應(yīng)路徑的鎖
}
由此,整個(gè)加鎖解鎖的流程就全部解析完啦~
優(yōu)劣性分析
優(yōu)點(diǎn):
- ZK現(xiàn)成的框架支持相對(duì)完善,使用起來(lái)較為方便,而且支持了超時(shí)刪除鎖的機(jī)制,避免了可能出現(xiàn)的死鎖。
- curatorFramework本質(zhì)是一種按照創(chuàng)建順序排隊(duì)的實(shí)現(xiàn)。這種方案效率高,避免了“驚群”效應(yīng),當(dāng)鎖釋放時(shí)只有一個(gè)客戶端會(huì)被喚醒。
- ZK天生設(shè)計(jì)就是分布式協(xié)調(diào),強(qiáng)一致性。鎖的模型健壯、簡(jiǎn)單易用、適合做分布式鎖。
- ZK實(shí)現(xiàn)分布式鎖時(shí),如果節(jié)點(diǎn)獲取不到鎖,只需添加監(jiān)聽(tīng)器即可,不用一直輪詢(xún),性能消耗較小。
缺點(diǎn):
- ZK為了保持高一致性,會(huì)導(dǎo)致在集群leader掛掉的情況下,重新選舉的算法相對(duì)耗時(shí)較久,因此可能導(dǎo)致在較長(zhǎng)的一段時(shí)間內(nèi),加鎖、解鎖的邏輯是不可用的。
- 如果有較多的客戶端頻繁的申請(qǐng)加鎖、釋放鎖,對(duì)于zk集群壓力較大。
參考文獻(xiàn)
分布式鎖之Zk(zookeeper)實(shí)現(xiàn)
你還在使用復(fù)雜的 zkclient 開(kāi)發(fā) zookeeper 么?是時(shí)候用 Curator 了 !