架構(gòu)師之路(二) 冪等設(shè)計與分布式鎖設(shè)計

本專題所寫所感所得,來自轉(zhuǎn)轉(zhuǎn)首席架構(gòu)師和字節(jié)架構(gòu)團(tuán)隊,此致,敬禮。。

一、冪等設(shè)計

1.1 定義

冪等需要考慮請求層面和業(yè)務(wù)層面的冪等。

  • 請求層面

保證請求重復(fù)執(zhí)行和執(zhí)行一次結(jié)果相同;即f...f(f(x)) = f(x)

  • 業(yè)務(wù)層面

如同一用戶不重復(fù)下單;商品不超買

1.2 目標(biāo)

  • 請求重試不出問題
  • 避免結(jié)果災(zāi)難性(重復(fù)轉(zhuǎn)賬、多交易等)

1.3 冪等范圍

冪等范圍主要是針對請求對數(shù)據(jù)造成改變。以下從兩個維度判斷冪等范圍。

  • 讀/寫請求層面:寫請求
  • 架構(gòu)層層面:數(shù)據(jù)訪問層


二、分布式鎖設(shè)計

業(yè)務(wù)層面的冪等存在并發(fā)消費的可能性,需要轉(zhuǎn)化為串行消費。本質(zhì)上就是分布式鎖的問題。

2.1 定義與目的

分布式鎖是在分布式環(huán)境下,鎖定全局唯一資源,使得請求處理串行化,實現(xiàn)類似于互斥鎖的效果。
分布式鎖的目的是:

  • 防止重復(fù)下單,解決業(yè)務(wù)層冪等問題
  • 解決MQ消息消費冪等性問題,如發(fā)送消息重復(fù)、消息消費端去重等
  • 狀態(tài)的修改行為需要串行處理,避免出現(xiàn)數(shù)據(jù)錯亂

2.2 高可用分布式鎖設(shè)計

2.2.1 目標(biāo)

  • 強一致性
  • 服務(wù)高可用、系統(tǒng)穩(wěn)健
  • 鎖自動續(xù)約及其自動釋放
  • 代碼高度抽象業(yè)務(wù)接入極簡
  • 可視化管理后臺,監(jiān)控及管理

2.2.2 特點

  • 互斥性:和我們本地鎖一樣互斥性是最基本,但是分布式鎖需要保證在不同節(jié)點的不同線程的互斥。
  • 可重入性:同一個節(jié)點上的同一個線程如果獲取了鎖之后那么也可以再次獲取這個鎖。
  • 鎖超時:和本地鎖一樣支持鎖超時,防止死鎖。
  • 高效,高可用:加鎖和解鎖需要高效,同時也需要保證高可用防止分布式鎖失效,可以增加降級。
  • 支持阻塞和非阻塞:和ReentrantLock一樣支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平鎖和非公平鎖(可選):公平鎖的意思是按照請求加鎖的順序獲得鎖,非公平鎖就相反是無序的。這個一般來說實現(xiàn)的比較少。

2.2.3 方案對比

mysql redis zookeeper etcd
一致性算法 paxos raft
CAP cp ap cp cp
高可用 主從 主從 N+1可用(奇數(shù)) N+1可用(奇數(shù))
接口類型 sql 客戶端 客戶端 http/grpc
實現(xiàn) select * from update setnx + lua crateephemeral restful api
  • redis是ap模型,無法保證數(shù)據(jù)一致性
  • zk對鎖實現(xiàn)使用創(chuàng)建臨時節(jié)點和watch機制,執(zhí)行效率、拓展能力、社區(qū)活躍度都不如etcd

2.3 Mysql分布式鎖

首先來說一下Mysql分布式鎖的實現(xiàn)原理,相對來說這個比較容易理解,畢竟數(shù)據(jù)庫和我們開發(fā)人員在平時的開發(fā)中息息相關(guān)。對于分布式鎖我們可以創(chuàng)建一個鎖表:


前面我們所說的lock(),trylock(long timeout),trylock()這幾個方法可以用下面的偽代碼實現(xiàn)。

2.3.1 lock()

lock一般是阻塞式的獲取鎖,意思就是不獲取到鎖誓不罷休,那么我們可以寫一個死循環(huán)來執(zhí)行其操作:

mysqlLock.lcok內(nèi)部是一個sql,為了達(dá)到可重入鎖的效果那么我們應(yīng)該先進(jìn)行查詢,如果有值,那么需要比較node_info是否一致,這里的node_info可以用機器IP和線程名字來表示,如果一致那么就加可重入鎖count的值,如果不一致那么就返回false。如果沒有值那么直接插入一條數(shù)據(jù)。偽代碼如下:

