Zookeeper實現(xiàn)分布式鎖
在分布式系統(tǒng)架構中,想必大家對分布式鎖已經(jīng)很熟悉了,實現(xiàn)方式也有很多種,比如基于數(shù)據(jù)庫的分布式鎖、基于redis緩存的分布式鎖以及本文要介紹的Zookeeper實現(xiàn)的分布式鎖。
分布式鎖的基本場景
如果在多線程并行情況下去訪問某一個共享資源,比如說 共享變量,那么勢必會造成線程安全問題。那么我們可以 用很多種方法來解決,比如 synchronized、 比如 Lock 之 類的鎖操作來解決線程安全問題,那么在分布式架構下, 涉及到多個進程訪問某一個共享資源的情況,比如說在電 商平臺中商品庫存問題,在庫存只有 10 個的情況下進來 100 個用戶,如何能夠避免超賣呢?所以這個時候我們需 要一些互斥手段來防止彼此之間的干擾。 然后在分布式情況下,synchronized 或者 Lock 之類的鎖 只能控制單一進程的資源訪問,在多進程架構下,這些 api 就沒辦法解決我們的問題了。
-
獨占鎖(排他鎖)
我們可以 利用 zookeeper 節(jié)點的特性來實現(xiàn)獨占鎖,就是同級節(jié)點的唯一性,多個進程往 zookeeper 的指定節(jié)點下創(chuàng)建一個 相同名稱的節(jié)點,只有一個能成功,其余進程創(chuàng)建都是失敗,創(chuàng)建失敗的節(jié)點全部通過 zookeeper 的 watcher 機制來監(jiān)zookeeper 這個子節(jié)點的變化,一旦監(jiān)聽到子節(jié)點的刪除事件,則再次觸發(fā)所有進程去獲取鎖,這種實現(xiàn)方式很簡單,但是會產(chǎn)生“羊群效應”,簡單來說就 是如果存在許多的客戶端在等待獲取鎖,當成功獲取到鎖 的進程釋放該節(jié)點后,所有處于等待狀態(tài)的客戶端都會被 喚醒,這個時候 zookeeper 在短時間內(nèi)發(fā)送大量子節(jié)點變 更事件給所有待獲取鎖的客戶端,然后實際情況是只會有 一個客戶端獲得鎖。如果在集群規(guī)模比較大的情況下,會 對 zookeeper 服務器的性能產(chǎn)生比較的影響 -
共享鎖(公平鎖)
我們可以通過臨時有序節(jié)點來實現(xiàn)分布式鎖,每個客戶端都往指定的節(jié)點下注冊一個臨時有序節(jié)點,越早創(chuàng)建的節(jié)點, 節(jié)點的順序編號就越小,那么我們可以判斷子節(jié)點中最小 的節(jié)點設置為獲得鎖。如果自己的節(jié)點不是所有子節(jié)點中 最小的,意味著還沒有獲得鎖。這個的實現(xiàn)和前面單節(jié)點 實現(xiàn)的差異性在于,每個節(jié)點只需要監(jiān)聽比自己小的節(jié)點, 當比自己小的節(jié)點刪除以后,客戶端會收到 watcher 事件, 此時再次判斷自己的節(jié)點是不是所有子節(jié)點中最小的,如 果是則獲得鎖,否則就不斷重復這個過程,這樣就不會導 致羊群效應,因為每個客戶端只需要監(jiān)控一個節(jié)點。
Curator實現(xiàn)Zookeeper分布式鎖的使用
curator 對于鎖這塊做了一些封裝,例如:
- InterProcessMutex:分布式可重入排它鎖
- InterProcessSemaphoreMutex:分布式排它鎖
- InterProcessReadWriteLock:分布式讀寫鎖
InterProcessMutex實現(xiàn)Demo
private static String CONNECTION_STR = "47.93.37.94:2181,39.105.162.160:2181,39.105.163.224:2181";
public static void main(String[] args) throws Exception {
//Curator連接zkServer
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(50000000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
//創(chuàng)建鎖(重入鎖)
final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks"); //lock為持久結點,創(chuàng)建的臨時有序結點為其子節(jié)點
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "->>嘗試競爭鎖開始");
try {
lock.acquire(); //阻塞競爭鎖
System.out.println(Thread.currentThread().getName() + "->>成功獲得了鎖");
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
lock.release(); //釋放鎖
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-" + i).start();
}
}
結果如圖:

