在工作中經(jīng)常會遇到去重的場景,例如基于 App 的用戶行為日志分析系統(tǒng),用戶的行為日志從手機(jī)客戶端上報(bào)到 Nginx 服務(wù)端,通過 Logstash、Flume 或其他工具將日志從 Nginx 寫入到 Kafka 中。
由于用戶手機(jī)客戶端的網(wǎng)絡(luò)可能出現(xiàn)不穩(wěn)定,所以手機(jī)客戶端上傳日志的策略是:寧可重復(fù)上報(bào),也不能丟日志。所以導(dǎo)致 Kafka 中必然會出現(xiàn)日志重復(fù)的情況,即:同一條日志出現(xiàn)了 2 條或 2 條以上。
通常情況下,F(xiàn)link 任務(wù)的數(shù)據(jù)源都是 Kafka,若 Kafka 中數(shù)據(jù)出現(xiàn)了重復(fù),在實(shí)時(shí) ETL 或者流計(jì)算時(shí)都需要考慮對日志主鍵進(jìn)行去重,否則會導(dǎo)致流計(jì)算結(jié)果偏高或結(jié)果不準(zhǔn)確的問題,例如用戶 a 在某個(gè)頁面只點(diǎn)擊了一次,但由于日志重復(fù)上報(bào),所以用戶 a 在該頁面的點(diǎn)擊日志在 Kafka 中出現(xiàn)了 2 次,最后統(tǒng)計(jì)該頁面的 click 數(shù)時(shí),結(jié)果就會偏高。
這里只闡述了一種可能造成 Kafka 中數(shù)據(jù)重復(fù)的情況,在生產(chǎn)環(huán)境中很多情況都可能造成 Kafka 中數(shù)據(jù)重復(fù),這里不一一列舉,本節(jié)主要講述出現(xiàn)了數(shù)據(jù)重復(fù)后,該如何處理。
實(shí)現(xiàn)去重的通用解決方案:
Kafka 中數(shù)據(jù)出現(xiàn)重復(fù)后,各種解決方案都比較類似,一般需要一個(gè)全局 set 集合來維護(hù)歷史所有數(shù)據(jù)的主鍵。當(dāng)處理新日志時(shí),需要拿到當(dāng)前日志的主鍵與歷史數(shù)據(jù)的 set 集合按照規(guī)則進(jìn)行比較,若 set 集合中已經(jīng)包含了當(dāng)前日志的主鍵,說明當(dāng)前日志在之前已經(jīng)被處理過了,則當(dāng)前日志應(yīng)該被過濾掉,否則認(rèn)為當(dāng)前日志不應(yīng)該被過濾應(yīng)該被處理,而且處理完成后需要將新日志的主鍵加入到 set 集合中,set 集合永遠(yuǎn)存放著所有已經(jīng)被處理過的數(shù)據(jù)。
處理流程很簡單,關(guān)鍵在于如何維護(hù)這個(gè) set 集合,可以簡單估算一下這個(gè) set 集合需要占用多大空間。本小節(jié)要解決的問題是百億數(shù)據(jù)去重,所以就按照每天 1 百億的數(shù)據(jù)量來計(jì)算。
由于每天數(shù)據(jù)量巨大,因此主鍵占用空間通常會比較大,如果主鍵占用空間小意味著表示的數(shù)據(jù)范圍就比較小,就可能導(dǎo)致主鍵沖突,例如:4 個(gè)字節(jié)的 int 類型表示數(shù)據(jù)范圍是為 -2147483648~2147483647,總共可以表示 42 億個(gè)數(shù),如果這里每天百億的數(shù)據(jù)量選用 int 類型做為主鍵的話,很明顯會有大量的主鍵發(fā)生沖突,會將不重復(fù)的數(shù)據(jù)認(rèn)為是發(fā)生了重復(fù)。
用戶的行為日志是在手機(jī)客戶端生成的,沒有全局發(fā)號器,一般會選取 UUID 做為日志的主鍵,UUID 會生成 36 位的字符串,例如:"f106c4a1-4c6f-41c1-9d30-bbb2b271284a"。每個(gè)主鍵占用 36 字節(jié),每天 1 百億數(shù)據(jù),36 字節(jié) * 100億 ≈ 360GB。這僅僅是一天的數(shù)據(jù)量,所以該 set 集合要想存儲空間不發(fā)生持續(xù)地爆炸式增長,必須增加一個(gè)功能,那就是給所有的主鍵增加 ttl(Time To Live的縮寫,即:過期時(shí)間)。
如果不增加 ttl,10 天數(shù)據(jù)量的主鍵占用空間就 3.6T,100 天數(shù)據(jù)量的主鍵占用空間 36T,所以在設(shè)計(jì)之初必須考慮為主鍵設(shè)定 ttl。如果要求按天進(jìn)行去重或者認(rèn)為日志發(fā)生重復(fù)上報(bào)的時(shí)間間隔不可能大于 24 小時(shí),那么為了系統(tǒng)的可靠性 ttl 可以設(shè)置為 36 小時(shí)。每天數(shù)據(jù)量 1 百億,且 set 集合中存放著 36 小時(shí)的數(shù)據(jù)量,即 100 億 * 1.5 = 150 億,所以 set 集合中需要維護(hù) 150 億的數(shù)據(jù)量。
且 set 集合中每條數(shù)據(jù)都增加了 ttl,意味著 set 集合需要為每條數(shù)據(jù)再附帶保存一個(gè)時(shí)間戳,來確定該數(shù)據(jù)什么時(shí)候過期。例如 Redis 中為一個(gè) key 設(shè)置了 ttl,如果沒有為這個(gè) key 附帶時(shí)間戳,那么根本無法判斷該 key 什么時(shí)候應(yīng)該被清理。所以在考慮每條數(shù)據(jù)占用空間時(shí),不僅要考慮數(shù)據(jù)本身,還需要考慮是否需要其他附帶的存儲。主鍵本身占用 36 字節(jié)加上 long 類型的時(shí)間戳 8 字節(jié),所以每條數(shù)據(jù)至少需要占用 44 字節(jié),150 億 * 44 字節(jié) = 660GB。所以每天百億的數(shù)據(jù)量,如果我們使用 set 集合的方案來實(shí)現(xiàn),至少需要占用 660GB 以上的存儲空間。
使用 BloomFilter 來實(shí)現(xiàn)去重
有些流計(jì)算的場景對準(zhǔn)確性要求并不是很高,例如傳統(tǒng)的 Labmda 架構(gòu)中,都會有離線去矯正實(shí)時(shí)計(jì)算的結(jié)果,所以根據(jù)業(yè)務(wù)場景,當(dāng)業(yè)務(wù)要求可以接受結(jié)果有小量誤差時(shí),可以選擇使用一些低成本的數(shù)據(jù)結(jié)構(gòu)。BloomFilter 和 HyperLogLog 都是相對低成本的數(shù)據(jù)結(jié)構(gòu),分別有自己的應(yīng)用場景,且兩種數(shù)據(jù)結(jié)構(gòu)都有一定誤差。
HyperLogLog 可以估算出 HyperLogLog 中插入了多少個(gè)不重復(fù)的元素,而不能告訴我們之前是否插入了哪些元素。BloomFilter 則恰好相反,比起 BloomFilter 更像是一個(gè) set 集合,BloomFilter 可以告訴你 BloomFilter 中肯定不包含元素 a,或者告訴你 BloomFilter 中可能包含元素 b,但 BloomFilter 不能告訴你 BloomFilter 中插入了多少個(gè)元素。接下來了解一下 BloomFilter 的實(shí)現(xiàn)原理。
bitmap 位圖
了解 BloomFilter,從 bitmap(位圖)開始說起?,F(xiàn)在有 1 千萬個(gè)整數(shù),數(shù)據(jù)范圍在 0 到 2 千萬之間。如何快速查找某個(gè)整數(shù)是否在這 1 千萬個(gè)整數(shù)中呢?可以將這 1 千萬個(gè)數(shù)保存在 HashMap 中,不考慮對象頭及其他空間,1000 萬個(gè) int 類型數(shù)據(jù)需要占用大約 1000 萬 * 4 字節(jié) ≈ 40MB 存儲空間。有沒有其他方案呢?因?yàn)閿?shù)據(jù)范圍是 0 到 2 千萬,所以如下圖所示,可以申請一個(gè)長度為 2000 萬、boolean 類型的數(shù)組。將這 1 千萬個(gè)整數(shù)作為數(shù)組下標(biāo),將其對應(yīng)的數(shù)組值設(shè)置成 true,如下圖所示,數(shù)組下標(biāo)為 2、666、999 的位置存儲的數(shù)據(jù)為 true,表示 1 千萬個(gè)數(shù)中包含了 2、666、999 等。當(dāng)查詢某個(gè)整數(shù) K 是否在這 1 千萬個(gè)整數(shù)中時(shí),只需要將對應(yīng)的數(shù)組值 array[K] 取出來,看是否等于 true。如果等于 true,說明 1 千萬整數(shù)中包含這個(gè)整數(shù) K,否則表示不包含這個(gè)整數(shù) K。
Java 的 boolean 基本類型占用一個(gè)字節(jié)(8bit)的內(nèi)存空間,所以上述方案需要申請 2000 萬字節(jié)。如下圖所示,可以通過編程語言用二進(jìn)制位來模擬布爾類型,二進(jìn)制的 1 表示 true、二進(jìn)制的 0 表示 false。通過二進(jìn)制模擬布爾類型的方案,只需要申請 2000 萬 bit 即可,相比 boolean 類型而言,存儲空間占用僅為原來的 1/8。2000 萬 bit ≈ 2.4MB,相比存儲原始數(shù)據(jù)的方案 40 MB 而言,占用的存儲空間少了很多。
假如這 1 千萬個(gè)整數(shù)的數(shù)據(jù)范圍是 0 到 100 億,那么就需要申請 100 億個(gè) bit 約等于 1200MB,比存儲原始數(shù)據(jù)方案的 40MB 還要大很多。該情況下,直接使用位圖使用的存儲空間更多了,怎么解決呢?可以只申請 1 億 bit 的存儲空間,對 1000 萬個(gè)數(shù)求hash,映射到 1 億的二進(jìn)制位上,最后大約占用 12 MB 的存儲空間,但是可能存在 hash 沖突的情況。例如 3 和 100000003(一億零三)這兩個(gè)數(shù)對一億求余都為 3,所以映射到長度為 1 億的位圖上,這兩個(gè)數(shù)會占用同一個(gè) bit,就會導(dǎo)致一個(gè)問題:1 千萬個(gè)整數(shù)中包含了一億零三,所以位圖中下標(biāo)為 3 的位置存儲著二進(jìn)制 1。當(dāng)查詢 1 千萬個(gè)整數(shù)中是否包含數(shù)字 3 時(shí),同樣也是去位圖中下標(biāo) 3 的位置去查找,發(fā)現(xiàn)下標(biāo)為 3 的位置存儲著二進(jìn)制 1,所以誤以為 1 千萬個(gè)整數(shù)中包含數(shù)字 3。為了減少 hash 沖突,于是誕生了 BloomFilter。
BloomFilter 原理介紹
hash 存在 hash 沖突(碰撞)的問題,兩個(gè)不同的 key 通過同一個(gè) hash 函數(shù)得到的值有可能相同。為了減少沖突,可以多引入幾個(gè) hash 函數(shù),如果通過其中的一個(gè) hash 函數(shù)發(fā)現(xiàn)某元素不在集合中,那么該元素肯定不在集合中。當(dāng)所有的 hash 函數(shù)告訴我們該元素在集合中時(shí),才能確定該元素存在于集合中,這便是BloomFilter的基本思想。
如下圖所示,是往 BloomFilter 中插入元素 a、b 的過程,有 3 個(gè) hash 函數(shù),元素 a 經(jīng)過 3 個(gè) hash 函數(shù)后對應(yīng)的 2、8、10 這三個(gè)二進(jìn)制位,所以將這三個(gè)二進(jìn)制位置為 1,元素 b 經(jīng)過 3 個(gè) hash 函數(shù)后,對應(yīng)的 5、10、14 這三個(gè)二進(jìn)制位,將這三個(gè)二進(jìn)制位也置為 1,其中下標(biāo)為 10 的二進(jìn)制位被 a、b 元素都涉及到。
如下圖所示,是從 BloomFilter 中查找元素 c、d 的過程,同樣包含了 3 個(gè) hash 函數(shù),元素 c 經(jīng)過 3 個(gè) hash 函數(shù)后對應(yīng)的 2、6、9 這三個(gè)二進(jìn)制位,其中下標(biāo) 6 和 9 對應(yīng)的二進(jìn)制位為 0,所以會認(rèn)為 BloomFilter 中不存在元素 c。元素 d 經(jīng)過 3 個(gè) hash 函數(shù)后對應(yīng)的 5、8、14 這三個(gè)二進(jìn)制位,這三個(gè)位對應(yīng)的二進(jìn)制位都為 1,所以會認(rèn)為 BloomFilter 中存在元素 d,但其實(shí) BloomFilter 中并不存在元素 d,是因?yàn)樵?a 和元素 b 也對應(yīng)到了 5、8、14 這三個(gè)二進(jìn)制位上,所以 BloomFilter 會有誤判。但是從實(shí)現(xiàn)原理來看,當(dāng) BloomFilter 告訴你不包含元素 c 時(shí),BloomFilter 中肯定不包含元素 c,當(dāng) BloomFilter 告訴你 BloomFilter 中包含元素 d 時(shí),它只是可能包含,也有可能不包含。
使用 BloomFilter 實(shí)現(xiàn)數(shù)據(jù)去重
Redis 4.0 之后 BloomFilter 以插件的形式加入到 Redis 中,關(guān)于 api 的具體使用這里不多贅述。BloomFilter 在創(chuàng)建時(shí)支持設(shè)定一個(gè)預(yù)期容量和誤判率,預(yù)期容量即預(yù)計(jì)插入的數(shù)據(jù)量,誤判率即:當(dāng) BloomFilter 中插入的數(shù)據(jù)達(dá)到預(yù)期容量時(shí),誤判的概率,如果 BloomFilter 中插入數(shù)據(jù)較少的話,誤判率會更低。
經(jīng)筆者測試,申請一個(gè)預(yù)期容量為 10 億,誤判率為千分之一的 BloomFilter,BloomFilter 會申請約 143 億個(gè) bit,即:14G左右,相比之前 660G 的存儲空間小太多了。但是在使用過程中,需要記錄 BloomFilter 中插入元素的個(gè)數(shù),當(dāng)插入元素個(gè)數(shù)達(dá)到 10 億時(shí),為了保障誤差率,可以將當(dāng)前 BloomFilter 清除,重新申請一個(gè)新的 BloomFilter。
通過使用 Redis 的 BloomFilter,我們可以通過相對較小的內(nèi)存實(shí)現(xiàn)百億數(shù)據(jù)的去重,但是 BloomFilter 有誤差,所以只能使用在那些對結(jié)果能承受一定誤差的應(yīng)用場景,對于廣告計(jì)費(fèi)等對數(shù)據(jù)精度要求非常高的場景,極力推薦大家使用精準(zhǔn)去重的方案來實(shí)現(xiàn)。
使用 HBase 維護(hù)全局 set 實(shí)現(xiàn)去重
通過之前分析,我們知道要想實(shí)現(xiàn)百億數(shù)據(jù)量的精準(zhǔn)去重,需要維護(hù) 150 億數(shù)據(jù)量的 set 集合,每條數(shù)據(jù)占用 44 KB,總共需要 660 GB 的存儲空間。注意這里說的是存儲空間而不是內(nèi)存空間,為什么呢?因?yàn)?660G 的內(nèi)存實(shí)在是太貴了,660G 的 Redis 云服務(wù)一個(gè)月至少要 2 萬 RMB 以上,俗話說設(shè)計(jì)架構(gòu)不考慮成本等于耍流氓。這里使用 Redis 確實(shí)可以解決問題,但是成本較高。HBase 基于 rowkey Get 的效率比較高,所以這里可以考慮將這個(gè)大的 set 集合以 HBase rowkey 的形式存放到 HBase 中。HBase 表設(shè)置 ttl 為 36 小時(shí),最近 36 小時(shí)的 150 億條日志的主鍵都存放到 HBase 中,每來一條數(shù)據(jù),先拿到主鍵去 HBase 中查詢,如果 HBase 表中存在該主鍵,說明當(dāng)前日志已經(jīng)被處理過了,當(dāng)前日志應(yīng)該被過濾。如果 HBase 表中不存在該主鍵,說明當(dāng)前日志之前沒有被處理過,此時(shí)應(yīng)該被處理,且處理完成后將當(dāng)前主鍵 Put 到 HBase 表中。由于數(shù)據(jù)量比較大,所以一定要提前對 HBase 表進(jìn)行預(yù)分區(qū),將壓力分散到各個(gè) RegionServer 上。
使用 HBase rowkey 去重帶來的問題
一天 100 億的數(shù)據(jù)量,平均一秒 11.57 萬條日志。但是數(shù)據(jù)一般都會有高峰期,例如外賣軟件高峰期肯定是飯前的一兩個(gè)小時(shí),其余時(shí)間段數(shù)據(jù)量相對比較少。所以雖然每天 100 億數(shù)據(jù)量,但是到了數(shù)據(jù)高峰期每秒數(shù)據(jù)量可以達(dá)到 20 萬左右。按照之前的思路,每條數(shù)據(jù)來了都會對 HBase 進(jìn)行一次 Get 操作,當(dāng)前數(shù)據(jù)處理完還會對 HBase 進(jìn)行一次 Put 操作,所以每秒需要對 HBase 請求 40 萬次。單個(gè)的 Get 和 Put 請求效率比較低,這里可以優(yōu)化為批量操作的 API 或異步 API 來提高訪問 HBase 的效率。
性能問題優(yōu)化后,再分析這里使用 HBase 去重到底能不能保證 Exactly Once?拿計(jì)算 PV 的案例來講。
假如 PV 信息維護(hù)在 Flink 的狀態(tài)中,通過冪等性將 PV 統(tǒng)計(jì)結(jié)果寫入到 Redis 供其他業(yè)務(wù)方查詢實(shí)時(shí)統(tǒng)計(jì)的 PV 值。如下圖所示,F(xiàn)link 處理完日志 b 后進(jìn)行 Checkpoint,將 PV = 2 和 Kafka 對應(yīng)的 offset 信息保存起來,此時(shí) HBase 表中有兩條 rowkey 分別是 a、b,表示主鍵為 a 和 b 的日志已經(jīng)被處理過了。
接著往后處理,當(dāng)處理完日志 d 以后,PV = 4,HBase 表中有 4 條 rowkey 分別是 a、b、c、d,表示主鍵為 a、b、c、d 的日志已經(jīng)被處理過了。但此時(shí)機(jī)器突然故障,導(dǎo)致 Flink 任務(wù)掛掉,如右圖所示 Flink 任務(wù)會從最近一次成功的 Checkpoint 處恢復(fù)任務(wù),從日志 b 之后的位置開始消費(fèi),且 PV 恢復(fù)為 2,因?yàn)樘幚硗耆罩?b 時(shí) PV 為 2。
但由于 HBase 中的數(shù)據(jù)不是由 Flink 來維護(hù),所以無法恢復(fù)到 Checkpoint 時(shí)的狀態(tài)。所以 Flink 任務(wù)恢復(fù)后,PV = 2 且 HBase 中 rowkey 為 a、b、c、d。此時(shí) Flink 任務(wù)從日志 c 開始繼續(xù)處理數(shù)據(jù),當(dāng)處理日志 c 和 d 時(shí),F(xiàn)link 任務(wù)會先查詢 HBase,發(fā)現(xiàn) HBase 中已經(jīng)保存了主鍵 c 和 d,所以認(rèn)為日志 c 和 d 已經(jīng)被處理了,會將日志 c 和 d 過濾掉,于是就產(chǎn)生了丟數(shù)據(jù)的現(xiàn)象,日志 c 和 d 其實(shí)并沒有參與 PV 的計(jì)算。
日志 c 和 d 已經(jīng)被處理過了,此時(shí)就算從 Checkpoint 處恢復(fù),PV 值也應(yīng)該為 4,不應(yīng)該是 2。請注意上述方案,筆者描述的是 PV 信息維護(hù)在 Flink 的狀態(tài)中,所以從 Checkpoint 處恢復(fù)任務(wù)時(shí),會將 Checkpoint 時(shí)狀態(tài)中保存的 PV 信息恢復(fù),所以恢復(fù)為 2。
當(dāng)然還有其他統(tǒng)計(jì) PV 的方式,不需要將 PV 信息維護(hù)在 Flink 狀態(tài)中,而是僅僅在 Redis 中保存 PV 結(jié)果,每處理一條數(shù)據(jù),將 Redis 中的 PV 值加一即可。如下圖所示,PV 不維護(hù)在狀態(tài)中,所以當(dāng)處理完日志 b 進(jìn)行 Checkpoint 時(shí),只會將當(dāng)前消費(fèi)的 offset 信息維護(hù)起來。處理完日志 d 以后,由于機(jī)器故障,F(xiàn)link 任務(wù)掛掉,任務(wù)依然會從日志 b 之后開始消費(fèi),此時(shí) Redis 中保存的 PV=4,且 HBase 中保存的 rowkey 信息為 a、b、c、d。緊接著開始處理 c 和 d,因?yàn)?HBase 中保存了主鍵 c、d,因此不會重復(fù)處理日志 c、d,因此 PV 值計(jì)算正確,也不會出現(xiàn)重復(fù)消費(fèi)的問題。
這種策略貌似沒有問題,但是問題百出。我們的任務(wù)處理元素 d 需要兩個(gè)操作:
① 將 Redis 中 PV 值加一 ② 將主鍵 id 加入到 HBase
由于 Redis 和 HBase 都不支持事務(wù),所以以上兩個(gè)操作并不能保障原子性。如果代碼中先執(zhí)行步驟 ①,可能會造成 ① 執(zhí)行成功 ② 還未執(zhí)行成功,那么恢復(fù)任務(wù)時(shí) PV=4,HBase 中保存主鍵 a、b、c,此時(shí)日志 d 就會重復(fù)計(jì)算,就會造成 PV 值計(jì)算偏高的問題。如果代碼中先執(zhí)行步驟 ②,可能會造成 ② 執(zhí)行成功 ① 還未執(zhí)行成功,那么恢復(fù)任務(wù)時(shí) PV=3,HBase 中保存主鍵 a、b、c、d,此時(shí)日志 d 就會被漏計(jì)算,就會造成 PV 值計(jì)算偏低的問題。這里只是拿 HBase 舉例而已,上述情況中外部的任何存儲介質(zhì)維護(hù) set 集合都不能保證 Exactly Once,因?yàn)?Flink 從 Checkpoint 處恢復(fù)時(shí),外部存儲介質(zhì)并不能恢復(fù)到 Checkpoint 時(shí)的狀態(tài)。既然外部存儲介質(zhì)不能恢復(fù)到 Checkpoint 時(shí)的狀態(tài),那使用 Flink 內(nèi)置的狀態(tài)后端可以嗎?當(dāng)然可以?。?!
使用 Flink 的 KeyedState 實(shí)現(xiàn)去重
使用 Flink 狀態(tài)來維護(hù) set 集合的優(yōu)勢