需要注意的是這一段代碼需要加事務(wù),必須要保證這一系列操作的原子性。

2.3.2 tryLock()和tryLock(long timeout)

tryLock()是非阻塞獲取鎖,如果獲取不到那么就會馬上返回,代碼可以如下:

tryLock(long timeout)實現(xiàn)如下:


mysqlLock.lock和上面一樣,但是要注意的是select ... for update這個是阻塞的獲取行鎖,如果同一個資源并發(fā)量較大還是有可能會退化成阻塞的獲取鎖。

2.3.3 unlock()

unlock的話如果這里的count為1那么可以刪除,如果大于1那么需要減去1。


2.3.4 鎖超時

我們有可能會遇到我們的機器節(jié)點掛了,那么這個鎖就不會得到釋放,我們可以啟動一個定時任務(wù),通過計算一般我們處理任務(wù)的一般的時間,比如是5ms,那么我們可以稍微擴大一點,當(dāng)這個鎖超過20ms沒有被釋放我們就可以認(rèn)定是節(jié)點掛了然后將其直接釋放。

2.3.5 Mysql小結(jié)

  • 適用場景: Mysql分布式鎖一般適用于資源不存在數(shù)據(jù)庫,如果數(shù)據(jù)庫存在比如訂單,那么可以直接對這條數(shù)據(jù)加行鎖,不需要我們上面多的繁瑣的步驟,比如一個訂單,那么我們可以用select * from order_table where id = 'xxx' for update進(jìn)行加行鎖,那么其他的事務(wù)就不能對其進(jìn)行修改。
  • 優(yōu)點:理解起來簡單,不需要維護(hù)額外的第三方中間件(比如Redis,Zk)。
  • 缺點:雖然容易理解但是實現(xiàn)起來較為繁瑣,需要自己考慮鎖超時,加事務(wù)等等。性能局限于數(shù)據(jù)庫,一般對比緩存來說性能較低。對于高并發(fā)的場景并不是很適合。

2.3.6 樂觀鎖

前面我們介紹的都是悲觀鎖,這里想額外提一下樂觀鎖,在我們實際項目中也是經(jīng)常實現(xiàn)樂觀鎖,因為我們加行鎖的性能消耗比較大,通常我們會對于一些競爭不是那么激烈,但是其又需要保證我們并發(fā)的順序執(zhí)行使用樂觀鎖進(jìn)行處理,我們可以對我們的表加一個版本號字段,那么我們查詢出來一個版本號之后,update或者delete的時候需要依賴我們查詢出來的版本號,判斷當(dāng)前數(shù)據(jù)庫和查詢出來的版本號是否相等,如果相等那么就可以執(zhí)行,如果不等那么就不能執(zhí)行。這樣的一個策略很像我們的CAS(Compare And Swap),比較并交換是一個原子操作。這樣我們就能避免加select * for update行鎖的開銷。

2.4 基于redis分布式鎖

redis是單線程的,所以能保證線程串行處理,但因為redis分布式鎖是ap模型,不是cp模型,無法實現(xiàn)強一致性。但因?qū)崿F(xiàn)簡單,接入成本低,如果對數(shù)據(jù)一致性要求不那么高,可以選擇此方式。

2.4.1 Redis分布式鎖簡單實現(xiàn)

熟悉Redis的同學(xué)那么肯定對setNx(set if not exist)方法不陌生,如果不存在則更新,其可以很好的用來實現(xiàn)我們的分布式鎖。對于某個資源加鎖我們只需要

setNx resourceName value

這里有個問題,加鎖了之后如果機器宕機那么這個鎖就不會得到釋放所以會加入過期時間,加入過期時間需要和setNx同一個原子操作,在Redis2.8之前我們需要使用Lua腳本達(dá)到我們的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。

set resourceName value ex 5 nx

2.4.2 Redission

Javaer都知道Jedis,Jedis是Redis的Java實現(xiàn)的客戶端,其API提供了比較全面的Redis命令的支持。Redission也是Redis的客戶端,相比于Jedis功能簡單。Jedis簡單使用阻塞的I/O和redis交互,Redission通過Netty支持非阻塞I/O。Jedis最新版本2.9.0是2016年的快3年了沒有更新,而Redission最新版本是2018.10月更新。

