基于zookeeper的分布式鎖實(shí)現(xiàn)

工作中需要寫一個定時任務(wù),由于是集群環(huán)境,自然而然想到需要通過分布式鎖來保證單臺執(zhí)行..相信大家都會想到使用zk來實(shí)現(xiàn)對應(yīng)的分布式鎖.下面就簡單介紹一下幾種實(shí)現(xiàn)

準(zhǔn)備工作

有幾個幫助類,先把代碼放上來

ZKClient 對zk的操作做了一個簡單的封裝

Java代碼

packagezk.lock;

importorg.apache.zookeeper.*;

importorg.apache.zookeeper.data.Stat;

importzk.util.ZKUtil;

importjava.util.concurrent.CountDownLatch;

importjava.util.concurrent.TimeUnit;

/**

*?User:?zhenghui

*?Date:?14-3-26

*?Time:?下午8:50

*?封裝一個zookeeper實(shí)例.

*/

publicclassZKClientimplementsWatcher?{

privateZooKeeper?zookeeper;

privateCountDownLatch?connectedSemaphore?=newCountDownLatch(1);

publicZKClient(String?connectString,intsessionTimeout)throwsException?{

zookeeper?=newZooKeeper(connectString,?sessionTimeout,this);

System.out.println("connecting?zk?server");

if(connectedSemaphore.await(10l,?TimeUnit.SECONDS))?{

System.out.println("connect?zk?server?success");

}else{

System.out.println("connect?zk?server?error.");

thrownewException("connect?zk?server?error.");

}

}

publicvoidclose()throwsInterruptedException?{

if(zookeeper?!=null)?{

zookeeper.close();

}

}

publicvoidcreatePathIfAbsent(String?path,booleanisPersistent)throwsException?{

CreateMode?createMode?=?isPersistent???CreateMode.PERSISTENT?:?CreateMode.EPHEMERAL;

path?=?ZKUtil.normalize(path);

if(!this.exists(path))?{

zookeeper.create(path,null,?ZooDefs.Ids.OPEN_ACL_UNSAFE,?createMode);

}

}

publicbooleanexists(String?path)throwsException?{

path?=?ZKUtil.normalize(path);

Stat?stat?=?zookeeper.exists(path,null);

returnstat?!=null;

}

publicString?getData(String?path)throwsException?{

path?=?ZKUtil.normalize(path);

try{

byte[]?data?=?zookeeper.getData(path,null,null);

returnnewString(data);

}catch(KeeperException?e)?{

if(einstanceofKeeperException.NoNodeException)?{

thrownewException("Node?does?not?exist,path?is?["+?e.getPath()?+"].",?e);

}else{

thrownewException(e);

}

}catch(InterruptedException?e)?{

Thread.currentThread().interrupt();

thrownewException(e);

}

}

@Override

publicvoidprocess(WatchedEvent?event)?{

if(event?==null)return;

//?連接狀態(tài)

Watcher.Event.KeeperState?keeperState?=?event.getState();

//?事件類型

Watcher.Event.EventType?eventType?=?event.getType();

//?受影響的path

//????????String?path?=?event.getPath();

if(Watcher.Event.KeeperState.SyncConnected?==?keeperState)?{

//?成功連接上ZK服務(wù)器

if(Watcher.Event.EventType.None?==?eventType)?{

System.out.println("zookeeper?connect?success");

connectedSemaphore.countDown();

}

}

//下面可以做一些重連的工作.

elseif(Watcher.Event.KeeperState.Disconnected?==?keeperState)?{

System.out.println("zookeeper?Disconnected");

}elseif(Watcher.Event.KeeperState.AuthFailed?==?keeperState)?{

System.out.println("zookeeper?AuthFailed");

}elseif(Watcher.Event.KeeperState.Expired?==?keeperState)?{

System.out.println("zookeeper?Expired");

}

}

}

ZKUtil 針對zk路徑的一個工具類

Java代碼

packagezk.util;

/**

*?User:?zhenghui

*?Date:?14-3-26

*?Time:?下午9:56

*/

publicclassZKUtil?{

publicstaticfinalString?SEPARATOR?="/";

/**

*?轉(zhuǎn)換path為zk的標(biāo)準(zhǔn)路徑?以/開頭,最后不帶/

*/

publicstaticString?normalize(String?path)?{

String?temp?=?path;

if(!path.startsWith(SEPARATOR))?{

temp?=?SEPARATOR?+?path;

}

if(path.endsWith(SEPARATOR))?{

temp?=?temp.substring(0,?temp.length()-1);

returnnormalize(temp);

}else{

returntemp;

}

}

/**

*?鏈接兩個path,并轉(zhuǎn)化為zk的標(biāo)準(zhǔn)路徑

*/

publicstaticString?contact(String?path1,String?path2){

if(path2.startsWith(SEPARATOR))?{

path2?=?path2.substring(1);

}

if(path1.endsWith(SEPARATOR))?{

returnnormalize(path1?+?path2);

}else{

returnnormalize(path1?+?SEPARATOR?+?path2);

}

}

/**

*?字符串轉(zhuǎn)化成byte類型

*/

publicstaticbyte[]?toBytes(String?data)?{

if(data?==null||?data.trim().equals(""))returnnull;

returndata.getBytes();

}

}

