這篇文章是對(duì)近期工作的一個(gè)總結(jié),雖然主要利用了一些開源系統(tǒng)和比較成熟的機(jī)制,但在業(yè)務(wù)實(shí)踐過程中還是遇到了一些坑。寫下來一是為了自我總結(jié)和梳理,另外也希望能夠給別人帶來一點(diǎn)點(diǎn)的啟發(fā)。
前言
KafkaConsumer一般作為實(shí)時(shí)數(shù)據(jù)流處理當(dāng)中的訂閱端,其主要工作是實(shí)時(shí)的從Kafka中訂閱數(shù)據(jù),進(jìn)行一些簡單的ETL工作(不是必須),將數(shù)據(jù)存儲(chǔ)到其它存儲(chǔ)系統(tǒng)上(一般是HDFS、HBase、Mysql等)。除此之外,我們的KafkaConsumer還要求具有以下特性:
- 數(shù)據(jù)延遲是必須在秒級(jí)別。
- 要保證數(shù)據(jù)的
Exactly Once Semantics,即不丟不重。 - 需要盡量做到優(yōu)雅退出。
第一個(gè)問題比較容易做到,數(shù)據(jù)延遲在秒級(jí),就要求我們的KafkaConsumer必須是一個(gè)7*24小時(shí)的常駐MapReduce程序,同時(shí)由于我們只需要處理訂閱和轉(zhuǎn)儲(chǔ),所以該MapReduce程序是一個(gè) mapper only 的。所以后文我會(huì)重點(diǎn)介紹如何保證第二個(gè)特性,即數(shù)據(jù)的不丟不重。
如何保證數(shù)據(jù)的不丟
這里為了保證數(shù)據(jù)的不丟我們主要利用了Kafka本身的數(shù)據(jù)可回溯性和我們自己在Consumer程序中實(shí)現(xiàn)的checkpoint機(jī)制。
CheckPoint機(jī)制的實(shí)現(xiàn)
Kafka本身按照Topic劃分?jǐn)?shù)據(jù)流,我們的一個(gè)KafkaConsumer程序只會(huì)訂閱一個(gè)Topic的數(shù)據(jù)。同時(shí)Kafka的一個(gè)Topic下劃分為多個(gè)partition,每個(gè)partition是一個(gè)實(shí)際的數(shù)據(jù)管道,我們的KafkaConsumer作為mapper only的,每個(gè)mapper會(huì)去實(shí)時(shí)的訂閱一個(gè)partition的數(shù)據(jù),所以mapper的個(gè)數(shù)是和partition的個(gè)數(shù)保持一樣的。如何實(shí)現(xiàn)一個(gè)Kafka的InputFormat,這塊網(wǎng)上已經(jīng)有很多文章,我就不具體介紹了。下面介紹如何進(jìn)行checkpoint。
由于每個(gè)mapper對(duì)應(yīng)一個(gè)Kafka的partition,所以每個(gè)mapper需要單獨(dú)記錄自己的進(jìn)度。我們?cè)趍ysql當(dāng)中創(chuàng)建這樣一張表:
CREATE TABLE `kafka_consumer_progress` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`kafka_partition_id` int(11) NOT NULL,
`current_offset` bigint(11) NOT NULL,
) ENGINE=InnoDB AUTO_INCREMENT=3419273 DEFAULT CHARSET=utf8mb4
其中id作為自增主鍵,kafka_partition_id記錄數(shù)據(jù)在kafka中對(duì)應(yīng)的partition,current_offset記錄最后寫入的數(shù)據(jù)在該partition下對(duì)應(yīng)的offset。每個(gè)mapper在將一批(注意不是每條都寫,因?yàn)橥鵰ysql中記錄checkpoint也是一個(gè)很重的操作)數(shù)據(jù)寫入kudu成功之后,需要在mysql中更新對(duì)應(yīng)的進(jìn)度。同時(shí),mapper在失敗重啟的時(shí)候也都需要先從msyql當(dāng)中恢復(fù)進(jìn)度。代碼示例如下:
// 1. 重啟時(shí)需要先恢復(fù)進(jìn)度
currentOffset = metaClient.recoverOffset(partitionId);
while(true) {
// 2. 從kafka中訂閱數(shù)據(jù)
Response response = consumer.fetch(currentOffset);
// 3. 將數(shù)據(jù)寫入kudu
foreach(Data data : response.getMessage()) {
kuduWriter.write(data);
}
// 4. 更新進(jìn)度,記錄checkpoint
currentOffset = response.nextOffset();
metaClient.updateProgress(partitionId, currentOffset);
}
如何保證數(shù)據(jù)的不重
進(jìn)行到這一步我們很容易發(fā)現(xiàn),即使在mapper剛剛寫入kudu,還沒來得及更新checkpoint時(shí)掛掉,我們也能從上一個(gè)checkpoint恢復(fù)進(jìn)度,這樣保證了數(shù)據(jù)的at least once 語義。但是如果該批次的數(shù)據(jù)恰好寫成功了而沒來得及更新checkpoint,就會(huì)導(dǎo)致數(shù)據(jù)的重復(fù)寫入。如何保證數(shù)據(jù)的不重復(fù),這里我們根據(jù)具體的業(yè)務(wù)場景,分為兩塊來看。
- 對(duì)于冪等的操作,采用主鍵去重。
- 對(duì)于非冪等的操作,每條數(shù)據(jù)單獨(dú)記錄更新的offset。
冪等操作
針對(duì)我們的業(yè)務(wù)類型來說,數(shù)據(jù)分為Event和Profile兩種類型,其中Event類型的數(shù)據(jù)我們只支持插入這一種語義。所以Event類型的數(shù)據(jù)操作是冪等的。對(duì)于冪等操作重復(fù)執(zhí)行不受影響,但是我們需要保證每條數(shù)據(jù)具有唯一ID。這里我們使用kafka的partition id + offset作為kudu表的主鍵,所有Event數(shù)據(jù)的寫入都相當(dāng)于對(duì)主鍵數(shù)據(jù)的replace。所以,Event數(shù)據(jù)的重復(fù)寫入相當(dāng)于在kudu層做了去重。
非冪等操作
Profile類型的數(shù)據(jù),是我們業(yè)務(wù)實(shí)體,每條數(shù)據(jù)具有自己的唯一ID。Profile類型的數(shù)據(jù)操作支持profile_set、profile_update、profile_increase、profile_set_once等,所以Profile類型的操作屬于非冪等的。對(duì)于這種類型的數(shù)據(jù)我們?cè)趯?duì)應(yīng)的profile kudu表當(dāng)中增加了offset字段,記錄這條數(shù)據(jù)最后被更新的數(shù)據(jù)來源于kafka的哪一個(gè)offset。這里可能會(huì)問,為什么不需要記錄partition id呢,這是因?yàn)槲覀兊腜rofile數(shù)據(jù)在導(dǎo)入Kafka的時(shí)候已經(jīng)是按照Profile的id進(jìn)行的hash取模,所以相同id的Profile數(shù)據(jù)只會(huì)出現(xiàn)在相同的partition內(nèi)。
對(duì)Profile進(jìn)行更新時(shí),我們會(huì)先從kudu當(dāng)中讀取是否已有對(duì)應(yīng)id的Profile數(shù)據(jù)存在,如果有的話,會(huì)比較kudu當(dāng)中Profile數(shù)據(jù)的offset和Kafka中的profile數(shù)據(jù)的offset,只有當(dāng)kudu當(dāng)中的offset小于Kafka中的offset的時(shí)候,才會(huì)對(duì)該條Profile數(shù)據(jù)進(jìn)行更新,從而完成了對(duì)非冪等的Profile操作的去重。
如何做到常駐的MapReduce程序的優(yōu)雅退出。
這里有同學(xué)可能會(huì)問兩個(gè)問題:
- 你們的KafkaConsumer不是7*24小時(shí)常駐的嗎,為什么還需要退出?
- 上面兩步不是保證了數(shù)據(jù)的不丟不重嗎,為什么還需要優(yōu)雅退出?
對(duì)于問題1,主要有兩種場景,一種是我們的程序在升級(jí)的時(shí)候,肯定是需要主動(dòng)退出并重啟的;二是由于各種異常(比如kudu掛掉等),我們需要讓KafkaConsumer退出,從而發(fā)出報(bào)警并進(jìn)行人工干預(yù)。
對(duì)于問題2,我們的場景是:需要在KafkaConsumer當(dāng)中對(duì)數(shù)據(jù)來源進(jìn)行一些統(tǒng)計(jì)工作,同時(shí)需要將統(tǒng)計(jì)信息記錄到mysql當(dāng)中,主要用處是當(dāng)數(shù)據(jù)異常時(shí),可以進(jìn)行方便的debug工作。所以也只是做到盡量優(yōu)雅退出,及時(shí)這部分?jǐn)?shù)據(jù)丟失了,也不會(huì)影響我們程序的正常工作。
解決方案一 —— 捕獲kill信號(hào)
最開始我的想法是當(dāng)作業(yè)失敗,hadoop需要將mapper任務(wù)kill掉時(shí),我們捕獲到相應(yīng)的信號(hào),然后進(jìn)行主動(dòng)退出。但是調(diào)研之后發(fā)現(xiàn),hadoop會(huì)先嘗試使用SIGTERM殺死m(xù)apper進(jìn)程,然后等待一段時(shí)間(默認(rèn)5000毫秒),如果進(jìn)程還沒有退出時(shí),會(huì)使用SIGTERM殺死進(jìn)程。我們雖然能夠捕獲到SIGTERM信號(hào),但是5000毫秒對(duì)于我們來說往往來不及做剩下的clean up工作。而這個(gè)超時(shí)配置又是yarn全局的,我們沒法為KafkaConsumer單獨(dú)修改,所以這個(gè)方案被拋棄了。
解決方案二 —— 使用Zookeeper進(jìn)行同步
方案一走不通,我們必須使用自己的方法進(jìn)行mapper間的信息同步。我們想到了Zookeeper(以下簡稱ZK): 在KafkaConsumer啟動(dòng)時(shí)由本地進(jìn)程創(chuàng)建對(duì)應(yīng)的ZK節(jié)點(diǎn),同時(shí)每個(gè)mapper都會(huì)作為一個(gè)Watcher觀察這個(gè)節(jié)點(diǎn)。當(dāng)KafkaConsumer需要主動(dòng)退出時(shí),本地進(jìn)程會(huì)將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除,同時(shí)所有mapper觀察到對(duì)應(yīng)的ZK節(jié)點(diǎn)變化之后,會(huì)進(jìn)行最后的clean up并優(yōu)雅退出。這樣就解決了我們?cè)谏?jí)的時(shí)候需要主動(dòng)退出KafkaConsumer的問題。但是還有一個(gè)問題也是我最開始沒有想到的,當(dāng)一個(gè)mapper執(zhí)行出現(xiàn)異常時(shí),需要異常退出,這時(shí)hadoop會(huì)標(biāo)記整個(gè)Job為失敗,并將其它正在運(yùn)行的mapper也kill掉。這就要求單個(gè)mapper異常時(shí),我們也需要使用ZK進(jìn)行優(yōu)雅退出。方案類似:當(dāng)mapper異常退出時(shí),會(huì)嘗試將對(duì)應(yīng)的ZK節(jié)點(diǎn)刪除。代碼示例如下:
private void doExit(Context context) {
// 進(jìn)行最后的clean up工作
this.cleanup();
if (!Terminator.isShutdownBegan()) {
logger.error("kafka consumer mapper run failed.");
// 當(dāng)mapper因?yàn)楫惓M顺鰰r(shí),會(huì)將相應(yīng)的zk節(jié)點(diǎn)刪除,以通知其它mapper進(jìn)行安全退出
// 探測ZK節(jié)點(diǎn)是否存在
boolean pathExists = true;
try {
pathExists = this.zookeeperClient.checkExists(zkNode);
if (pathExists) {
this.zookeeperClient.deletePath(this.zkNode);
pathExists = false;
}
} catch (Exception e) {
logger.error("delete path {} failed.", zkNode, e);
try {
pathExists = this.zookeeperClient.checkExists(zkNode);
} catch (Exception e1) {
logger.error("check zk path result failed.");
}
}
if (!pathExists) {
logger.info("kafka consumer mapper exit gracefully.");
} else {
logger.warn("kafka consumer mapper exit failed.");
System.exit(1);
}
} else {
logger.info("mark shut down phase done.");
Terminator.markShutdownPhaseDone();
}
}
通過方案二,我們能夠解決90%的異常退出問題,但還不能完全解決問題。比如mapper爆內(nèi)存的時(shí)候,hadoop還是會(huì)將mapper殺掉,這是我們還是無法進(jìn)行干預(yù),所以這里的優(yōu)雅退出只能做到盡量準(zhǔn),如果大家有什么好的方法,也可以告訴我。