工作中需要寫一個定時任務(wù),由于是集群環(huán)境,自然而然想到需要通過分布式鎖來保證單臺執(zhí)行..相信大家都會想到使用zk來實(shí)現(xiàn)對應(yīng)的分布式鎖.下面就簡單介紹一下幾種實(shí)現(xiàn)
準(zhǔn)備工作
有幾個幫助類,先把代碼放上來
ZKClient 對zk的操作做了一個簡單的封裝
Java代碼
packagezk.lock;
importorg.apache.zookeeper.*;
importorg.apache.zookeeper.data.Stat;
importzk.util.ZKUtil;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
/**
*?User:?zhenghui
*?Date:?14-3-26
*?Time:?下午8:50
*?封裝一個zookeeper實(shí)例.
*/
publicclassZKClientimplementsWatcher?{
privateZooKeeper?zookeeper;
privateCountDownLatch?connectedSemaphore?=newCountDownLatch(1);
publicZKClient(String?connectString,intsessionTimeout)throwsException?{
zookeeper?=newZooKeeper(connectString,?sessionTimeout,this);
System.out.println("connecting?zk?server");
if(connectedSemaphore.await(10l,?TimeUnit.SECONDS))?{
System.out.println("connect?zk?server?success");
}else{
System.out.println("connect?zk?server?error.");
thrownewException("connect?zk?server?error.");
}
}
publicvoidclose()throwsInterruptedException?{
if(zookeeper?!=null)?{
zookeeper.close();
}
}
publicvoidcreatePathIfAbsent(String?path,booleanisPersistent)throwsException?{
CreateMode?createMode?=?isPersistent???CreateMode.PERSISTENT?:?CreateMode.EPHEMERAL;
path?=?ZKUtil.normalize(path);
if(!this.exists(path))?{
zookeeper.create(path,null,?ZooDefs.Ids.OPEN_ACL_UNSAFE,?createMode);
}
}
publicbooleanexists(String?path)throwsException?{
path?=?ZKUtil.normalize(path);
Stat?stat?=?zookeeper.exists(path,null);
returnstat?!=null;
}
publicString?getData(String?path)throwsException?{
path?=?ZKUtil.normalize(path);
try{
byte[]?data?=?zookeeper.getData(path,null,null);
returnnewString(data);
}catch(KeeperException?e)?{
if(einstanceofKeeperException.NoNodeException)?{
thrownewException("Node?does?not?exist,path?is?["+?e.getPath()?+"].",?e);
}else{
thrownewException(e);
}
}catch(InterruptedException?e)?{
Thread.currentThread().interrupt();
thrownewException(e);
}
}
@Override
publicvoidprocess(WatchedEvent?event)?{
if(event?==null)return;
//?連接狀態(tài)
Watcher.Event.KeeperState?keeperState?=?event.getState();
//?事件類型
Watcher.Event.EventType?eventType?=?event.getType();
//?受影響的path
//????????String?path?=?event.getPath();
if(Watcher.Event.KeeperState.SyncConnected?==?keeperState)?{
//?成功連接上ZK服務(wù)器
if(Watcher.Event.EventType.None?==?eventType)?{
System.out.println("zookeeper?connect?success");
connectedSemaphore.countDown();
}
}
//下面可以做一些重連的工作.
elseif(Watcher.Event.KeeperState.Disconnected?==?keeperState)?{
System.out.println("zookeeper?Disconnected");
}elseif(Watcher.Event.KeeperState.AuthFailed?==?keeperState)?{
System.out.println("zookeeper?AuthFailed");
}elseif(Watcher.Event.KeeperState.Expired?==?keeperState)?{
System.out.println("zookeeper?Expired");
}
}
}
ZKUtil 針對zk路徑的一個工具類
Java代碼
packagezk.util;
/**
*?User:?zhenghui
*?Date:?14-3-26
*?Time:?下午9:56
*/
publicclassZKUtil?{
publicstaticfinalString?SEPARATOR?="/";
/**
*?轉(zhuǎn)換path為zk的標(biāo)準(zhǔn)路徑?以/開頭,最后不帶/
*/
publicstaticString?normalize(String?path)?{
String?temp?=?path;
if(!path.startsWith(SEPARATOR))?{
temp?=?SEPARATOR?+?path;
}
if(path.endsWith(SEPARATOR))?{
temp?=?temp.substring(0,?temp.length()-1);
returnnormalize(temp);
}else{
returntemp;
}
}
/**
*?鏈接兩個path,并轉(zhuǎn)化為zk的標(biāo)準(zhǔn)路徑
*/
publicstaticString?contact(String?path1,String?path2){
if(path2.startsWith(SEPARATOR))?{
path2?=?path2.substring(1);
}
if(path1.endsWith(SEPARATOR))?{
returnnormalize(path1?+?path2);
}else{
returnnormalize(path1?+?SEPARATOR?+?path2);
}
}
/**
*?字符串轉(zhuǎn)化成byte類型
*/
publicstaticbyte[]?toBytes(String?data)?{
if(data?==null||?data.trim().equals(""))returnnull;
returndata.getBytes();
}
}
NetworkUtil 獲取本機(jī)IP的工具方法
Java代碼
packagezk.util;
importjava.net.InetAddress;
importjava.net.NetworkInterface;
importjava.util.Enumeration;
/**
*?User:?zhenghui
*?Date:?14-4-1
*?Time:?下午4:47
*/
publicclassNetworkUtil?{
staticprivatefinalcharCOLON?=':';
/**
*?獲取當(dāng)前機(jī)器ip地址
*?據(jù)說多網(wǎng)卡的時候會有問題.
*/
publicstaticString?getNetworkAddress()?{
Enumeration?netInterfaces;
try{
netInterfaces?=?NetworkInterface.getNetworkInterfaces();
InetAddress?ip;
while(netInterfaces.hasMoreElements())?{
NetworkInterface?ni?=?netInterfaces
.nextElement();
Enumeration?addresses=ni.getInetAddresses();
while(addresses.hasMoreElements()){
ip?=?addresses.nextElement();
if(!ip.isLoopbackAddress()
&&?ip.getHostAddress().indexOf(COLON)?==?-1)?{
returnip.getHostAddress();
}
}
}
return"";
}catch(Exception?e)?{
return"";
}
}
}
--------------------------- 正文開始 ?-----------------------------------
這種實(shí)現(xiàn)非常簡單,具體的流程如下

