1 Zookeeper原生客戶端不足
- 在連接zk超時(shí)的時(shí)候,不支持自動(dòng)重連,需要手動(dòng)重連
- Watch注冊(cè)一次就會(huì)失效,需要反復(fù)注冊(cè)
- 不支持遞歸創(chuàng)建節(jié)點(diǎn)
- Curator能提供更多方案并且實(shí)現(xiàn)簡(jiǎn)答,例如分布式鎖
2 會(huì)話創(chuàng)建
使用
CuratorFrameworkFactory這個(gè)工廠類的兩個(gè)靜態(tài)方法來(lái)創(chuàng)建一個(gè)客戶端,如builder().build()或者newClient(),返回CuratorFramework對(duì)象通過(guò)調(diào)用
CuratorFramework中的start()方法來(lái)啟動(dòng)會(huì)話
CuratorFramework curatorFramework= CuratorFrameworkFactory
.newClient("",new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
3 節(jié)點(diǎn)操作
// 創(chuàng)建數(shù)據(jù)節(jié)點(diǎn)/home,內(nèi)容為test
client.create().forPath("/home","test".getBytes());
// 更新數(shù)據(jù)節(jié)點(diǎn)數(shù)據(jù)
client.setData().forPath("/home","test1".getBytes());
// 獲取數(shù)據(jù)節(jié)點(diǎn)信息
client.getData().forPath("/home");
// 獲取數(shù)據(jù)節(jié)點(diǎn)權(quán)限
client.getACL().forPath("/home");
// 更新數(shù)據(jù)節(jié)點(diǎn)權(quán)限client.setACL().withACL(list).forPath("/home");
// 刪除數(shù)據(jù)節(jié)點(diǎn)/home
client.delete().forPath("/home");
4 代碼演示
4.1 添加pom依賴
<!--對(duì)zookeeper的底層api的一些封裝-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!--封裝了一些高級(jí)特性,如:Cache時(shí)間監(jiān)聽(tīng),選舉,分布式鎖,分布式barrier-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
4.2 代碼分析
- 連接重試策略

- RetryForever:一直重試,直至連接成功,一般不使用
- RetryNTime:指定重連的次數(shù)N
- RetryUtilElapsed:指定最大重連超時(shí)時(shí)間和重連時(shí)間間隔,間歇性重連直到超時(shí)或者連接成功

- ExponentialBackoffRetry:基于“backoff”方式重連,重連的時(shí)間間隔是動(dòng)態(tài)的

- BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重試次數(shù)的控制
- 默認(rèn)Curator創(chuàng)建節(jié)點(diǎn)是永久節(jié)點(diǎn),如果需要?jiǎng)?chuàng)建臨時(shí)節(jié)點(diǎn)
client.create().withMode(CreateMode.EPHEMERAL).forPath("/aa");
一般執(zhí)行完畢之后,session就會(huì)關(guān)閉了,創(chuàng)建的臨時(shí)節(jié)點(diǎn)就不存在了。
creatingParentsIfNeeded()方法的意思是如果父節(jié)點(diǎn)不存在,則在創(chuàng)建節(jié)點(diǎn)的同時(shí)創(chuàng)建父節(jié)點(diǎn)
client.create().creatingParentsIfNeeded().forPath("/test");
舊版本中 creatingParentContainersIfNeeded()等于creatingParentsIfNeeded();
新版本中 creatingParentContainersIfNeeded()以容器模式遞歸創(chuàng)建節(jié)點(diǎn)
刪除節(jié)點(diǎn)時(shí),如果當(dāng)前節(jié)點(diǎn)還存在子節(jié)點(diǎn),那么刪除時(shí)會(huì)報(bào)錯(cuò),如果要?jiǎng)h除的話,就得加上deletingChildrenIfNeeded()
client.delete().deletingChildrenIfNeeded().forPath("/test");
在連接失敗的情況下,可能刪除失敗,如果確定刪除的,則加上guaranteed(),則在不停重試中,不停去刪除節(jié)點(diǎn)。
使用Stat,則利用storingStatIn
Stat stat=new Stat();
byte[] bytes1=client.getData().storingStatIn(stat).forPath("/aa/bb");
System.out.println(new String(bytes1));
System.out.println(stat.getCversion());
5 分布式鎖
public class DistributedLockTest {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("localhost:2181")
.connectionTimeoutMs(5000)
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
String path = "/orderNo";
InterProcessMutex mutex = new InterProcessMutex(client, path);
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
client.create().forPath(path, "1000".getBytes());
} else {
client.setData().forPath(path, "1000".getBytes());
}
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
mutex.acquire();
String numStr = new String(client.getData().forPath(path));
int orderNum = Integer.parseInt(numStr);
String currentNum = orderNum + 1 + "";
client.setData().forPath(path, currentNum.getBytes());
System.out.println("currentNum = " + new String(client.getData().forPath(path)));
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
源碼分析:
- 初始化
InterProcessMutex


- 調(diào)用
InterProcessMutex的acquire方法

實(shí)際上調(diào)用的是internalLock方法
internalLock --》LockInternals的attemptLock --》createsTheLock和internalLockLoop
StandardLockInternalsDriver的createsTheLock方法:創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}