本文基于TalkingData 張學(xué)敏 在公司內(nèi)部KOL的分享主題《基于Spark、NoSQL實時數(shù)據(jù)處理實踐》的整理,同時也在DTCC大會上做了同主題的分享。
主要介紹了項目的技術(shù)選型、技術(shù)架構(gòu),重點介紹下項目面臨的挑戰(zhàn)和解決辦法,還介紹了面對多維度、多值、多版本等業(yè)務(wù)場景時,使用Bitmap與HBase特性解決問題方法。
共分為上下兩篇,本次發(fā)布上篇,下篇敬請關(guān)注。
一、數(shù)據(jù)相關(guān)情況
項目處理的數(shù)據(jù)主要來源于TalkingData的三條SASS業(yè)務(wù)線,他們主要是為移動應(yīng)用開發(fā)者提供應(yīng)用的統(tǒng)計分析、游戲運營分析以及廣告監(jiān)測等能力。開發(fā)者使用TD的SDK將各種事件數(shù)據(jù)發(fā)送過來,然后再通過SASS平臺使用數(shù)據(jù)。


數(shù)據(jù)主要都和智能設(shè)備相關(guān),包含的數(shù)據(jù)內(nèi)容主要可以分為三部分,一部分是設(shè)備信息類,主要包括設(shè)備ID,比如Mac、IDFA等,還有設(shè)備的軟硬件信息,比如操作系統(tǒng)版本號,屏幕分辨率等。另一部分是業(yè)務(wù)相關(guān)信息類,主要包括業(yè)務(wù)事件,會話信息,還有行為狀態(tài)。關(guān)于行為狀態(tài),是我們在智能設(shè)備上使用算法推測終端持有者的行為狀態(tài)信息,比如靜止、行走、奔跑、乘車等。第三部分是上下文信息,包括設(shè)備連接網(wǎng)絡(luò)的情況,使用的是蜂窩網(wǎng)絡(luò)還是WiFi等,還有設(shè)備位置相關(guān)的信息,以及其他傳感器相關(guān)的數(shù)據(jù)等。

關(guān)于設(shè)備體量,目前設(shè)備日活月活分別在2.5億和6.5億以上,每天的事件數(shù)在370億左右,一天數(shù)據(jù)的存儲量是在17T左右。

上圖為整體的數(shù)據(jù)架構(gòu)圖,數(shù)據(jù)流向是自下往上。數(shù)據(jù)采集層使用的是TalkingData自研的SDK,通過SDK將數(shù)據(jù)發(fā)往數(shù)據(jù)收集層。數(shù)據(jù)收集層使用的是TalkingData自研的DataCollector,Collector會將數(shù)據(jù)發(fā)送到數(shù)據(jù)接入層的Kafka。每個業(yè)務(wù)線都有自己的Kafka集群,在Collector可以控制數(shù)據(jù)的流向,大多數(shù)據(jù)都是業(yè)務(wù)線一份,數(shù)據(jù)中心一份。數(shù)據(jù)處理層有兩部分,一部分是使用Spark core或sql的離線計算。其中Spark是on yarn模式,使用yarn進行資源管理,中間通過Alluxio進行加速,使用Jenkins進行作業(yè)管理和調(diào)度,主要負(fù)責(zé)為業(yè)務(wù)方提供數(shù)據(jù)集和數(shù)據(jù)服務(wù)。
另一部分是使用Spark Streaming的實時計算,主要是為TalkingData管理層提供運營數(shù)據(jù)報表。數(shù)據(jù)存儲層,主要功能是存放數(shù)據(jù)處理后的結(jié)果,使用分布式文件系統(tǒng)HDFS、Alluxio存放數(shù)據(jù)集,使用分布式數(shù)據(jù)庫HBase、ScyllaDB,關(guān)系型數(shù)據(jù)庫MySQL以及MPP型數(shù)據(jù)庫GreenPlum存放服務(wù)相關(guān)的數(shù)據(jù)。數(shù)據(jù)應(yīng)用層?xùn)|西就比較多了,有供TalkingData內(nèi)部使用的數(shù)據(jù)分析、探索平臺,也有對外內(nèi)外都可的數(shù)據(jù)服務(wù)、數(shù)據(jù)模型商城,以及智能營銷云、觀象臺等。
二、項目面臨的業(yè)務(wù)訴求