對應(yīng)的實(shí)現(xiàn)如下
Java代碼
packagezk.lock;
importzk.util.NetworkUtil;
importzk.util.ZKUtil;
/**
*?User:?zhenghui
*?Date:?14-3-26
*?Time:?下午8:37
*?分布式鎖實(shí)現(xiàn).
*
*?這種實(shí)現(xiàn)的原理是,創(chuàng)建某一個任務(wù)的節(jié)點(diǎn),比如?/lock/tasckname?然后獲取對應(yīng)的值,如果是當(dāng)前的Ip,那么獲得鎖,如果不是,則沒獲得
*?.如果該節(jié)點(diǎn)不存在,則創(chuàng)建該節(jié)點(diǎn),并把改節(jié)點(diǎn)的值設(shè)置成當(dāng)前的IP
*/
publicclassDistributedLock01?{
privateZKClient?zkClient;
publicstaticfinalString?LOCK_ROOT?="/lock";
privateString?lockName;
publicDistributedLock01(String?connectString,intsessionTimeout,String?lockName)throwsException?{
//先創(chuàng)建zk鏈接.
this.createConnection(connectString,sessionTimeout);
this.lockName?=?lockName;
}
publicbooleantryLock(){
String?path?=?ZKUtil.contact(LOCK_ROOT,lockName);
String?localIp?=?NetworkUtil.getNetworkAddress();
try{
if(zkClient.exists(path)){
String?ownnerIp?=?zkClient.getData(path);
if(localIp.equals(ownnerIp)){
returntrue;
}
}else{
zkClient.createPathIfAbsent(path,false);
if(zkClient.exists(path)){
String?ownnerIp?=?zkClient.getData(path);
if(localIp.equals(ownnerIp)){
returntrue;
}
}
}
}catch(Exception?e)?{
e.printStackTrace();
}
returnfalse;
}
/**
*?創(chuàng)建zk連接
*
*/
protectedvoidcreateConnection(String?connectString,intsessionTimeout)throwsException?{
if(zkClient?!=null){
releaseConnection();
}
zkClient?=newZKClient(connectString,sessionTimeout);
zkClient.createPathIfAbsent(LOCK_ROOT,true);
}
/**
*?關(guān)閉ZK連接
*/
protectedvoidreleaseConnection()throwsInterruptedException?{
if(zkClient?!=null)?{
zkClient.close();
}
}
}
總結(jié)
網(wǎng)上有很多文章,大家的方法大多數(shù)都是創(chuàng)建一個root根節(jié)點(diǎn),每一個trylock的客戶端都會在root下創(chuàng)建一個?EPHEMERAL_SEQUENTIAL 的子節(jié)點(diǎn),同時設(shè)置root的child 變更watcher(為了避免羊群效應(yīng),可以只添加前一個節(jié)點(diǎn)的變更通知) .如果創(chuàng)建的節(jié)點(diǎn)的序號是最小,則獲取到鎖,否則繼續(xù)等待root的child 變更
核心技術(shù):Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx
1.?????項目核心代碼結(jié)構(gòu)截圖

項目模塊依賴

特別提醒:開發(fā)人員在開發(fā)的時候可以將自己的業(yè)務(wù)REST服務(wù)化或者Dubbo服務(wù)化
2.????項目依賴介紹
2.1?后臺管理系統(tǒng)、Rest服務(wù)系統(tǒng)、Scheculer定時調(diào)度系統(tǒng)依賴如下圖:

2.2?Dubbo獨(dú)立服務(wù)項目依賴如下圖:

3.??項目功能部分截圖:







zookeeper、dubbo服務(wù)啟動


dubbo管控臺







REST服務(wù)平臺



