寫在所有之前
在日常業(yè)務中可能遇到生成業(yè)務類全局ID的情況,這類ID的關鍵點在于全局不重復,對于單例來說,這個不難實現(xiàn),但是對于分布式場景下,如何保證每個獨立部署的服務都能生成互補重復的唯一鍵呢,它的核心思路就是找到一個唯一的“權威”來做這個事情,也就是說獨立于分布式的應用來找一位發(fā)布者。但是這又引出另一個問題,如何保證這個唯一“權威”不成為系統(tǒng)瓶頸呢?
實際上目前不少支持分布式部署的中間件都有自己的CAP邏輯,美團推出的開源項目Leaf也就是利用了數(shù)據(jù)庫和ZK以及他們的分布式方案最終實現(xiàn)了分布式Id生成系統(tǒng)。
具體的實現(xiàn)邏輯可見美團技術團隊的官方文檔
- 框架介紹
-
源碼地址
本文的目的是從源碼的角度介紹Leaf是如何工作的。如前文所說,在Leaf的設計中有兩種實現(xiàn)方式,一是基于數(shù)據(jù)庫的segment算法,二是基于ZK的雪花算法。我們先整體看一下Leaf源碼的結構:
Leaf項目框架.png
Leaf的源碼很簡單,分為兩部分,leaf-core作為具體的Id生成實現(xiàn)自成一體,另外的leaf-server是官方以web服務的形式對外提供查詢api和監(jiān)控系統(tǒng)。
監(jiān)控web.png
實際上,我們在閱讀源碼時只用關注到leaf-core,leaf-server僅僅是展示層,在真實項目中可用,也可自行替換。以下從segment(基于database)和snowflake(基于雪花算法)兩個方面分別進行源碼解讀。
segment
原理解釋
實際上,segemnt利用數(shù)據(jù)庫作為一個Id生成規(guī)則的存放地址,從表結構可以看到:
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '',
`max_id` bigint NOT NULL DEFAULT '1',
`step` int NOT NULL,
`description` varchar(256) DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
)
biz_tag:業(yè)務標簽,它意味著該表中每一條數(shù)據(jù)都是一個業(yè)務的id生成規(guī)則,比如在一個電商系統(tǒng)中,除了訂單id需要生成外,可能還有商品條形碼id,如此就會出現(xiàn)兩條表記錄。
max_id:當前規(guī)則會分配的最大id值。注意這里是bigint類型,因此意味著segment方式生成的id就是普通數(shù)字。
step:步長,不難理解,當前規(guī)則的號段就是從max_id - step ~ max_id。
我們注意到這一系列字段中l(wèi)eaf_alloc表中并不會存儲當前的id,那客戶端在申請一個新id時,它是怎么知道下一個id是多少呢?實際上,當前id是記錄在該服務運行時內存中。
這就涉及到segment形式中的核心概念——segment,它的定義如下:
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;
}
沒錯第一個成員變量就是當前id值??梢韵胍?,當leaf服務啟動時,程序會讀取db中的規(guī)則配置,將其讀入內存并以Segment形式存儲(見其他三個fields),并初始化當前id=0.
這就是segment的大致邏輯,總結起來,它利用了db存儲規(guī)則,然后在每個leaf-core啟動時加載規(guī)則,并初始化當前id。讀到這里有個小問題:“分布式系統(tǒng)中,自然leaf服務也會分布式部署,當有多個leaf服務時,如何保證生成的id不重復呢”。這里先說結論:
每個leaf-core在啟動時都會去讀取db中的規(guī)則(如果沒有則需提前手動insert)。每次讀取前都會根據(jù)步長(step)取修改max_id:
max_id = max_id + step
理解了吧,也就是說每個leaf服務都有一個獨立的號段(max_id-step, max_id),因此它們在分配id時并不會重復。
關于其原理言盡于此,我們直接看看源碼:
segemnt核心類如下,
一個實現(xiàn)類SegmentIDGenImpl實現(xiàn)接口IDGen,結束!是不是很簡單