Redission封裝了鎖的實現(xiàn),其繼承了java.util.concurrent.locks.Lock的接口,讓我們像操作我們的本地Lock一樣去操作Redission的Lock,下面介紹一下其如何實現(xiàn)分布式鎖。

Redission不僅提供了Java自帶的一些方法(lock,tryLock),還提供了異步加鎖,對于異步編程更加方便。 由于內(nèi)部源碼較多,就不貼源碼了,這里用文字?jǐn)⑹鰜矸治鏊侨绾渭渔i的,這里分析一下tryLock方法:

  • 嘗試加鎖:首先會嘗試進(jìn)行加鎖,由于保證操作是原子性,那么就只能使用lua腳本,相關(guān)的lua腳本如下:

    可以看見他并沒有使用我們的sexNx來進(jìn)行操作,而是使用的hash結(jié)構(gòu),我們的每一個需要鎖定的資源都可以看做是一個HashMap,鎖定資源的節(jié)點信息是Key,鎖定次數(shù)是value。通過這種方式可以很好的實現(xiàn)可重入的效果,只需要對value進(jìn)行加1操作,就能進(jìn)行可重入鎖。當(dāng)然這里也可以用之前我們說的本地計數(shù)進(jìn)行優(yōu)化。

  • 如果嘗試加鎖失敗,判斷是否超時,如果超時則返回false。
  • 如果加鎖失敗之后,沒有超時,那么需要在名字為redisson_lock__channel+lockName的channel上進(jìn)行訂閱,用于訂閱解鎖消息,然后一直阻塞直到超時,或者有解鎖消息。
  • 重試步驟以上三步,直到最后獲取到鎖,或者某一步獲取鎖超時。

對于我們的unlock方法比較簡單也是通過lua腳本進(jìn)行解鎖,如果是可重入鎖,只是減1。如果是非加鎖線程解鎖,那么解鎖失敗。


Redission還有公平鎖的實現(xiàn),對于公平鎖其利用了list結(jié)構(gòu)和hashset結(jié)構(gòu)分別用來保存我們排隊的節(jié)點,和我們節(jié)點的過期時間,用這兩個數(shù)據(jù)結(jié)構(gòu)幫助我們實現(xiàn)公平鎖,這里就不展開介紹了,有興趣可以參考源碼。

2.4.3 RedLock

我們想象一個這樣的場景當(dāng)機器A申請到一把鎖之后,如果Redis主宕機了,這個時候從機并沒有同步到這一把鎖,那么機器B再次申請的時候就會再次申請到這把鎖,為了解決這個問題Redis作者提出了RedLock紅鎖的算法,在Redission中也對RedLock進(jìn)行了實現(xiàn)。



通過上面的代碼,我們需要實現(xiàn)多個Redis集群,然后進(jìn)行紅鎖的加鎖,解鎖。具體的步驟如下:

  • 首先生成多個Redis集群的Rlock,并將其構(gòu)造成RedLock。
  • 依次循環(huán)對三個集群進(jìn)行加鎖,加鎖的過程和5.2里面一致。
  • 如果循環(huán)加鎖的過程中加鎖失敗,那么需要判斷加鎖失敗的次數(shù)是否超出了最大值,這里的最大值是根據(jù)集群的個數(shù),比如三個那么只允許失敗一個,五個的話只允許失敗兩個,要保證多數(shù)成功。
  • 加鎖的過程中需要判斷是否加鎖超時,有可能我們設(shè)置加鎖只能用3ms,第一個集群加鎖已經(jīng)消耗了3ms了。那么也算加鎖失敗。
  • 3,4步里面加鎖失敗的話,那么就會進(jìn)行解鎖操作,解鎖會對所有的集群在請求一次解鎖。

可以看見RedLock基本原理是利用多個Redis集群,用多數(shù)的集群加鎖成功,減少Redis某個集群出故障,造成分布式鎖出現(xiàn)問題的概率。

2.4.4 Redis小結(jié)

  • 優(yōu)點:對于Redis實現(xiàn)簡單,性能對比ZK和Mysql較好。如果不需要特別復(fù)雜的要求,那么自己就可以利用setNx進(jìn)行實現(xiàn),如果自己需要復(fù)雜的需求的話那么可以利用或者借鑒Redission。對于一些要求比較嚴(yán)格的場景來說的話可以使用RedLock。
  • 缺點:需要維護(hù)Redis集群,如果要實現(xiàn)RedLock那么需要維護(hù)更多的集群。

2.5 基于ZK分布式鎖