Zookeeper 實現(xiàn) leader 選舉
leader election 是很重要的一個功能, 這個選舉過程是這樣子的:指派一個進程作為組織者,將 任務分發(fā)給各節(jié)點。在任務開始前,哪個節(jié)點都不知道誰是 leader 或者 coordinator。當選舉算法開始執(zhí)行后,每 個節(jié)點最終會得到一個唯一的節(jié)點作為任務 leader。除此 之外,選舉還經(jīng)常會發(fā)生在 leader 意外宕機的情況下,新 的 leader 要被選舉出來。
Curator 有兩種選舉 recipe(Leader Latch 和 Leader Election)
- Leader Latch
參與選舉的所有節(jié)點,會創(chuàng)建一個順序節(jié)點,其中最小的 節(jié)點會設置為 master 節(jié)點, 沒搶到 Leader 的節(jié)點都監(jiān)聽 前一個節(jié)點的刪除事件,在前一個節(jié)點刪除后進行重新?lián)?主,當 master 節(jié)點手動調用 close()方法或者 master 節(jié)點掛了之后,后續(xù)的子節(jié)點會搶占 master。
其中 spark 使用的就是這種方法 - LeaderSelectorh
LeaderSelector 和 Leader Latch 最的差別在于,leader 可以釋放領導權以后,還可以繼續(xù)參與競爭
來看一下例子
public class LeaderSelectorClientA extends LeaderSelectorListenerAdapter implements Closeable {
private String name; //當前的進程
private LeaderSelector leaderSelector; //leader選舉的API
private CountDownLatch countDownLatch = new CountDownLatch(1); //讓進程不釋放lead權限
public LeaderSelectorClientA() {
}
public LeaderSelectorClientA(String name) {
this.name = name;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
public void setLeaderSelector(LeaderSelector leaderSelector) {
this.leaderSelector = leaderSelector;
}
public void start() {
leaderSelector.start(); //開始競爭leader
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
//如果進入當前的方法,意味著當前的進程獲得了鎖。獲得鎖以后,這個方法會被回調
//這個方法執(zhí)行結束之后,表示釋放leader權限
System.out.println(name + "->現(xiàn)在是leader了");
countDownLatch.await(); //阻塞當前的進程防止leader丟失
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
private static String CONNECTION_STR = "47.93.37.94:2181,39.105.162.160:2181,39.105.163.224:2181";
public static void main(String[] args) throws IOException {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(50000).//緩存時間
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
LeaderSelectorClientA leaderSelectorClient = new LeaderSelectorClientA("ClientA");
LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, "/leader", leaderSelectorClient);
leaderSelectorClient.setLeaderSelector(leaderSelector);
leaderSelectorClient.start(); //開始選舉
System.in.read();//進程不結束
}
}
public class LeaderSelectorClientB extends LeaderSelectorListenerAdapter implements Closeable {
private String name; //表示當前的進程
private LeaderSelector leaderSelector; //leader選舉的API
private CountDownLatch countDownLatch=new CountDownLatch(1);
public LeaderSelectorClientB(){
}
public LeaderSelectorClientB(String name) {
this.name = name;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
public void setLeaderSelector(LeaderSelector leaderSelector) {
this.leaderSelector = leaderSelector;
}
public void start(){
leaderSelector.start(); //開始競爭leader
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
//如果進入當前的方法,意味著當前的進程獲得了鎖。獲得鎖以后,這個方法會被回調
//這個方法執(zhí)行結束之后,表示釋放鎖
System.out.println(name+"->現(xiàn)在是leader了");
countDownLatch.await(); //阻塞當前的進程防止leader丟失
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
private static String CONNECTION_STR = "47.93.37.94:2181,39.105.162.160:2181,39.105.163.224:2181";
public static void main(String[] args) throws IOException {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(50000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
LeaderSelectorClientB leaderSelectorClient=new LeaderSelectorClientB("ClientB");
LeaderSelector leaderSelector=new LeaderSelector(curatorFramework,"/leader",leaderSelectorClient);
leaderSelectorClient.setLeaderSelector(leaderSelector);
leaderSelectorClient.start(); //開始選舉
System.in.read();
}
}
public class LeaderSelectorClientC extends LeaderSelectorListenerAdapter implements Closeable {
private String name; //表示當前的進程
private LeaderSelector leaderSelector; //leader選舉的API
private CountDownLatch countDownLatch=new CountDownLatch(1);
public LeaderSelectorClientC(){
}
public LeaderSelectorClientC(String name) {
this.name = name;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
public void setLeaderSelector(LeaderSelector leaderSelector) {
this.leaderSelector = leaderSelector;
}
public void start(){
leaderSelector.start(); //開始競爭leader
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
//如果進入當前的方法,意味著當前的進程獲得了鎖。獲得鎖以后,這個方法會被回調
//這個方法執(zhí)行結束之后,表示釋放鎖
System.out.println(name+"->現(xiàn)在是leader了");
countDownLatch.await(); //阻塞當前的進程防止leader丟失
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
private static String CONNECTION_STR = "47.93.37.94:2181,39.105.162.160:2181,39.105.163.224:2181";
public static void main(String[] args) throws IOException {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(CONNECTION_STR).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
LeaderSelectorClientC leaderSelectorClient=new LeaderSelectorClientC("ClientC");
LeaderSelector leaderSelector=new LeaderSelector(curatorFramework,"/leader",leaderSelectorClient);
leaderSelectorClient.setLeaderSelector(leaderSelector);
leaderSelectorClient.start(); //開始選舉
System.in.read();//
}
}
三個主函數(shù)依次啟動后,最先啟動的會成為leader,leader掛掉后會按照啟動時間的順序選擇新的leader
Zookeeper 數(shù)據(jù)的同步流程
在 zookeeper 中,客戶端會隨機連接到 zookeeper 集群中 的一個節(jié)點,如果是讀請求,就直接從當前節(jié)點中讀取數(shù) 據(jù),如果是寫請求,那么請求會被轉發(fā)給 leader 提交事務,然后只要有超過半數(shù)節(jié)點寫入成功,leader 就會廣播事務,, 那么寫請求就會被提交;有三個問題需要考慮下
- 集群中的 leader 節(jié)點如何選舉出來?
- leader 節(jié)點崩潰以后,整個集群無法處理寫請求,如何快速從其他節(jié)點里面選舉出新的 leader 呢?
- leader 節(jié)點和各個 follower 節(jié)點的數(shù)據(jù)一致性如何保證
ZAB 協(xié)議
ZAB(Zookeeper Atomic Broadcast) 協(xié)議是為分布式協(xié)調服務 ZooKeeper 專門設計的一種支持崩潰恢復的原子廣播協(xié)議。在ZooKeeper 中,主要依賴 ZAB 協(xié)議來實現(xiàn) 分布式數(shù)據(jù)一致性,基于該協(xié)議,ZooKeeper 實現(xiàn)了一種主備模式的系統(tǒng)架構來保持集群中各個副本之間的數(shù)據(jù)一致性
簡介
ZAB 協(xié)議包含兩種基本模式,分別是
- 崩潰恢復
- 原子廣播
當整個集群在啟動時,或者當 leader 節(jié)點出現(xiàn)網(wǎng)絡中斷、 崩潰等情況時,ZAB 協(xié)議就會進入模式并選舉產(chǎn)生新 的 Leader,當 leader 服務器選舉出來后,并且集群中有過 半的機器和該 leader 節(jié)點完成數(shù)據(jù)同步后(同步指的是數(shù) 據(jù)同步,用來保證集群中過半的機器能夠和 leader 服務器 的數(shù)據(jù)狀態(tài)保持一致),ZAB 協(xié)議就會退出恢復模式。
當集群中已經(jīng)有過半的 Follower 節(jié)點完成了和 Leader 狀 態(tài)同步以后,那么整個集群就進入了消息廣播模式。這個 時候,在 Leader 節(jié)點正常工作時,啟動一臺新的服務器加 入到集群,那這個服務器會直接進入數(shù)據(jù)恢復模式,和leader 節(jié)點進行數(shù)據(jù)同步。同步完成后即可正常對外提供 非事務請求的處理;##leader 節(jié)點可以處理事務請求和非事務請 求,follower 節(jié)點只能處理非事務請求,如果 follower 節(jié) 點接收到非事務請求,會把這個請求轉發(fā)給 Leader 服務器##
消息廣播的實現(xiàn)原理
消息廣播的過程實際上是一個 簡化版本的二階段提交過程:
- leader 接收到消息請求后,將消息賦予一個全局唯一的64 位自增 id,叫zxid,通過 zxid 的大小比較既可以實現(xiàn)因果有序這個特征
zxid中高32位標識epoch ,低32位表示遞增編號,epoch類似于一個國家的國號,在古代每一個朝代變更,國號都會變更,例如唐、宋、元、明、清,同樣每次發(fā)生選舉epoch都會發(fā)生變更,可以有效防止前一個leader的數(shù)據(jù)提交影響影響到新的選舉,其實就是樂觀鎖的思想。
- leader 接收到消息請求后,將消息賦予一個全局唯一的64 位自增 id,叫zxid,通過 zxid 的大小比較既可以實現(xiàn)因果有序這個特征
- leader 為每個 follower 準備了一個 FIFO 隊列(通過 TCP協(xié)議來實現(xiàn),以實現(xiàn)了全局有序這一個特點)將帶有 zxid的消息作為一個提案(proposal)分發(fā)給所有的 follower
- 當 follower 接收到 proposal,先把 proposal 寫到磁盤,寫入成功以后再向 leader 回復一個 ack
- 當 leader 接收到合法數(shù)量(超過半數(shù)節(jié)點)的 ACK 后,leader 就會向這些 follower 發(fā)送 commit 命令,同時會在本地執(zhí)行該消息
- 當 follower 收到消息的 commit 命令以后,會提交該消息
這里需要注意的是:
leader 的投票過程,不需要Observer的ack,也就是Observer不需要參與投票過程,但是 Observer 必須要同 步 Leader 的數(shù)據(jù)從而在處理請求的時候保證數(shù)據(jù)的一致性**
崩潰恢復的實現(xiàn)原理
前面我們已經(jīng)清楚了 ZAB 協(xié)議中的消息廣播過程,ZAB 協(xié) 議的這個基于原子廣播協(xié)議的消息廣播過程,在正常情況下是沒有任何問題的,但是一旦 Leader 節(jié)點崩潰,或者由 于網(wǎng)絡問題導致 Leader 服務器失去了過半的 Follower 節(jié) 點的聯(lián)系(leader 失去與過半 follower 節(jié)點聯(lián)系,可能是 leader 節(jié)點和 follower 節(jié)點之間產(chǎn)生了網(wǎng)絡分區(qū),那么此時的 leader 不再是合法的 leader 了),那么就會進入到崩 潰恢復模式。崩潰恢復狀態(tài)下 zab 協(xié)議需要做兩件事
- 選舉出新的 leader
- 數(shù)據(jù)同步
消息廣播時,知道 ZAB 協(xié)議的消息廣播機制是 簡化版本的 2PC 協(xié)議,這種協(xié)議只需要集群中過半的節(jié)點響應提交即可。但是它無法處理 Leader 服務器崩潰帶來的 數(shù)據(jù)不一致問題。因此在 ZAB 協(xié)議中添加了一個“崩潰恢 復模式”來解決這個問題。那么 ZAB 協(xié)議中的崩潰恢復需要保證,如果一個事務 Proposal 在一臺機器上被處理成功,那么這個事務應該在 所有機器上都被處理成功,哪怕是出現(xiàn)故障。為了達到這 個目的,我們先來設想一下,在 zookeeper 中會有哪些場 景導致數(shù)據(jù)不一致性,以及針對這個場景,zab 協(xié)議中的 崩潰恢復應該怎么處理。
- 數(shù)據(jù)同步
已經(jīng)被處理的消息不能丟
當 leader 收到合法數(shù)量 follower 的 ACKs 后,就向各 個 follower 廣播 COMMIT 命令,同時也會在本地執(zhí)行 COMMIT 并向連接的客戶端返回「成功」。但是如果在各 個 follower 在收到 COMMIT 命令前 leader 就掛了,導 致剩下的服務器并沒有執(zhí)行都這條消息。
Zookeeper的一致性問題
zk實現(xiàn)的是是順序一致性,要保證任何一次讀都能督導最近一次寫入或者修改的值,類似于并發(fā)編程中的voliate;
zookeeper 不保證在每個實例中,兩個不同的客戶端具有相同的 zookeeper 數(shù)據(jù)視圖,由于網(wǎng)絡延遲等因素,一個客戶端可能會在另外一 個客戶端收到更改通知之前執(zhí)行更新, 考慮到2個客戶端A和B的場景,如果A把znode /a的值從0設置為 1,然后告訴客戶端 B 讀取 /a, 則客戶端 B 可能會讀取到舊的值 0,具體取決于他連接到那個服務器,如果客戶端 A 和 B 要讀取必須要讀取到 相同的值,那么 client B 在讀取操作之前執(zhí)行 sync 方法。
除此之外,zookeeper 基于 zxid 以及阻塞隊列的方式來實現(xiàn)請求的順序 一致性。如果一個 client 連接到一個最新的 follower 上,那么它 read 讀 取到了最新的數(shù)據(jù),然后 client 由于網(wǎng)絡原因重新連接到 zookeeper 節(jié) 點,而這個時候連接到一個還沒有完成數(shù)據(jù)同步的 follower 節(jié)點,那么這 一次讀到的數(shù)據(jù)不久是舊的數(shù)據(jù)嗎?實際上 zookeeper 處理了這種情況, client 會記錄自己已經(jīng)讀取到的最大的 zxid,如果 client 重連到 server 發(fā) 現(xiàn) client 的 zxid 比自己大。連接會失敗
Leader選舉的原理
選舉的情況有倆種
- 服務啟動時候的leader選舉
- 運行過程中l(wèi)eader宕機導致的leader選舉
先來弄清楚幾個參數(shù)
- 服務器id(myid)
比如有三臺服務器,編號分別是 1,2,3;編號越大在選舉算法中權重也就越大 - zxid(事務id)
值越大說明數(shù)據(jù)越新,在選舉算法中的權重越大 - 邏輯始終(epoch)
叫投票的次數(shù),同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個數(shù)據(jù)就會增加,然后與接收到的其它服務器返回的投票信息中的數(shù)值相比,根據(jù)不同的值做出不同的判斷。 - 選舉狀態(tài)
Looking(競選狀態(tài));FOLLOWING(隨從狀態(tài),同步 leader 狀態(tài),參與投票。);LEADING(領導者狀態(tài));OBSERVING(觀察狀態(tài),同步 leader 狀態(tài),不參與投票。)
服務啟動時leader選舉
每個節(jié)點啟動的時候狀態(tài)都是 LOOKING,處于觀望狀態(tài),接下來就開始進行選舉流程。
若進行 Leader 選舉,則至少需要兩臺機器,這里選取 3 臺機器組成的服 務器集群為例。在集群初始化階段,當有一臺服務器 Server1 啟動時,其 單獨無法進行和完成 Leader 選舉,當?shù)诙_服務器 Server2 啟動時,此 時兩臺機器可以相互通信,每臺機器都試圖找到 Leader,于是進入 Leader 選舉過程。選舉過程如下:
- 每個Ser ver 發(fā)出一個投票。由于是初始情況,Server1 和 Server2 都會將自己作為 Leader 服務器來進行投票,每次投票會包含所推舉的服務器的 myid 和 ZXID、epoch,使用(myid, ZXID,epoch)來表示, 此時 Server1 的投票為(1, 0),Server2 的投票為(2, 0),然后各自將這個投票發(fā)給集群中其他機器
- 接受來自各個服務器的投票。集群的每個服務器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票(epoch)、是否來自 LOOKING 狀態(tài)的服務器。
- 處理投票。針對每一個投票,服務器都需要將別人的投票和自己的投 票進行 PK,PK 規(guī)則如下:
- 優(yōu)先比較 epoch
- 2其次檢查 ZXID。ZXID 比較大的服務器優(yōu)先作為 Leader
- 3如果 ZXID 相同,那么就比較 myid。myid 較大的服務器作為Leader 服務器
對于 Server1 而言,它的投票是(1, 0),接收 Server2 的投票為(2, 0), 首先會比較兩者的 ZXID,均為 0,再比較 myid,此時 Server2 的 myid 最大,于是更新自己的投票為(2, 0),然后重新投票,對于 Server2 而言,其無須更新自己的投票,只是再次向集群中所有機器 發(fā)出上一次投票信息即可
- 統(tǒng)計投票。每次投票后,服務器都會統(tǒng)計投票信息,判斷是否已經(jīng)有 過半機器接受到相同的投票信息,對于 Server1、Server2 而言,都統(tǒng)計出集群中已經(jīng)有兩臺機器接受了(2, 0)的投票信息,此時便認為 已經(jīng)選出了 Leader
- 改變服務器狀態(tài)。一旦確定了 Leader,每個服務器就會更新自己的 狀態(tài),如果是 Follower,那么就變更為 FOLLOWING,如果是 Leader, 就變更為 LEADING。
運行過程中的 leader 選舉
當集群中的 leader 服務器出現(xiàn)宕機或者不可用的情況時,那么整個集群 將無法對外提供服務,而是進入新一輪的 Leader 選舉,服務器運行期間 的 Leader 選舉和啟動時期的 Leader 選舉基本過程是一致的。
- 變更狀態(tài)。Leader 掛后,余下的非Observer 服務器都會將自己的服務器狀態(tài)變更為 LOOKING,然后開始進入 Leader 選舉過程。
- 每個Server 會發(fā)出一個投票。在運行期間,每個服務器上的 ZXID 可 能不同,此時假定 Server1 的 ZXID 為 123,Server3的ZXID 為 122; 在第一輪投票中,Server1和 Server3都會投自己,產(chǎn)生投票(1, 123), (3, 122),然后各自將投票發(fā)送給集群中所有機器。接收來自各個服務器的投票。與啟動時過程相同。
- 處理投票。與啟動時過程相同,此時,Server1 將會成為 Leader。
- 統(tǒng)計投票。與啟動時過程相同。
- 改變服務器的狀態(tài)。與啟動時過程相同