主要的可總結(jié)為四部分:
首先是數(shù)據(jù)修正:離線計算是將數(shù)據(jù)存放在了HDFS上,如果數(shù)據(jù)有延遲,比如事件時間是昨天的數(shù)據(jù)今天才到,那么數(shù)據(jù)將會被錯誤的存放在今天的時間分區(qū)內(nèi)。因為HDFS不支持隨機讀寫,也不好預(yù)測數(shù)據(jù)會延遲多久,所以在離線計算想要完全修正這些數(shù)據(jù),成本還是比較高的。
其次是時序數(shù)據(jù)需求:之前的業(yè)務(wù)都是以小時、天、周、月等時間周期,面向時間斷面? 的宏觀數(shù)據(jù)分析,隨著公司業(yè)務(wù)擴展,比如營銷、風(fēng)控等行業(yè),面向個體的微觀數(shù)據(jù)分析的需求越來越多,所以需要能夠低成本的把一個設(shè)備的相關(guān)的數(shù)據(jù)都取出來做分析。而面向時間斷面的數(shù)據(jù)每天十幾T,想從中抽出某些設(shè)備近1個月的數(shù)據(jù)就會涉及到500多T的數(shù)據(jù)。所以需要建立時序數(shù)據(jù)處理、查詢的能力,能方便的獲取設(shè)備歷史上所有數(shù)據(jù)。
第三是實時處理:離線計算少則延遲一個小時,多則一天或者更久,而有些行業(yè)對數(shù)據(jù)時效性要求是比較高的,比如金融、風(fēng)控等業(yè)務(wù),所以需要實時數(shù)據(jù)處理。同時,為了更多的豐富設(shè)備位置相關(guān)數(shù)據(jù),我們還建立了WiFi、基站等實體的位置庫,所以在實時數(shù)據(jù)處理時,需要實時讀取這些庫為那些連接了WiFi、基站但沒位置數(shù)據(jù)的設(shè)備補充位置相關(guān)信息。
第四是實時查詢,這里描述的是面向?qū)嶓w、多維度、多值、多版本,接下來我詳細(xì)介紹下。

我們將事件數(shù)據(jù)抽象出了各種實體,比如設(shè)備、位置、WiFi基站等實體,其中位置實體可以使用GeoHash或者網(wǎng)格表達(dá)。每個實體都有唯一ID以及多個維度信息,以設(shè)備實體為例,包括ID、軟硬件信息等維度。單個維度又可能會包含多個值,比如WiFi,在家我連接的是WiFi1,到公司鏈接的是WiFi2,所以WiFi維度有WiFi1和WiFi2兩個值。單個值又可能有多個時間版本,比如我在家連接WiFi1可能6點被捕獲到一次,7點被捕獲到兩次。所以,最終建立可以通過指定實體ID,查詢維度、列及時間窗口獲取數(shù)據(jù)的能力。
三、技術(shù)選型和架構(gòu)