ZooKeeper也是我們常見的實現(xiàn)分布式鎖方法,相比于數(shù)據(jù)庫如果沒了解過ZooKeeper可能上手比較難一些。ZooKeeper是以Paxos算法為基礎(chǔ)分布式應(yīng)用程序協(xié)調(diào)服務(wù)。Zk的數(shù)據(jù)節(jié)點和文件目錄類似,所以我們可以用此特性實現(xiàn)分布式鎖。我們以某個資源為目錄,然后這個目錄下面的節(jié)點就是我們需要獲取鎖的客戶端,未獲取到鎖的客戶端注冊需要注冊Watcher到上一個客戶端,可以用下圖表示。


/lock是我們用于加鎖的目錄,/resource_name是我們鎖定的資源,其下面的節(jié)點按照我們加鎖的順序排列。

2.5.1 Curator

Curator封裝了Zookeeper底層的Api,使我們更加容易方便的對Zookeeper進(jìn)行操作,并且它封裝了分布式鎖的功能,這樣我們就不需要再自己實現(xiàn)了。

Curator實現(xiàn)了可重入鎖(InterProcessMutex),也實現(xiàn)了不可重入鎖(InterProcessSemaphoreMutex)。在可重入鎖中還實現(xiàn)了讀寫鎖。

2.5.2 InterProcessMutex

InterProcessMutex是Curator實現(xiàn)的可重入鎖,我們可以通過下面的一段代碼實現(xiàn)我們的可重入鎖:


我們利用acuire進(jìn)行加鎖,release進(jìn)行解鎖。

加鎖的流程具體如下:

  • 首先進(jìn)行可重入的判定:這里的可重入鎖記錄在ConcurrentMap<Thread, LockData> threadData這個Map里面,如果threadData.get(currentThread)是有值的那么就證明是可重入鎖,然后記錄就會加1。我們之前的Mysql其實也可以通過這種方法去優(yōu)化,可以不需要count字段的值,將這個維護(hù)在本地可以提高性能。
  • 然后在我們的資源目錄下創(chuàng)建一個節(jié)點:比如這里創(chuàng)建一個/0000000002這個節(jié)點,這個節(jié)點需要設(shè)置為EPHEMERAL_SEQUENTIAL也就是臨時節(jié)點并且有序。
  • 獲取當(dāng)前目錄下所有子節(jié)點,判斷自己的節(jié)點是否位于子節(jié)點第一個。
  • 如果是第一個,則獲取到鎖,那么可以返回。
  • 如果不是第一個,則證明前面已經(jīng)有人獲取到鎖了,那么需要獲取自己節(jié)點的前一個節(jié)點。/0000000002的前一個節(jié)點是/0000000001,我們獲取到這個節(jié)點之后,再上面注冊Watcher(這里的watcher其實調(diào)用的是object.notifyAll(),用來解除阻塞)。
  • object.wait(timeout)或object.wait():進(jìn)行阻塞等待這里和我們第5步的watcher相對應(yīng)。

解鎖的具體流程:

  • 首先進(jìn)行可重入鎖的判定:如果有可重入鎖只需要次數(shù)減1即可,減1之后加鎖次數(shù)為0的話繼續(xù)下面步驟,不為0直接返回。
  • 刪除當(dāng)前節(jié)點。
  • 刪除threadDataMap里面的可重入鎖的數(shù)據(jù)。

2.5.3 讀寫鎖

Curator提供了讀寫鎖,其實現(xiàn)類是InterProcessReadWriteLock,這里的每個節(jié)點都會加上前綴:

private static final String READ_LOCK_NAME  = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";

根據(jù)不同的前綴區(qū)分是讀鎖還是寫鎖,對于讀鎖,如果發(fā)現(xiàn)前面有寫鎖,那么需要將watcher注冊到和自己最近的寫鎖。寫鎖的邏輯和我們之前2.5.2分析的依然保持不變。

2.5.4鎖超時

Zookeeper不需要配置鎖超時,由于我們設(shè)置節(jié)點是臨時節(jié)點,我們的每個機器維護(hù)著一個ZK的session,通過這個session,ZK可以判斷機器是否宕機。如果我們的機器掛掉的話,那么這個臨時節(jié)點對應(yīng)的就會被刪除,所以我們不需要關(guān)心鎖超時。

