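A few days ago I analyzed three ways to implement a distributed lock, but I didn't build one on ZooKeeper, because a ZooKeeper-based distributed lock is somewhat more complex and also relies on the Watcher mechanism. So this post is devoted to a ZooKeeper-based distributed lock.
First, the first implementation. ZooKeeper refuses to create the same node twice, and we can use that property to implement a distributed lock. This looks a lot like a Redis-based lock, but there are differences, which are analyzed in detail later.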
The main flow chart is as follows:
[Flow chart: main flow of the single-node lock]
The flow is straightforward:
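- Check whether the target node already exists. If it does, wait for the lock.
- If it does not exist, create an ephemeral node; creating it means we now hold the lock.
- If the creation fails, the lock has already been taken by another thread, so again wait for the lock.
- When the lock is released, or the holder's session times out, the node is deleted and the threads waiting for the lock are woken up to compete for it.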
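To make the four steps concrete without a running ZooKeeper, here is a minimal in-memory model. It is only an analogy under stated assumptions: the hypothetical class SingleNodeLockModel uses a ConcurrentHashMap in place of the znode tree, putIfAbsent() in place of create() failing with NodeExists, and notifyAll() in place of the NodeDeleted event that every waiter receives. Session expiry would simply be another call to unlock().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of the single-node lock; no ZooKeeper involved.
// The map stands in for the znode tree: putIfAbsent() plays the role of create()
// failing with NodeExists, and notifyAll() plays the role of the NodeDeleted event
// that every waiting client receives.
class SingleNodeLockModel {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
    private final String lockPath;

    SingleNodeLockModel(String lockPath) {
        this.lockPath = lockPath;
    }

    // Steps 1-2: "create" the lock node; succeeds only if nobody holds it yet.
    boolean tryLock(String owner) {
        return nodes.putIfAbsent(lockPath, owner) == null;
    }

    // Step 3: creation failed, so wait until the node disappears and try again.
    synchronized void lock(String owner) throws InterruptedException {
        while (!tryLock(owner)) {
            wait(); // woken by unlock(), like a NodeDeleted watch event
        }
    }

    // Step 4: delete the node (explicit unlock or, in ZooKeeper, session expiry)
    // and wake ALL waiters, which then race again; only one of them wins.
    // Only the owner can delete the node in this model.
    synchronized void unlock(String owner) {
        if (nodes.remove(lockPath, owner)) {
            notifyAll();
        }
    }

    String holder() {
        return nodes.get(lockPath);
    }
}
```

Waking every waiter in unlock() is exactly the behavior behind the herd effect discussed later. The owner check is an addition of this sketch: a ZooKeeper delete with version -1 removes the node no matter which client created it.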
That is the complete flow. A simple implementation follows:
package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Distributed lock implemented with ZooKeeper (single lock node)
 */
public class LockTest {
    private String zkQurom = "localhost:2181";
    private String lockNameSpace = "/mylock";
    private String nodeString = lockNameSpace + "/test1";
    private ZooKeeper zk;

    public LockTest() {
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event " + watchedEvent);
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void ensureRootPath() throws InterruptedException {
        try {
            if (zk.exists(lockNameSpace, true) == null) {
                zk.create(lockNameSpace, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            // another client created the root between exists() and create(); nothing to do
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private void watchNode(final String nodeString, final Thread thread) throws InterruptedException {
        try {
            zk.exists(nodeString, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("==" + watchedEvent.toString());
                    if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                        System.out.println("There is a Thread released Lock==============");
                        thread.interrupt();
                    }
                    // ZooKeeper watches fire only once: re-register this same watcher
                    // so that every later deletion is delivered as well
                    try {
                        zk.exists(nodeString, this);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquire the lock
     * @return true once the lock has been acquired
     * @throws InterruptedException
     */
    public boolean lock() throws InterruptedException {
        String path = null;
        ensureRootPath();
        watchNode(nodeString, Thread.currentThread());
        while (true) {
            try {
                path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (KeeperException e) {
                System.out.println(Thread.currentThread().getName() + " getting Lock but can not get");
                try {
                    Thread.sleep(5000); // retry in 5s unless the watcher interrupts us first
                } catch (InterruptedException ex) {
                    System.out.println("thread is notified");
                }
            }
            if (!Strings.nullToEmpty(path).trim().isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " get Lock...");
                return true;
            }
        }
    }

    /**
     * Release the lock
     */
    public void unlock() {
        try {
            zk.delete(nodeString, -1);
            System.out.println(Thread.currentThread().getName() + " release Lock...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                LockTest test = new LockTest();
                try {
                    test.lock();
                    try {
                        Thread.sleep(3000); // simulated work while holding the lock
                    } finally {
                        test.unlock(); // only reached if lock() succeeded
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
}
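The code is rough, but the overall approach matches the description above. A few things to note:
- Because it uses the raw ZooKeeper API, and a watch fires only once, the watch has to be set again after every event, which makes the code more complex.
- The waiting thread is woken directly with Thread.interrupt, which is actually a poor way to control the flow.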
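Why the watch has to be set again: a ZooKeeper watch is one-shot, delivered once and then removed. The hypothetical classes below model that rule in plain Java, with no ZooKeeper client involved: OneShotWatchModel forgets its watchers after firing, and ReArmingWatcher registers itself again on every event. With the real client the equivalent is calling zk.exists(path, this) from inside Watcher.process().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of one-shot watch semantics (not the real ZooKeeper client):
// fire() delivers an event to the watchers registered on a path, then forgets them.
class OneShotWatchModel {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    void fire(String path, String eventType) {
        // Remove first, so a watcher that re-registers lands in a fresh list.
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(eventType);
            }
        }
    }
}

// A watcher that re-arms itself on every event.
class ReArmingWatcher implements Consumer<String> {
    private final OneShotWatchModel zk;
    private final String path;
    int deletionsSeen = 0;

    ReArmingWatcher(OneShotWatchModel zk, String path) {
        this.zk = zk;
        this.path = path;
    }

    @Override
    public void accept(String eventType) {
        if ("NodeDeleted".equals(eventType)) {
            deletionsSeen++; // in LockTest this is where thread.interrupt() happens
        }
        zk.watch(path, this); // the registration that called us is gone; register again
    }
}
```

A plain watcher registered once sees only the first event; the re-arming one sees every deletion.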
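The implementation above has both advantages and disadvantages:
Advantages:
It is fairly simple to implement and has a notification mechanism, so it responds quickly, somewhat in the spirit of ReentrantLock. If deleting the node fails (for example because the client crashed), the session timeout still guarantees that the ephemeral node gets removed.
Disadvantages:
It is heavyweight, and under heavy lock contention it suffers from the "herd effect".
The herd effect means that when a node is deleted, every thread that registered a Watcher for that node's deletion is called back at once, which is very bad for the ZK cluster. This should be avoided.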
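A rough count shows why this matters. The sketch below is a simplified model, not a measurement: assume N clients, one holding the lock and N - 1 waiting, every waiter keeps exactly one watch, and the lock is handed on until each client has held it once. The hypothetical class HerdEffectCount compares watching the single lock node with watching only the predecessor node (the fix described next).

```java
// Simplified count of watch notifications (hypothetical model, not a measurement).
class HerdEffectCount {
    // Everyone watches the same lock node: each deletion notifies every waiter,
    // all of them race to create the node, one wins and the rest keep waiting.
    static long singleNodeNotifications(int clients) {
        long total = 0;
        for (int waiting = clients - 1; waiting > 0; waiting--) {
            total += waiting;
        }
        return total; // equals clients * (clients - 1) / 2
    }

    // Each waiter watches only its predecessor: each deletion notifies one client.
    static long predecessorNotifications(int clients) {
        return Math.max(0, clients - 1);
    }
}
```

With 4 clients that is 6 notifications versus 3; with 1000 clients it is 499,500 versus 999, that is N(N-1)/2 versus N-1.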
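Solving the herd effect:
To avoid the herd effect we have to give up the strategy of every client watching one node. So how do we do it?
- Treat the lock as a directory, under which each thread creates an ephemeral sequential node. Because ZK guarantees the ordering of these nodes, the order can be used to decide who holds the lock.
- First create the sequential node, then get the smallest node under the directory and check whether it is the current node. If it is, the lock is acquired; if not, the attempt fails for now.
- A client that failed to get the lock finds the node immediately before its own and registers a watch on it, so it is notified when that node is deleted.
- On unlock, deleting the node notifies the next node in line.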
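The ordering decision in these steps needs no ZooKeeper calls, so it can be isolated as a pure function. The sketch below is a hypothetical helper, nodeToWatch, assuming children holds the names returned by getChildren(lockRoot, false) and self is the name of the node this client created. It relies on ZooKeeper appending a zero-padded 10-digit counter to sequential node names, so that with a shared prefix such as mylock_ string order matches creation order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper holding only the decision logic of the sequential-node lock.
class SequentialLockLogic {
    // children: names from getChildren(lockRoot, false), e.g. "mylock_0000000003"
    // self: the name of the node this client created (without the parent path)
    // Empty result: self is the smallest node, so the lock is held.
    // Otherwise: the node immediately before self, the only one this client watches.
    static Optional<String> nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        // Zero-padded counters plus a common prefix: lexicographic order == creation order.
        Collections.sort(sorted);
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalStateException(self + " is gone; the session may have expired");
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }
}
```

After a NodeDeleted event on the watched node, a client should call this again with a fresh getChildren() result rather than assume it now holds the lock: the deleted predecessor may have been a waiter whose session expired, not the holder.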
This is quite similar to how ReentrantLock implements its fair lock. Here is a simple implementation:
package com.codertom.params.engine;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhiming on 2017-02-05.
 */
public class FairLockTest {
    private String zkQurom = "localhost:2181";
    private String lockName = "/mylock";
    private String lockZnode = null;
    private ZooKeeper zk;

    public FairLockTest() {
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event " + watchedEvent);
                    if (Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void ensureRootPath() {
        try {
            if (zk.exists(lockName, true) == null) {
                zk.create(lockName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException e) {
            // another client created the root first; nothing to do
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquire the lock
     * @return true if the lock was acquired, false if an error occurred
     */
    public boolean lock() {
        ensureRootPath();
        try {
            lockZnode = zk.create(lockName + "/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            String self = lockZnode.substring(lockZnode.lastIndexOf("/") + 1);
            while (true) {
                List<String> children = zk.getChildren(lockName, false);
                // same prefix + zero-padded counter, so sorting yields creation order
                Collections.sort(children);
                System.out.println(children.get(0) + " and path " + lockZnode);
                if (self.equals(children.get(0))) {
                    System.out.println(Thread.currentThread().getName() + " get Lock...");
                    return true;
                }
                // watch only the node immediately before ours, not the lock holder
                final String watchNodeTmp = children.get(children.indexOf(self) - 1);
                final Thread thread = Thread.currentThread();
                Stat stat = zk.exists(lockName + "/" + watchNodeTmp, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                            thread.interrupt();
                        }
                        try {
                            // re-arm by reusing the default watcher given to the constructor
                            zk.exists(lockName + "/" + watchNodeTmp, true);
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                if (stat == null) {
                    continue; // predecessor vanished before the watch was set: re-check now
                }
                System.out.println("Thread " + thread.getId() + " waiting for " + lockName + "/" + watchNodeTmp);
                try {
                    Thread.sleep(1000000000);
                } catch (InterruptedException ex) {
                    // predecessor deleted: loop to confirm we are now the smallest node
                    System.out.println(Thread.currentThread().getName() + " notified");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Release the lock
     */
    public void unlock() {
        if (lockZnode == null) {
            return; // lock() never created a node
        }
        try {
            System.out.println(Thread.currentThread().getName() + " release Lock...");
            zk.delete(lockZnode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            service.execute(() -> {
                FairLockTest test = new FairLockTest();
                try {
                    if (test.lock()) {
                        Thread.sleep(3000); // simulated work while holding the lock
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    test.unlock(); // deletes our own sequential node, held or merely queued
                }
            });
        }
        service.shutdown();
    }
}
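The program above also has a few points worth noting:
- The ZooKeeper API offers no direct way to get the previous node or the smallest node, so we have to implement that ourselves.
- Wake-up is again done with interrupt, which is not a sound approach. I didn't use a CountDownLatch for flow control because I didn't want to bring JVM-level locks into it.
- Watches still need to be set again; here the re-registration reuses a watcher (the default one, via exists(path, true)), so the code is simpler.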
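For comparison, here is roughly what CountDownLatch-based wake-up, deliberately avoided above, could look like. It is a sketch under assumptions: the hypothetical method awaitPredecessorDeleted uses a plain thread to stand in for ZooKeeper's event thread, where real code would call countDown() inside Watcher.process() on a NodeDeleted event.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: waking a waiting lock() with a CountDownLatch instead of Thread.interrupt().
// A plain thread stands in for ZooKeeper's event thread (demo assumption).
class LatchWakeup {
    // Returns true if the "predecessor deleted" signal arrived within timeoutMillis.
    static boolean awaitPredecessorDeleted(long deleteAfterMillis, long timeoutMillis) {
        CountDownLatch predecessorGone = new CountDownLatch(1);
        Thread fakeEventThread = new Thread(() -> {
            try {
                Thread.sleep(deleteAfterMillis);
            } catch (InterruptedException e) {
                return;
            }
            predecessorGone.countDown(); // what the NodeDeleted watcher would do
        });
        fakeEventThread.setDaemon(true);
        fakeEventThread.start();
        try {
            // Bounded wait: nobody has to interrupt this thread, and a lost
            // event cannot park it forever.
            return predecessorGone.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve a genuine interrupt
            return false;
        }
    }
}
```

In a real lock the latch would be created before calling exists(predecessor, watcher), the wait skipped when exists() returns null, and the children re-read after waking. The explicit signal cannot be confused with an unrelated interrupt of the same thread.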
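Even so, the implementations above are quite involved, because you have to keep managing Watchers. That is fine for a demo, but building a production-ready lock is not easy, since a bug in your lock code can cause serious problems in production.
For common ZooKeeper use cases there are mature libraries, such as Curator. Curator really is impressive: besides wrapping the common ZooKeeper APIs, it also ships implementations of many common use cases. Its programming style, however, takes some getting used to.
With Curator, a distributed lock is easy to implement: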
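// client: a CuratorFramework that has already been start()ed, e.g. created with
// CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3))
// lockPath: the znode path for this lock; maxWait/waitUnit: how long to wait for it
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(maxWait, waitUnit)) {
    try {
        // do some work inside of the critical section here
    } finally {
        lock.release(); // always release, even if the critical section throws
    }
}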
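Yes, it is that simple: a ready-made wheel you can use directly.
That wraps up ZooKeeper-based distributed locks. In practice they are not commonly used. Although ZooKeeper implements locking very elegantly, the programming is complex, you have to maintain a separate ZooKeeper cluster, and frequent watches put considerable pressure on that cluster. Unless your project already depends on ZooKeeper and the lock volume is fairly small, you are better off not using it.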