數(shù)據(jù)接入層我們選擇的是Kafka,Kafka在大數(shù)據(jù)技術(shù)圈里出鏡率還是比較高的。Kafka是LinkedIn在2011年開源的,創(chuàng)建初衷是解決系統(tǒng)間消息傳遞的問題。傳統(tǒng)消息系統(tǒng)有兩種模型,一種是隊列模型,一種是訂閱發(fā)布模型。兩者各有優(yōu)缺,比如隊列模型的消息系統(tǒng)可以支持多個客戶端同時消費不同的數(shù)據(jù),也就是可以很方便的擴展消費端的能力,但訂閱發(fā)布模型就不好擴展,因為它是使用的廣播模式。另一個就是,隊列模型的消息只能被消費一次,一旦一個消息被某個消費者處理了,其他消費者將不能消費到該消息,而發(fā)布訂閱模型同一消息可以被所有消費者消費到。Kafka使用Topic分類數(shù)據(jù),一個Topic類似一個消息隊列。Kafka還有個概念,叫consumer?group,一個group里可以有多個消費者,同一個topic可以被一個group內(nèi)的多個消費者同時消費不同的消息,也就是類似隊列模型可以方便的擴展消費端能力。一個Topic也可以被多個group消費,group之間相互沒有影響,也就是類似發(fā)布訂閱模型,Topic中的一條消息可以被消費多次。所以Kafka等于說是使用Topic和Consumer?group等概念,將隊列模型和訂閱發(fā)布模型的優(yōu)勢都糅合了進來。
現(xiàn)在Kafka官方將Kafka的介紹做了調(diào)整,不再滿足大家簡單的將其定位為消息隊列,新的介紹描述是:可以被用來創(chuàng)建實時數(shù)據(jù)管道和流式應(yīng)用,且具有可擴展、高容錯,高吞吐等優(yōu)勢。另外,經(jīng)過7年的發(fā)展,kafka也比較成熟了,與周邊其他組件可以很方便的集成。但目前也有兩個比較明顯的劣勢,一個是不能保證Topic級別的數(shù)據(jù)有序,另一個是開源的管理工具不夠完善。

Spark現(xiàn)在聽起來不像前幾年那么性感了,但因為我們離線計算使用的Spark,有一定的技術(shù)積累,所以上手比較快。另外,Spark Streaming并不是真正意義上的流式處理,而是微批,相比Storm、Flink延遲還是比較高的,但目前也能完全滿足業(yè)務(wù)需求,另外,為了技術(shù)統(tǒng)一,資源管理和調(diào)度統(tǒng)一,所以我們最終選用了Spark Streaming。
Spark Streaming是Spark核心API的擴展,可實現(xiàn)高擴展、高吞吐、高容錯的實時流數(shù)據(jù)處理應(yīng)用。支持從Kafka、Flum、HDFS、S3等多種數(shù)據(jù)源獲取數(shù)據(jù),并根據(jù)一定的時間間隔拆分成一批批的數(shù)據(jù),然后可以使用map、reduce、join、window等高級函數(shù)或者使用SQL進行復(fù)雜的數(shù)據(jù)處理,最終得到處理后的一批批結(jié)果數(shù)據(jù),其還可以方便的將處理結(jié)果存放到文件系統(tǒng)、數(shù)據(jù)庫或者儀表盤,功能還是很完善的。
Spark Streaming將處理的數(shù)據(jù)流抽象為Dstream,DStream本質(zhì)上表示RDD的序列,所以任何對DStream的操作都會轉(zhuǎn)變?yōu)閷Φ讓覴DD的操作。

HBase是以分布式文件系統(tǒng)HDSF為底層存儲的分布式列式數(shù)據(jù)庫,它是對Google BigTable開源的實現(xiàn),主要解決超大規(guī)模數(shù)據(jù)集的實時讀寫、隨機訪問的問題,并且具有可擴展、高吞吐、高容錯等優(yōu)點。HBase這些優(yōu)點取決于其架構(gòu)和數(shù)據(jù)結(jié)構(gòu)的設(shè)計,他的數(shù)據(jù)寫入并不是直接寫入文件,當(dāng)然HDFS不支持隨機寫入,而是先寫入被稱作MemStore的內(nèi)存,然后再異步刷寫至HDFS,等于是將隨機寫入轉(zhuǎn)換成了順序?qū)?,所以大多時候?qū)懭胨俣雀卟⑶液芊€(wěn)定。
?而讀數(shù)據(jù)快,是使用字典有序的主鍵RowKey通過Zookeeper先定位到數(shù)據(jù)可能所在的RegionServer,然后先查找RegionServer的讀緩存BlockCache,如果沒找到會再查MemStore,只有這兩個地方都找不到時,才會加載HDFS中的內(nèi)容,但因為其使用了LSM樹型結(jié)構(gòu),所以讀取耗時一般也不長。還有就是,HBase還可以使用布隆過濾器通過判存提高查詢速度。
HBase的數(shù)據(jù)模型也很有意思,跟關(guān)系型數(shù)據(jù)庫類似,也有表的概念,也是有行有列的二維表。和關(guān)系型數(shù)據(jù)庫不一樣一個地方是他有ColumnFamily的概念,并且一個ColumnFamily下可以有很多個列,這些列在建表時不用聲明,而是在寫入數(shù)據(jù)時確定,也就是所謂的Free Schema。
HBase的缺點一個是運維成本相對較高,像compact、split、flush等問題處理起來都是比較棘手的,都需要不定期的投入時間做調(diào)優(yōu)。還有個缺點是延遲不穩(wěn)定,影響原因除了其copmact、flush外還有JVM的GC以及緩存命中情況。