2.5.5 ZK小結(jié)

  • 優(yōu)點:ZK可以不需要關(guān)心鎖超時時間,實現(xiàn)起來有現(xiàn)成的第三方包,比較方便,并且支持讀寫鎖,ZK獲取鎖會按照加鎖的順序,所以其是公平鎖。對于高可用利用ZK集群進(jìn)行保證。
  • 缺點:ZK需要額外維護(hù),增加維護(hù)成本,性能和Mysql相差不大,依然比較差。并且需要開發(fā)人員了解ZK是什么。

2.6 基于etcd分布式鎖

2.6.1 機制

etcd 支持以下功能,正是依賴這些功能來實現(xiàn)分布式鎖的:

  • Lease 機制:即租約機制(TTL,Time To Live),Etcd 可以為存儲的 KV 對設(shè)置租約,當(dāng)租約到期,KV 將失效刪除;同時也支持續(xù)約,即 KeepAlive。
  • Revision 機制:每個 key 帶有一個 Revision 屬性值,etcd 每進(jìn)行一次事務(wù)對應(yīng)的全局 Revision 值都會加一,因此每個 key 對應(yīng)的 Revision 屬性值都是全局唯一的。通過比較 Revision 的大小就可以知道進(jìn)行寫操作的順序。
  • 在實現(xiàn)分布式鎖時,多個程序同時搶鎖,根據(jù) Revision 值大小依次獲得鎖,可以避免 “羊群效應(yīng)” (也稱 “驚群效應(yīng)”),實現(xiàn)公平鎖。
  • Prefix 機制:即前綴機制,也稱目錄機制??梢愿鶕?jù)前綴(目錄)獲取該目錄下所有的 key 及對應(yīng)的屬性(包括 key, value 以及 revision 等)。
  • Watch 機制:即監(jiān)聽機制,Watch 機制支持 Watch 某個固定的 key,也支持 Watch 一個目錄(前綴機制),當(dāng)被 Watch 的 key 或目錄發(fā)生變化,客戶端將收到通知。

2.6.2 過程

實現(xiàn)過程:

  • 步驟 1: 準(zhǔn)備

客戶端連接 Etcd,以 /lock/mylock 為前綴創(chuàng)建全局唯一的 key,假設(shè)第一個客戶端對應(yīng)的 key="/lock/mylock/UUID1",第二個為 key="/lock/mylock/UUID2";客戶端分別為自己的 key 創(chuàng)建租約 - Lease,租約的長度根據(jù)業(yè)務(wù)耗時確定,假設(shè)為 15s;

  • 步驟 2: 創(chuàng)建定時任務(wù)作為租約的“心跳”

當(dāng)一個客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創(chuàng)建一個定時任務(wù)作為“心跳”進(jìn)行續(xù)約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。

  • 步驟 3: 客戶端將自己全局唯一的 key 寫入 Etcd

進(jìn)行 put 操作,將步驟 1 中創(chuàng)建的 key 綁定租約寫入 Etcd,根據(jù) Etcd 的 Revision 機制,假設(shè)兩個客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用以接下來判斷自己是否獲得鎖。

  • 步驟 4: 客戶端判斷是否獲得鎖

客戶端以前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對應(yīng)的 Revision),判斷自己 key 的 Revision 是否為當(dāng)前列表中最小的,如果是則認(rèn)為獲得鎖;否則監(jiān)聽列表中前一個 Revision 比自己小的 key 的刪除事件,一旦監(jiān)聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。

  • 步驟 5: 執(zhí)行業(yè)務(wù)

獲得鎖后,操作共享資源,執(zhí)行業(yè)務(wù)代碼。

  • 步驟 6: 釋放鎖

完成業(yè)務(wù)流程后,刪除對應(yīng)的key釋放鎖。

2.6.3 實現(xiàn)

自帶的 etcdctl 可以模擬鎖的使用:


// 第一個終端
$ ./etcdctl lock mutex1
mutex1/326963a02758b52d

// 第二終端
$ ./etcdctl lock mutex1

// 當(dāng)?shù)谝粋€終端結(jié)束了,第二個終端會顯示
mutex1/326963a02758b531

在etcd的clientv3包中,實現(xiàn)了分布式鎖。使用起來和mutex是類似的,為了了解其中的工作機制,這里簡要的做一下總結(jié)。

etcd分布式鎖的實現(xiàn)在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下幾個方法:

* func NewMutex(s *Session, pfx string) *Mutex, 用來新建一個mutex
* func (m *Mutex) Lock(ctx context.Context) error,它會阻塞直到拿到了鎖,并且支持通過context來取消獲取鎖。
* func (m *Mutex) Unlock(ctx context.Context) error,解鎖

