一、curator簡介
- Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。
curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計數(shù)器、分布式Barrier等
- 版本問題。使用curator的版本:2.12.0,對應(yīng)Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導(dǎo)致節(jié)點操作失敗
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
二、Curator-framework基本api使用
1、創(chuàng)建會話
(1)靜態(tài)工廠方法創(chuàng)建會話
String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.newClient(connectionInfo, 5000, 3000, retryPolicy);
newClient靜態(tài)工廠方法包含四個主要參數(shù):

(2)使用fluent流式創(chuàng)建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
(3)創(chuàng)建包含命名空間的會話
- 為了實現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,需要為每個業(yè)務(wù)分配一個獨立的命名空間(NameSpace),即指定一個Zookeeper的根路徑(官方術(shù)語:為Zookeeper添加“Chroot”特性)。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
#創(chuàng)建命名空間為namespace的會話(該會話內(nèi)的操作都是基于該目錄進(jìn)行的)
.namespace("namespace")
.build();
2、啟動客戶端
client.start();
3、創(chuàng)建數(shù)據(jù)節(jié)點
- Zookeeper的節(jié)點創(chuàng)建模式:
PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化并且?guī)蛄刑?br> EPHEMERAL:臨時
EPHEMERAL_SEQUENTIAL:臨時并且?guī)蛄刑?/p>
//創(chuàng)建一個節(jié)點,指定創(chuàng)建模式(臨時節(jié)點),附帶初始化內(nèi)容,并且自動遞歸創(chuàng)建父節(jié)點
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
4、刪除節(jié)點
//刪除一個節(jié)點,并且遞歸刪除其所有的子節(jié)點
client.delete().deletingChildrenIfNeeded().forPath("path");
//刪除一個節(jié)點,強(qiáng)制指定版本進(jìn)行刪除
client.delete().withVersion(10086).forPath("path");
//刪除一個節(jié)點,強(qiáng)制保證刪除,guaranteed()接口是一個保障措施,只要客戶端會話有效,
//那么Curator會在后臺持續(xù)進(jìn)行刪除操作,直到刪除節(jié)點成功。
client.delete().guaranteed().forPath("path");
5、讀取數(shù)據(jù)
//讀取一個節(jié)點的數(shù)據(jù)內(nèi)容,同時獲取到該節(jié)點的stat
讀取一個節(jié)點的數(shù)據(jù)內(nèi)容,同時獲取到該節(jié)點的stat
6、更新節(jié)點數(shù)據(jù)
//更新一個節(jié)點的數(shù)據(jù)內(nèi)容,該接口會返回一個Stat實例
Stat path = client.setData().forPath("path", "data".getBytes());
//更新一個節(jié)點的數(shù)據(jù)內(nèi)容,強(qiáng)制指定版本進(jìn)行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
7、檢測節(jié)點是否存在
Stat stat = client.checkExists().forPath("path");
8、獲取某個節(jié)點的所有子節(jié)點路徑
//該方法的返回值為List<String>,獲得ZNode的子節(jié)點Path列表。 可以調(diào)用額外的方法(監(jiān)控、后臺處
//理或者獲取狀態(tài)watch, background or get stat) 并在最后調(diào)用forPath()指定要操作的父ZNode
client.getChildren().forPath("path");
9、事物
- CuratorFramework的實例包含inTransaction( )接口方法,調(diào)用此方法開啟一個ZooKeeper事務(wù). 可以復(fù)合create, setData, check, and/or delete 等操作然后調(diào)用commit()作為一個原子操作提交。一個例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
10、異步接口
- 上面提到的創(chuàng)建、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個重要的回調(diào)值為CuratorEvent,里面包含事件類型、響應(yīng)嗎和節(jié)點的詳細(xì)信息。
-
CuratorEventType:
CuratorEventType對應(yīng)的事件類型 -
響應(yīng)碼(#getResultCode()):
狀態(tài)響應(yīng)碼
//異步的創(chuàng)建節(jié)點
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s",
curatorEvent.getType(), curatorEvent.getResultCode()));
}, executor)
.forPath("path");
二、Curator-recipes實現(xiàn)高級特征
1、緩存cache
- Zookeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是開發(fā)者需要反復(fù)注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監(jiān)聽的包裝,可以看作是對事件監(jiān)聽的本地緩存視圖,能夠自動為開發(fā)者處理反復(fù)注冊監(jiān)聽。Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點的變化。
(1)Path Cache
- Path Cache用來監(jiān)控一個ZNode的子節(jié)點. 當(dāng)一個子節(jié)點增加, 更新,刪除時, Path Cache會改變它的狀態(tài), 會包含最新的子節(jié)點, 子節(jié)點的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。涉及到的四個類:
PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData
//1、構(gòu)造函數(shù)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
//2、想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法??梢栽O(shè)置StartMode來實現(xiàn)啟動的模式,NORMAL:正常初始化。BUILD_INITIAL_CACHE:在調(diào)用start()之前會調(diào)用rebuild()。POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個PathChildrenCacheEvent.Type#INITIALIZED事件
//3、可以增加listener監(jiān)聽緩存的變化。
public void addListener(PathChildrenCacheListener listener)
//4、遍歷所有的子節(jié)點
getCurrentData()
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
//1.創(chuàng)建client并啟動
String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.newClient(connectionInfo, 5000, 3000, retryPolicy);
client.start();
//2.創(chuàng)建path cache并啟動
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start();
//3、進(jìn)行事件監(jiān)聽
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件類型:" + event.getType());
//如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的
// event.getData().getData()、data.getData()將返回null,cache將不會緩存節(jié)點數(shù)據(jù)。
if (null != event.getData()) {
System.out.println("節(jié)點數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
//4、進(jìn)行響應(yīng)的curd操作,看看事件回調(diào)結(jié)果
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
Thread.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
Thread.sleep(10);
client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
Thread.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath("/example/pathCache/test01");
Thread.sleep(10);
client.delete().forPath("/example/pathCache/test02");
Thread.sleep(1000 * 5);
cache.close();
client.close();
System.out.println("OK!");
}
(2)Node Cache
- NodeCache只能監(jiān)聽一個節(jié)點的狀態(tài)變化。
- Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一個特定的節(jié)點。它涉及到下面的三個類:
NodeCache - Node Cache實現(xiàn)類
NodeCacheListener - 節(jié)點監(jiān)聽器
ChildData - 節(jié)點數(shù)據(jù)
client.create().creatingParentsIfNeeded().forPath(PATH);
final NodeCache cache = new NodeCache(client, PATH);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("節(jié)點數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("節(jié)點被刪除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
(3)Tree Cache
- Tree Cache可以監(jiān)控整個樹上的所有節(jié)點,類似于PathCache和NodeCache的組合,主要涉及到下面四個類:
TreeCache - Tree Cache實現(xiàn)類
TreeCacheListener - 監(jiān)聽器類
TreeCacheEvent - 觸發(fā)的事件類
ChildData - 節(jié)點數(shù)據(jù)
- 注意:TreeCache在初始化(調(diào)用start()方法)的時候會回調(diào)TreeCacheListener實例一個事TreeCacheEvent,而回調(diào)的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動處理并避免這種情況。
client.create().creatingParentsIfNeeded().forPath(PATH);
TreeCache cache = new TreeCache(client, PATH);
TreeCacheListener listener = (client1, event) ->
System.out.println("事件類型:" + event.getType() +
" | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
2、leader選舉
- Curator 有兩種leader選舉的recipe,分別是LeaderSelector和LeaderLatch。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。后者是一旦選舉出Leader,除非有客戶端掛掉重新觸發(fā)選舉,否則不會交出領(lǐng)導(dǎo)權(quán)。
(1)LeaderLatch
public class LeaderLatchTest {
/**
* 1、一旦啟動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后其中一個最終會被選舉為leader,
* 2、可以通過hasLeadership方法查看LeaderLatch實例是否leader:
* 3、 LeaderLatch在請求成為leadership會block(阻塞),一旦不使用LeaderLatch了,必須調(diào)用close方法。
* 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader。
* 4、 錯誤處理:LeaderLatch實例可以增加ConnectionStateListener來監(jiān)聽網(wǎng)絡(luò)連接問題。 當(dāng) SUSPENDED 或 LOST 時,
* leader不再認(rèn)為自己還是leader。當(dāng)LOST后連接重連后RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新
* 創(chuàng)建一個。LeaderLatch用戶必須考慮導(dǎo)致leadership丟失的連接問題。
* 強(qiáng)烈推薦你使用ConnectionStateListener
*/
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
String connectionInfo = "127.0.0.1:2181";
try {
//1、先創(chuàng)建10個leaderLatch
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}
});
examples.add(latch);
//2、啟動后,選中一個作為leader
client.start();
latch.start();
}
Thread.sleep(10000);
LeaderLatch currentLeader = null;
//3、通過hasLeadership查看自己是否是leader, 如果是的話返回true。
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
//4、close釋放當(dāng)前的領(lǐng)導(dǎo)權(quán)。
currentLeader.close();
Thread.sleep(5000);
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
} finally {
for (LeaderLatch latch : examples) {
if (null != latch.getState() && !latch.getState().equals(LeaderLatch.State.CLOSED))
latch.close();
}
for (CuratorFramework client : clients) {
client.close();
}
}
}
}
(2)LeaderSelector
- LeaderSelector使用的時候主要涉及下面幾個類:
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
- 類似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啟動,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()方法被調(diào)用。而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。 當(dāng)你不再使用LeaderSelector實例時,應(yīng)該調(diào)用它的close方法。
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
//使用AtomicInteger來記錄此client獲得領(lǐng)導(dǎo)權(quán)的次數(shù), 它是”fair”, 每個client有平等的機(jī)會獲得領(lǐng)導(dǎo)權(quán)。
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
//調(diào)用 leaderSelector.autoRequeue();保證在此實例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)。
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
/**
* leaderSelector.start(); 一旦啟動,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()方法被調(diào)用。
* 而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。
* 你可以在takeLeadership進(jìn)行任務(wù)的分配等等,并且不要返回,
* 如果你想要要此實例一直是leader的話可以加一個死循環(huán)。
* @param client
* @throws Exception
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}
public class LeaderSelectorDemo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectorAdapter> examples = Lists.newArrayList();
String connectionInfo = "127.0.0.1:2181";
try {
//1、構(gòu)建10個LeaderSelectorAdapter
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
examples.add(selectorAdapter);
//2、啟動,并開始選舉
client.start();
selectorAdapter.start();
}
System.out.println("Press enter/return to quit\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for (LeaderSelectorAdapter exampleClient : examples) {
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}
(3)小結(jié)
對比可知,LeaderLatch必須調(diào)用close()方法才會釋放領(lǐng)導(dǎo)權(quán),而對于LeaderSelector,通過LeaderSelectorListener可以對領(lǐng)導(dǎo)權(quán)進(jìn)行控制, 在適當(dāng)?shù)臅r候釋放領(lǐng)導(dǎo)權(quán),這樣每個節(jié)點都有可能獲得領(lǐng)導(dǎo)權(quán)。從而,LeaderSelector具有更好的靈活性和可控性,建議有LeaderElection應(yīng)用場景下優(yōu)先使用LeaderSelector。
3、分布式鎖
- 要點:
1.推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),因為當(dāng)連接LOST時你不再擁有鎖
2.分布式的鎖全局同步, 這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。
(1)可重入共享鎖Shared Reentrant Lock
- Shared意味著鎖是全局可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似,即可重入, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現(xiàn)。 它的構(gòu)造函數(shù)為:
public InterProcessMutex(CuratorFramework client, String path)
- 通過acquire()獲得鎖,并提供超時機(jī)制;并通過release()釋放鎖
- 請求撤銷當(dāng)前的鎖, 調(diào)用attemptRevoke()方法,注意鎖釋放時RevocationListener將會回調(diào)。
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
// 真實環(huán)境中我們會在這里訪問/維護(hù)一個共享的資源
//這個例子在使用鎖的情況下不會非法并發(fā)異常IllegalStateException
//但是在無鎖的情況由于sleep了一段時間,很容易拋出異常
if (!inUse.compareAndSet(false, true)) {
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false);
}
}
}
public class InterProcessMutexDemo {
private InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
//加鎖、其他被阻塞
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
//解鎖
lock.release(); // always release the lock in a finally block
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
String connectionInfo = "127.0.0.1:2181";
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
}
}
}
(2)不可重入共享鎖—Shared Lock
- 這個鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味著它不能在同一個線程中重入,具體實現(xiàn)類為InterProcessSemaphoreMutex
(3)可重入讀寫鎖—Shared Reentrant Read Write Lock
- 類似JDK的ReentrantReadWriteLock。一個讀寫鎖管理一對相關(guān)的鎖。一個負(fù)責(zé)讀操作,另外一個負(fù)責(zé)寫操作。讀操作在寫鎖沒被使用時可同時由多個進(jìn)程使用,而寫鎖在使用時不允許讀(阻塞)。此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進(jìn)入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 --->請求讀鎖--->釋放讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的??芍厝胱x寫鎖主要由兩個類實現(xiàn):InterProcessReadWriteLock、InterProcessMutex。使用時首先創(chuàng)建一個InterProcessReadWriteLock實例,然后再根據(jù)你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是InterProcessMutex。
(4)信號量—Shared Semaphore - 一個計數(shù)的信號量類似JDK的Semaphore。 JDK中Semaphore維護(hù)的一組許可(permits),而Curator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數(shù)。第一種方式是用戶給定path并且指定最大LeaseSize。第二種方式用戶給定path并且使用SharedCountReader類。如果不使用SharedCountReader, 必須保證所有實例在多進(jìn)程中使用相同的(最大)租約數(shù)量,否則有可能出現(xiàn)A進(jìn)程中的實例持有最大租約數(shù)量為10,但是在B進(jìn)程中持有的最大租約數(shù)量為20,此時租約的意義就失效了。
- 這次調(diào)用acquire()會返回一個租約對象。 客戶端必須在finally中close這些租約對象,否則這些租約會丟失掉。 但是, 但是,如果客戶端session由于某種原因比如crash丟掉, 那么這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續(xù)使用這些租約。
- 注意你可以一次性請求多個租約,如果Semaphore當(dāng)前的租約不夠,則請求線程會被阻塞。 同時還提供了超時的重載方法。
(5)多共享鎖對象 —Multi Shared Lock - Multi Shared Lock是一個鎖的容器。 當(dāng)調(diào)用acquire(), 所有的鎖都會被acquire(),如果請求失敗,所有的鎖都會被release。 同樣調(diào)用release時所有的鎖都被release(失敗被忽略)。
4、分布式計數(shù)器
- 計數(shù)器是用來計數(shù)的, 利用ZooKeeper可以實現(xiàn)一個集群共享的計數(shù)器。 只要使用相同的path就可以得到最新的計數(shù)器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數(shù)器, 一個是用int來計數(shù)(SharedCount),一個用long來計數(shù)(DistributedAtomicLong)。
(1)分布式int計數(shù)器—SharedCount
- 這個類使用int類型來計數(shù)。 主要涉及三個類。SharedCount代表計數(shù)器, 可以為它增加一個SharedCountListener,當(dāng)計數(shù)器改變時此Listener可以監(jiān)聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本信息的值VersionedValue。
SharedCount
SharedCountReader
SharedCountListener
- 注意計數(shù)器必須start,使用完之后必須調(diào)用close關(guān)閉它。
(2)分布式long計數(shù)器—DistributedAtomicLong - 再看一個Long類型的計數(shù)器。 除了計數(shù)的范圍比SharedCount大了之外, 它首先嘗試使用樂觀鎖的方式設(shè)置計數(shù)器, 如果不成功(比如期間計數(shù)器已經(jīng)被其它client更新了), 它使用InterProcessMutex方式來更新計數(shù)值。此計數(shù)器有一系列的操作:
get(): 獲取當(dāng)前值
increment(): 加一
decrement(): 減一
add(): 增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設(shè)置計數(shù)值
forceSet(): 強(qiáng)制設(shè)置計數(shù)值
5、分布式隊列
- 基本不會使用zk作為分布式隊列使用
- Curator也提供ZK Recipe的分布式隊列實現(xiàn)。 利用ZK的 PERSISTENTS_EQUENTIAL節(jié)點, 可以保證放入到隊列中的項目是按照順序排隊的。 如果單一的消費(fèi)者從隊列中取數(shù)據(jù), 那么它是先入先出的,這也是隊列的特點。 如果你嚴(yán)格要求順序,你就的使用單一的消費(fèi)者,可以使用Leader選舉只讓Leader作為唯一的消費(fèi)者。
6、分布式屏障—Barrier
- 分布式Barrier是這樣一個類: 它會阻塞所有節(jié)點上的等待進(jìn)程,直到某一個被滿足, 然后所有的節(jié)點繼續(xù)進(jìn)行,先關(guān)實現(xiàn)類DistributedBarrier

