項目背景
在推薦系統(tǒng)中消息隊列是廣泛使用的基礎(chǔ)組件,但在我司的推薦系統(tǒng)中有許多成千上萬個實例需要完整消費(fèi)同一個topic的需求,而僅僅因為單機(jī)網(wǎng)絡(luò)帶寬的限制就部署大量的消息隊列server顯然是一種浪費(fèi),所以我司自研了一套客戶端能夠轉(zhuǎn)發(fā)消息的消息隊列服務(wù)BTQueue。
但BTQ在設(shè)計時考慮的比較簡單,在后續(xù)使用中發(fā)現(xiàn)了許多問題,比如BTQ沒有Sequence Number,只能依靠時間戳定位,導(dǎo)致基于它做數(shù)據(jù)同步的存儲模塊無法精準(zhǔn)地控制數(shù)據(jù)一致,而且BTQ server端沒有使用一致性協(xié)議,本身就有一致性風(fēng)險,還存在穩(wěn)定性、內(nèi)存消耗等其它方面的問題,因此現(xiàn)在需要結(jié)合推薦架構(gòu)中的典型數(shù)據(jù)分發(fā)和存儲場景,設(shè)計一套完善的消息隊列組件,目標(biāo)能夠簡化推薦數(shù)據(jù)流-存儲架構(gòu),降低資源開銷。
衍生Topic
除了BTQ的數(shù)據(jù)轉(zhuǎn)發(fā)功能外,新的消息隊列還將支持通過輕量算子衍生新的topic的能力,衍生出的topic不需要多副本存儲,由上游topic消息經(jīng)過算子計算直接獲得,此特性能夠簡化服務(wù)的部署,方便各模塊間解耦,避免重復(fù)計算,解決傳統(tǒng)中間模塊消費(fèi)-轉(zhuǎn)發(fā)過程中轉(zhuǎn)發(fā)失敗難以處理的問題。
架構(gòu)設(shè)計
每個集群會提前分好固定數(shù)目的partition,每個partition有獨(dú)立的Sequence Number,整個集群的元數(shù)據(jù)存在一個元數(shù)據(jù)服務(wù)上(第一版用zookeeper),除了源頭topic,每個衍生topic配置固定的上游topic。
每個topic的所有consumer,會互相轉(zhuǎn)發(fā)消息,每個partition的轉(zhuǎn)發(fā)的拓?fù)涫且粋€多叉樹,由一個獨(dú)立的調(diào)度模塊,統(tǒng)籌全局的轉(zhuǎn)發(fā)拓?fù)洹?/p>
除了獨(dú)立的調(diào)度模塊,整個架構(gòu)的主體都由基于以下框架開發(fā)的實例組成