因此在使用etcd提供的分布式鎖式非常簡單,通常就是實例化一個mutex,然后嘗試搶占鎖,之后進(jìn)行業(yè)務(wù)處理,最后解鎖即可。

demo:

package main

import (  
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "os"
    "os/signal"
    "time"
)

func main() {  
    c := make(chan os.Signal)
    signal.Notify(c)

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    lockKey := "/lock"

    go func () {
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

2.6.4 原理

Lock()函數(shù)的實現(xiàn)很簡單:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通過一個事務(wù)來嘗試加鎖,這個事務(wù)主要包含了4個操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()組成的。

  • cmp: 比較加鎖的key的修訂版本是否是0。如果是0就代表這個鎖不存在。
  • put: 向加鎖的key中存儲一個空值,這個操作就是一個加鎖的操作,但是這把鎖是有超時時間的,超時的時間是session的默認(rèn)時長。超時是為了防止鎖沒有被正常釋放導(dǎo)致死鎖。
  • get: get就是通過key來查詢
  • getOwner: 注意這里是用m.pfx來查詢的,并且?guī)Я瞬樵儏?shù)WithFirstCreate()。使用pfx來查詢是因為其他的session也會用同樣的pfx來嘗試加鎖,并且因為每個LeaseID都不同,所以第一次肯定會put成功。但是只有最早使用這個pfx的session才是持有鎖的,所以這個getOwner的含義就是這樣的。

接下來才是通過判斷來檢查是否持有鎖

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是當(dāng)前的版本號,resp.Succeeded是cmp為true時值為true,否則是false。這里的判斷表明當(dāng)同一個session非第一次嘗試加鎖,當(dāng)前的版本號應(yīng)該取這個key的最新的版本號。

下面是取得鎖的持有者的key。如果當(dāng)前沒有人持有這把鎖,那么默認(rèn)當(dāng)前會話獲得了鎖?;蛘哝i持有者的版本號和當(dāng)前的版本號一致, 那么當(dāng)前的會話就是鎖的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面這段代碼就很好理解了,因為走到這里說明沒有獲取到鎖,那么這里等待鎖的刪除。

waitDeletes方法的實現(xiàn)也很簡單,但是需要注意的是,這里的getOpts只會獲取比當(dāng)前會話版本號更低的key,然后去監(jiān)控最新的key的刪除。等這個key刪除了,自己也就拿到鎖了。

這種分布式鎖的實現(xiàn)和我一開始的預(yù)想是不同的。它不存在鎖的競爭,不存在重復(fù)的嘗試加鎖的操作。而是通過使用統(tǒng)一的前綴pfx來put,然后根據(jù)各自的版本號來排隊獲取鎖。效率非常的高。避免了驚群效應(yīng)

如圖所示,共有4個session來加鎖,那么根據(jù)revision來排隊,獲取鎖的順序為session2 -> session3 -> session1 -> session4。

這里面需要注意一個驚群效應(yīng),每一個client在鎖住/lock這個path的時候,實際都已經(jīng)插入了自己的數(shù)據(jù),類似/lock/LEASE_ID,并且返回了各自的index(就是raft算法里面的日志索引),而只有最小的才算是拿到了鎖,其他的client需要watch等待。例如client1拿到了鎖,client2和client3在等待,而client2拿到的index比client3的更小,那么對于client1刪除鎖之后,client3其實并不關(guān)心,并不需要去watch。所以綜上,等待的節(jié)點只需要watch比自己index小并且差距最小的節(jié)點刪除事件即可。

2.6.5 基于 ETCD的選主

2.6.5.1 機制

etcd有多種使用場景,Master選舉是其中一種。說起Master選舉,過去常常使用zookeeper,通過創(chuàng)建EPHEMERAL_SEQUENTIAL節(jié)點(臨時有序節(jié)點),我們選擇序號最小的節(jié)點作為Master,邏輯直觀,實現(xiàn)簡單是其優(yōu)勢,但是要實現(xiàn)一個高健壯性的選舉并不簡單,同時zookeeper繁雜的擴縮容機制也是沉重的負(fù)擔(dān)。

master 選舉根本上也是搶鎖,與zookeeper直觀選舉邏輯相比,etcd的選舉則需要在我們熟悉它的一系列基本概念后,調(diào)動我們充分的想象力:

  • MVCC,key存在版本屬性,沒被創(chuàng)建時版本號為0;

  • CAS操作,結(jié)合MVCC,可以實現(xiàn)競選邏輯,if(version == 0) set(key,value),通過原子操作,確保只有一臺機器能set成功;

  • Lease租約,可以對key綁定一個租約,租約到期時沒預(yù)約,這個key就會被回收;

  • Watch監(jiān)聽,監(jiān)聽key的變化事件,如果key被刪除,則重新發(fā)起競選。