ScyllaDB算是個新秀,可以與Cassandra對比了解,其實它就是用C++重寫的Cassandra,客戶端完全與Cassandra兼容,其官網(wǎng)Benchmark對標(biāo)的也是Cassandra,性能有10倍以上的提升,單節(jié)點也可以每秒可以處理100萬TPS,整體性能還是比較喜人的。與HBase、Cassandra一樣也有可擴展、高吞吐、高容錯的特點,另外他的延遲也比較低,并且比較穩(wěn)定。
他和Cassandra與HBase都可以以做到CAP理論里的P,即保證分區(qū)容忍性,也就是在某個或者某些節(jié)點出現(xiàn)網(wǎng)絡(luò)故障或者系統(tǒng)故障時候,不會影響到整個DataBase的使用。而他倆與HBase不一樣的一個地方在于分區(qū)容忍性包證的情況下,一致性與高可用的取舍,也就是CAP理論里,在P一定時C與A的選擇。HBase選擇的是C,即強一致性,比如在region failover 及后續(xù)工作完成前,涉及的region的數(shù)據(jù)是不能讀取的,而ScyllaDB、Cassandra選擇的A,即高可用的,但有些情況下數(shù)據(jù)可能會不一致。所以,選型時需要根據(jù)業(yè)務(wù)場景來定。
ScyllaDB的劣勢也比較明顯,就是項目比較新,Bug和使用的坑比較多, 我在這里就不一一去說了。

前面分別簡單介紹了選定的技術(shù)組件,及他們的優(yōu)缺點,最終項目整體架構(gòu)如上圖所示,數(shù)據(jù)流向用灰色箭頭代表,數(shù)據(jù)采集和收集都與離線計算一樣,不同的是在Spark Streaming從Kafka消費數(shù)據(jù)時,會同時實時從ScyllaDB讀取wifi、基站定位庫的數(shù)據(jù)參與位置補充的計算,然后將處理的結(jié)果數(shù)據(jù)寫入HBase。再往下類似Lambda架構(gòu),會對HBase中的數(shù)據(jù)離線做進一步的處理,然后再將數(shù)據(jù)離線通過Bulkload方式寫入HBase,關(guān)于其中的Bitmap應(yīng)用,后邊再聊。
架構(gòu)右邊部分是服務(wù)相關(guān)的,首先是中間件,主要屏蔽了異構(gòu)數(shù)據(jù)庫對應(yīng)用層服務(wù)的影響,再往上是規(guī)則引擎服務(wù),因為我們上線在SDMK的應(yīng)用服務(wù)有100多個,導(dǎo)致服務(wù)管理成本很高,并且也不利于物理資源的合理運用,所以上線了規(guī)則引擎服務(wù),將所有服務(wù)的業(yè)務(wù)邏輯都通過規(guī)則表達(dá),這樣上線新服務(wù)就不需要重新申請服務(wù)器,只需要添加一條規(guī)則即可。等于是就將一百多個服務(wù)轉(zhuǎn)換成了一個服務(wù),當(dāng)規(guī)則引擎負(fù)載較高時或者大幅降低后,可以很方便的進行資源的擴充和減少。SDMK是TalkingData研發(fā)的類似淘寶的交易平臺,公司內(nèi)、外的數(shù)據(jù)服務(wù)、數(shù)據(jù)模型都可以像商品一樣在上面進行售賣。