NetworkUtil 獲取本機(jī)IP的工具方法

Java代碼

packagezk.util;

importjava.net.InetAddress;

importjava.net.NetworkInterface;

importjava.util.Enumeration;

/**

*?User:?zhenghui

*?Date:?14-4-1

*?Time:?下午4:47

*/

publicclassNetworkUtil?{

staticprivatefinalcharCOLON?=':';

/**

*?獲取當(dāng)前機(jī)器ip地址

*?據(jù)說多網(wǎng)卡的時候會有問題.

*/

publicstaticString?getNetworkAddress()?{

Enumeration?netInterfaces;

try{

netInterfaces?=?NetworkInterface.getNetworkInterfaces();

InetAddress?ip;

while(netInterfaces.hasMoreElements())?{

NetworkInterface?ni?=?netInterfaces

.nextElement();

Enumeration?addresses=ni.getInetAddresses();

while(addresses.hasMoreElements()){

ip?=?addresses.nextElement();

if(!ip.isLoopbackAddress()

&&?ip.getHostAddress().indexOf(COLON)?==?-1)?{

returnip.getHostAddress();

}

}

}

return"";

}catch(Exception?e)?{

return"";

}

}

}

--------------------------- 正文開始 ?-----------------------------------

這種實(shí)現(xiàn)非常簡單,具體的流程如下

對應(yīng)的實(shí)現(xiàn)如下

Java代碼

packagezk.lock;

importzk.util.NetworkUtil;

importzk.util.ZKUtil;

/**

*?User:?zhenghui

*?Date:?14-3-26

*?Time:?下午8:37

*?分布式鎖實(shí)現(xiàn).

*

*?這種實(shí)現(xiàn)的原理是,創(chuàng)建某一個任務(wù)的節(jié)點(diǎn),比如?/lock/tasckname?然后獲取對應(yīng)的值,如果是當(dāng)前的Ip,那么獲得鎖,如果不是,則沒獲得

*?.如果該節(jié)點(diǎn)不存在,則創(chuàng)建該節(jié)點(diǎn),并把改節(jié)點(diǎn)的值設(shè)置成當(dāng)前的IP

*/

publicclassDistributedLock01?{

privateZKClient?zkClient;

publicstaticfinalString?LOCK_ROOT?="/lock";

privateString?lockName;

publicDistributedLock01(String?connectString,intsessionTimeout,String?lockName)throwsException?{

//先創(chuàng)建zk鏈接.

this.createConnection(connectString,sessionTimeout);

this.lockName?=?lockName;

}

publicbooleantryLock(){

String?path?=?ZKUtil.contact(LOCK_ROOT,lockName);

String?localIp?=?NetworkUtil.getNetworkAddress();

try{

if(zkClient.exists(path)){

String?ownnerIp?=?zkClient.getData(path);

if(localIp.equals(ownnerIp)){

returntrue;

}

}else{

zkClient.createPathIfAbsent(path,false);

if(zkClient.exists(path)){

String?ownnerIp?=?zkClient.getData(path);

if(localIp.equals(ownnerIp)){

returntrue;

}

}

}

}catch(Exception?e)?{

e.printStackTrace();

}

returnfalse;

}

/**

*?創(chuàng)建zk連接

*

*/

protectedvoidcreateConnection(String?connectString,intsessionTimeout)throwsException?{

if(zkClient?!=null){

releaseConnection();

}

zkClient?=newZKClient(connectString,sessionTimeout);

zkClient.createPathIfAbsent(LOCK_ROOT,true);

}

/**

*?關(guān)閉ZK連接

*/

protectedvoidreleaseConnection()throwsInterruptedException?{

if(zkClient?!=null)?{

zkClient.close();

}

}

}

總結(jié)

網(wǎng)上有很多文章,大家的方法大多數(shù)都是創(chuàng)建一個root根節(jié)點(diǎn),每一個trylock的客戶端都會在root下創(chuàng)建一個?EPHEMERAL_SEQUENTIAL 的子節(jié)點(diǎn),同時設(shè)置root的child 變更watcher(為了避免羊群效應(yīng),可以只添加前一個節(jié)點(diǎn)的變更通知) .如果創(chuàng)建的節(jié)點(diǎn)的序號是最小,則獲取到鎖,否則繼續(xù)等待root的child 變更

核心技術(shù):Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx

1.?????項目核心代碼結(jié)構(gòu)截圖

項目模塊依賴

特別提醒:開發(fā)人員在開發(fā)的時候可以將自己的業(yè)務(wù)REST服務(wù)化或者Dubbo服務(wù)化

2.????項目依賴介紹

2.1?后臺管理系統(tǒng)、Rest服務(wù)系統(tǒng)、Scheculer定時調(diào)度系統(tǒng)依賴如下圖:

2.2?Dubbo獨(dú)立服務(wù)項目依賴如下圖:

3.??項目功能部分截圖:

zookeeper、dubbo服務(wù)啟動

dubbo管控臺

REST服務(wù)平臺

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容