至此,etcd選舉的邏輯大體清晰了,但這一系列操作與zookeeper相比復(fù)雜很多,有沒有已經(jīng)封裝好的庫可以直接拿來用?etcd clientv3 concurrency中有對選舉及分布式鎖的封裝。后面進(jìn)一步發(fā)現(xiàn),etcdctl v3里已經(jīng)有master選舉的實現(xiàn)了,下面針對這部分代碼進(jìn)行簡單注釋,在最后參考這部分代碼實現(xiàn)自己的選舉邏輯。

2.6.5.2 etcd選主的實現(xiàn)

官方示例:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_election_test.go

如crontab 示例:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/concurrency"
    "log"
    "time"
)

const prefix = "/election-demo"
const prop = "local"

func main() {
    endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"}
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    campaign(cli, prefix, prop)

}

func campaign(c *clientv3.Client, election string, prop string) {
    for {
        // 租約到期時間:5s
        s, err := concurrency.NewSession(c, concurrency.WithTTL(5))
        if err != nil {
            fmt.Println(err)
            continue
        }
        e := concurrency.NewElection(s, election)
        ctx := context.TODO()

        log.Println("開始競選")

        err = e.Campaign(ctx, prop)
        if err != nil {
            log.Println("競選 leader失敗,繼續(xù)")
            switch {
            case err == context.Canceled:
                return
            default:
                continue
            }
        }

        log.Println("獲得leader")
        if err := doCrontab(); err != nil {
            log.Println("調(diào)用主方法失敗,辭去leader,重新競選")
            _ = e.Resign(ctx)
            continue
        }
        return
    }
}

func doCrontab() error {
    for {
        fmt.Println("doCrontab")
        time.Sleep(time.Second * 4)
        //return fmt.Errorf("sss")
    }
}

2.6.5.3 etcd選主的原理

/*
 * 發(fā)起競選
 * 未當(dāng)選leader前,會一直阻塞在Campaign調(diào)用
 * 當(dāng)選leader后,等待SIGINT、SIGTERM或session過期而退出
 * https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
 */

func campaign(c *clientv3.Client, election string, prop string) error {
        //NewSession函數(shù)中創(chuàng)建了一個lease,默認(rèn)是60s TTL,并會調(diào)用KeepAlive,永久為這個lease自動續(xù)約(2/3生命周期的時候執(zhí)行續(xù)約操作)
    s, err := concurrency.NewSession(c)
    if err != nil {
        return err
    }
    e := concurrency.NewElection(s, election)
    ctx, cancel := context.WithCancel(context.TODO())

    donec := make(chan struct{})
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigc
        cancel()
        close(donec)
    }()

    //競選邏輯,將展開分析
    if err = e.Campaign(ctx, prop); err != nil {
        return err
    }

    // print key since elected
    resp, err := c.Get(ctx, e.Key())
    if err != nil {
        return err
    }
    display.Get(*resp)

    select {
    case <-donec:
    case <-s.Done():
        return errors.New("elect: session expired")
    }

    return e.Resign(context.TODO())
}

/*
 * 類似于zookeeper的臨時有序節(jié)點,etcd的選舉也是在相應(yīng)的prefix path下面創(chuàng)建key,該key綁定了lease并根據(jù)lease id進(jìn)行命名,
 * key創(chuàng)建后就有revision號,這樣使得在prefix path下的key也都是按revision有序
 * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
 */

func (e *Election) Campaign(ctx context.Context, val string) error {
    s := e.session
    client := e.session.Client()

    //真正創(chuàng)建的key名為:prefix + lease id
    k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
    //Txn:transaction,依靠Txn進(jìn)行創(chuàng)建key的CAS操作,當(dāng)key不存在時才會成功創(chuàng)建
    txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
    txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
    txn = txn.Else(v3.OpGet(k))
    resp, err := txn.Commit()
    if err != nil {
        return err
    }
    e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
    //如果key已存在,則創(chuàng)建失敗;
        //當(dāng)key的value與當(dāng)前value不等時,如果自己為leader,則不用重新執(zhí)行選舉直接設(shè)置value;
        //否則報錯。
    if !resp.Succeeded {
        kv := resp.Responses[0].GetResponseRange().Kvs[0]
        e.leaderRev = kv.CreateRevision
        if string(kv.Value) != val {
            if err = e.Proclaim(ctx, val); err != nil {
                e.Resign(ctx)
                return err
            }
        }
    }

    //一直阻塞,直到確認(rèn)自己的create revision為當(dāng)前path中最小,從而確認(rèn)自己當(dāng)選為leader
    _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
    if err != nil {
        // clean up in case of context cancel
        select {
        case <-ctx.Done():
            e.Resign(client.Ctx())
        default:
            e.leaderSession = nil
        }
        return err
    }
    e.hdr = resp.Header

    return nil
}