源頭topic可以支持Kafka、一致性協(xié)議等多種數(shù)據(jù)源,一個源頭topic會啟動多個實例,調(diào)度模塊會將partition均衡調(diào)度到各個實例上,并且支持多個備份容災(zāi)。
下游實例的輸入端設(shè)置為OriginQueue Consumer,并指定好topic,輸出端默認(rèn)會輸出一個與輸入相同的topic,用于轉(zhuǎn)發(fā)流量。使用者可以自行實現(xiàn)多個算子,產(chǎn)出衍生topic,也可以實現(xiàn)本地回調(diào),作為末端消費(fèi)。
流量轉(zhuǎn)發(fā)
調(diào)度策略會讓每個partition由盡量少的節(jié)點轉(zhuǎn)發(fā),并且轉(zhuǎn)發(fā)負(fù)載盡量均衡,節(jié)點不會緩存不負(fù)責(zé)轉(zhuǎn)發(fā)的partition的數(shù)據(jù),這相比BTQ能夠大幅降低內(nèi)存開銷。
現(xiàn)在BTQ遇到過多Consumer由于自身原因產(chǎn)生lag時,會產(chǎn)生大量回源流量,把源頭資源耗盡,這個問題無法從根源上避免,但可以本系統(tǒng)可以從兩點改善:
1.由于每個節(jié)點只轉(zhuǎn)發(fā)緩存少量partition,故而整體上可以緩存更多的數(shù)據(jù),短期的lag可以靠緩存覆蓋,減少回源流量
2.每個topic要限制回源并發(fā)量,優(yōu)先保證健康的consumer的正常消費(fèi),將有l(wèi)ag的消費(fèi)按照Sequence Number、消費(fèi)速率等因素分到不同隊列中,每個隊列從最小Sequence Number從上游拉取數(shù)據(jù),只產(chǎn)生一份回源流量
應(yīng)用舉例-三級存儲架構(gòu)
推薦系統(tǒng)中常見的存儲分為分布式持久化存儲、分布式緩存、本地緩存三種,分布式持久化存儲保存全量數(shù)據(jù),分布式緩存保存熱點數(shù)據(jù),本地緩存保存極熱數(shù)據(jù),但市面上沒有一套分布式存儲系統(tǒng)將這三種角色完美整合。
以我司為例,分布式緩存一般使用redis或者memory cache,分布式緩存和持久化存儲是完全無關(guān)的系統(tǒng),需要使用方自己做數(shù)據(jù)同步;本地緩存現(xiàn)在在我們的推薦架構(gòu)里兩個典型場景是預(yù)估服務(wù)讀取embedding server和推薦服務(wù)讀分布式正排索引。預(yù)估服務(wù)會定期淘汰老的embedding數(shù)據(jù),隨機(jī)抽取一部分訪問更新本地緩存,這種方式的問題是數(shù)據(jù)更新會有比較大的延遲,并且淘汰、請求等操作有一定性能開銷。推薦服務(wù)正排索引的本地緩存,每個實例會消費(fèi)整個BTQ,然后本地做更新和淘汰,這會消費(fèi)大量非熱點數(shù)據(jù)的消息。
但基于OriginQueue的兩點新特性,可以設(shè)計一套統(tǒng)一的分布式存儲系統(tǒng),各存儲實例之間通過消息隊列做數(shù)據(jù)同步,而不必使用方做跨系統(tǒng)的數(shù)據(jù)同步,數(shù)據(jù)一致性大幅改善。由于數(shù)據(jù)源頭是有一致性協(xié)議保證的消息隊列,所以持久化的存儲只要記錄好Sequence Number,就可以不必使用WAL,從而節(jié)約一部分寫io。
大部分場景下,每個調(diào)用實例的熱點數(shù)據(jù)是基本一致的,所以可以由存儲服務(wù)搜集統(tǒng)計信息,篩選出熱點數(shù)據(jù),然后產(chǎn)出一份只有熱點數(shù)據(jù)相關(guān)operate的操作日志,請求方只需要訂閱這部分操作,更新本地緩存即可。這樣本地緩存即能保證實時更新,又不會為非熱點數(shù)據(jù)浪費(fèi)資源。

上圖中,數(shù)據(jù)源是一份按照ItemID分好partition的kafka,一組OriginQueue服務(wù)把它作為輸入端,通過兩個定制的算子,衍生出兩個topic,一個算子篩選出視頻相關(guān)的用戶行為,轉(zhuǎn)換成通用的計數(shù)的KV Operate,另一個算子篩選出直播相關(guān)的用戶行為,轉(zhuǎn)換成通用的計數(shù)的KV Operate,這樣存儲實例只需要消費(fèi)衍生的topic就可以累積數(shù)據(jù),并且各實例之間完全按照Sequence Number對齊。全量的KV operate經(jīng)過熱點過濾模塊過濾后,衍生出新的topic給reader的本地緩存消費(fèi),依靠數(shù)據(jù)轉(zhuǎn)發(fā)的功能,能夠支持上萬個reader同時實時更新本地緩存。
元數(shù)據(jù)服務(wù)
zookeeper是一個廣泛使用并且大家普遍對它不滿的服務(wù),主要問題有:性能差、接口丑、功能少。
由于zk每個集群全局只有一個有序隊列,所以寫入相當(dāng)于是單線程的,但實際使用中,一個zk集群往往會存儲多塊互不相干的數(shù)據(jù),這些數(shù)據(jù)共用一個隊列顯然是不合理的。zk的目錄結(jié)構(gòu)可以看成是一棵樹,現(xiàn)實中許多zk集群,是可以把這顆樹劃分成互不相干的若干棵子樹的,讓這些子樹各自擁有獨(dú)立的操作隊列,這些子樹的祖先節(jié)點,不允許有值,只能創(chuàng)建和刪除。
現(xiàn)在zk的每個watch,是注冊到每個節(jié)點上的,如果需要watch的位置比較多,實現(xiàn)會比較復(fù)雜而且效率不高。其實客戶端也可以作為一個observer,綁定一個子樹,同步操作序列,在本地緩存這個子樹,這樣每個位置的回調(diào)只需要在本地設(shè)置,server端不需要保存一大堆watch信息了,而且使用observer模式就可以避免丟消息的問題。而基于OriginQueue封裝的client,由于自帶流量轉(zhuǎn)發(fā)功能,上萬client同時訂閱大量元數(shù)據(jù)也不會有性能問題。