在IDGen interface中兩方法get,init
public interface IDGen {
Result get(String key);
boolean init();
}
自然從init實現(xiàn)方法開始,里面涉及到兩個具體方法updateCacheFromDb和updateCacheFromDbAtEveryMinute。從字面理解,將規(guī)則從db中加載到segment對象中,并且從開啟每分鐘同步一次的定時任務。關注點可以放在它的加載過程。
private void updateCacheFromDb() {
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
// 1.獲取所有biz_tag
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
// 2.獲取當前緩存中的biz_tag
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌進cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
// 3.至此獲得目前需要新增到cache中的tags,存放到insertTagsSet中
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//4. cache中已失效的tags從cache刪除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}
代碼很簡單,值得一提的是,第三步中segment對象被放到了SegmentBuffer中,實際上塞到本地緩存中的是SegmentBuffer,它是個什么玩意兒?這涉及到美團對leaf的一層優(yōu)化。當value==max_id后,會更新db中的規(guī)則,并reload規(guī)則到segmnt中,這里涉及到數(shù)據(jù)庫的I/O,在高qps情況下這個過程會頻繁發(fā)生。

因此leaf在當前value達到某個閾值后,會提前開啟下一個max_id的生成,并把該規(guī)則讀入新segment中,一旦segment1中id達到max_id,則直接內存級別切換segment即可。

因此引入了SegmentBuffer來管理這兩個segemnts:
public class SegmentBuffer {
private String key;
private Segment[] segments; //雙buffer
private volatile int currentPos; //當前的使用的segment的index
private volatile boolean nextReady; //下一個segment是否處于可切換狀態(tài)
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //線程是否在運行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
}
到這里我們看到了leaf-segemnt形式中緩存的最上層是業(yè)務tag-SegmentBuffer,SegmentBuffer中對應了兩個Segment,而當前的value就存在segment當中。

因此我們看到上面提到的updateCacheFromDb方法,實際上是在更新第一層關系,也就是掃描db中所有的bizTag,跟緩存中做比對,新增的bizTag在緩存中創(chuàng)建map,刪除緩存中不存在的bizTag,保留與db一致的bizTag。需要注意的是,這時候對于新增的bizTag僅僅是在cache中創(chuàng)建空的SegmentBuffer。與updateCacheFromDb對應,updateSegmentFromDb就在更新第二層。
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step為DB中的step
} else {
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
//表示15min內更新過,代表更新頻繁,這樣的化,通過增加步長,
// 緩存更多的id,來減少對db的操作
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
//相當于duration >= 30min,表示該key的訪問并不頻繁,因此可以降低步長。
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step為DB中的step
}
// must set value before set max
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
這樣一來,當客戶端請求獲取一個id時,會傳入當前服務所屬bizTag,如果cache中存在則直接找到segment并獲取其中的value;如果cache中不存在bizTag則直接返回找不到的異常。這里有個疑問:緩存中找不到bizTag的時候,為什么不去db中主動查詢一遍呢?我猜想,開發(fā)者的設計理念就是通過緩存去應對高并發(fā)場景降低db的I/O,直接去避免緩存穿透的問題。
寫到這里,筆者意識到一個問題,當部署了多個leaf服務,此時流量可能隨機的訪問這些leaf,可能存在一個問題,那就是leaf生成的id的確是全局唯一,但不見得是全局按時間遞增。

以上是leaf服務初始化或從db中l(wèi)oad規(guī)則,準備數(shù)據(jù)的源碼內容。接下來是最關鍵的客戶端獲取id的過程:
public Result get(final String key) {
if (!initOK) {
return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
}
//如果key(bizTag)存在,表示對應的規(guī)則已加載到緩存
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
//如果SegmentBuffer未初始化(僅實例化)則先初始化
if (!buffer.isInitOk()) {
synchronized (buffer) {
// 雙重驗證:第一次驗證是快速判斷SegmentBuffer是否需要初始化,如果需要則進入同步模塊,
// 在進入同步模塊后,為了避免其他線程在當前線程等待鎖的過程中進行了初始化,因此會二次初始化
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
//id都是從本地緩存中獲取,即便當前緩存中不存在數(shù)據(jù),也是先由db同步到cache中,再獲取
return getIdFromSegmentBuffer(cache.get(key));
}
return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
最終獲取id的過程在getIdFromSegmentBuffer中,代碼執(zhí)行過程參見下圖:

可以看到,在getIdFromSegmentBuffer過程中主要發(fā)生了兩件事:
- 當前id+1,得到應該返回的id,但是這個id在返回前需要判斷是否超過了規(guī)則定義的max_id,如果超過了(嚴格來說等于也不行)。則將當前的buffer指向另一個備用的segment,他在之前已經(jīng)預加載過。
- 第二件事,實際上就是有個異步的判斷,在剛進入getIdFromSegmentBuffer就會判斷,是否當前的value值超過了10% * max_id,如果超過了,就預加載另一個segment。
根據(jù)上圖,和這兩點看這段邏輯就會很清晰了。leaf segemnt形式介紹完成,總體來說,它是一種利用數(shù)據(jù)庫持久化id生成規(guī)則,而非具體的id數(shù)據(jù)。而把具體的id生成任務丟給內存來進行,這樣做的好處,在于即便在高并發(fā)的場景下,內存可以最快的反應請求,并且每次生成的規(guī)則都代表了一批數(shù)據(jù),因此真實的db
I/O也不會很高。