2.7 分布式鎖的安全問題

下面我們來討論一下分布式鎖的安全問題:

  • 長時間的GC pause:熟悉Java的同學(xué)肯定對GC不陌生,在GC的時候會發(fā)生STW(stop-the-world),例如CMS垃圾回收器,他會有兩個階段進(jìn)行STW防止引用繼續(xù)進(jìn)行變化。那么有可能會出現(xiàn)下面圖(引用至Martin反駁Redlock的文章)中這個情況:


client1獲取了鎖并且設(shè)置了鎖的超時時間,但是client1之后出現(xiàn)了STW,這個STW時間比較長,導(dǎo)致分布式鎖進(jìn)行了釋放,client2獲取到了鎖,這個時候client1恢復(fù)了鎖,那么就會出現(xiàn)client1,2同時獲取到鎖,這個時候分布式鎖不安全問題就出現(xiàn)了。這個其實不僅僅局限于RedLock,對于我們的ZK,Mysql一樣的有同樣的問題。

  • 時鐘發(fā)生跳躍:對于Redis服務(wù)器如果其時間發(fā)生了向跳躍,那么肯定會影響我們鎖的過期時間,那么我們的鎖過期時間就不是我們預(yù)期的了,也會出現(xiàn)client1和client2獲取到同一把鎖,那么也會出現(xiàn)不安全,這個對于Mysql也會出現(xiàn)。但是ZK由于沒有設(shè)置過期時間,那么發(fā)生跳躍也不會受影響。
  • 長時間的網(wǎng)絡(luò)I/O:這個問題和我們的GC的STW很像,也就是我們這個獲取了鎖之后我們進(jìn)行網(wǎng)絡(luò)調(diào)用,其調(diào)用時間由可能比我們鎖的過期時間都還長,那么也會出現(xiàn)不安全的問題,這個Mysql也會有,ZK也不會出現(xiàn)這個問題。

對于這三個問題,在網(wǎng)上包括Redis作者在內(nèi)發(fā)起了很多討論。

2.7.1 GC的STW

對于這個問題可以看見基本所有的都會出現(xiàn)問題,對于ZK這種他會生成一個自增的序列,那么我們真正進(jìn)行對資源操作的時候,需要判斷當(dāng)前序列是否是最新,有點類似于我們樂觀鎖。當(dāng)然這個解法Redis作者進(jìn)行了反駁,你既然都能生成一個自增的序列了那么你完全不需要加鎖了,也就是可以按照類似于Mysql樂觀鎖的解法去做。

我自己認(rèn)為這種解法增加了復(fù)雜性,當(dāng)我們對資源操作的時候需要增加判斷序列號是否是最新,無論用什么判斷方法都會增加復(fù)雜度。

2.7.2 時鐘發(fā)生跳躍

RedLock不安全很大的原因也是因為時鐘的跳躍,因為鎖過期強依賴于時間,但是ZK不需要依賴時間,依賴每個節(jié)點的Session。Redis作者也給出了解答:對于時間跳躍分為人為調(diào)整和NTP自動調(diào)整。

  • 人為調(diào)整:人為調(diào)整影響的那么完全可以人為不調(diào)整,這個是處于可控的。
  • NTP自動調(diào)整:這個可以通過一定的優(yōu)化,把跳躍時間控制的可控范圍內(nèi),雖然會跳躍,但是是完全可以接受的。

2.7.3長時間的網(wǎng)絡(luò)I/O

對于這個問題的優(yōu)化可以控制網(wǎng)絡(luò)調(diào)用的超時時間,把所有網(wǎng)絡(luò)調(diào)用的超時時間相加,那么我們鎖過期時間其實應(yīng)該大于這個時間,當(dāng)然也可以通過優(yōu)化網(wǎng)絡(luò)調(diào)用比如串行改成并行,異步化等。可以參考文章: 并行化-你的高并發(fā)大殺器,異步化-你的高并發(fā)大殺器

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容