揭秘大眾點(diǎn)評的大數(shù)據(jù)實(shí)時(shí)計(jì)算

實(shí)時(shí)計(jì)算在點(diǎn)評的使用場景

類別一:Dashboard、實(shí)時(shí)DAU、新激活用戶數(shù)、實(shí)時(shí)交易額等

???Dashboard類:北斗(報(bào)表平臺)、微信(公眾號)和云圖(流量分析)等

???實(shí)時(shí)DAU:包括主APP(Android/iPhone/iPad)、團(tuán)APP、周邊快查、PC、M站

???新激活用戶數(shù):主APP

???實(shí)時(shí)交易額:閃惠/團(tuán)購交易額

以報(bào)表平臺為例,下圖是一張APP UV的實(shí)時(shí)曲線圖,它以分鐘級別粒度展現(xiàn)了 實(shí)時(shí)的DAU數(shù)據(jù)和曲線。

從圖中可以看見一個(gè)尖點(diǎn),這個(gè)尖點(diǎn)就是當(dāng)天push過后帶來的用戶,這樣可以看到實(shí)時(shí)的運(yùn)營效率。

類別二:搜索、推薦、安全等

以搜索為例:用戶在點(diǎn)評的每一步有價(jià)值的操作(包括:搜索、點(diǎn)擊、瀏覽、購買、收藏等),都將實(shí)時(shí)、智能地影響搜索結(jié)果排序,從而顯著提升用戶搜索體驗(yàn)、搜索轉(zhuǎn)化率。

某用戶 搜索“ 火鍋 ”,當(dāng)他 在搜索結(jié)果頁 點(diǎn)擊了“ 重慶高老九火鍋 ”后, 再次刷新搜索結(jié)果列表時(shí),該商戶的排序就會提升到頂部 。

再結(jié)合其他的一些實(shí)時(shí)反饋的個(gè)性化推薦策略,最終使團(tuán)購的交易額有了明顯的增加,轉(zhuǎn)化率提升了2個(gè)多點(diǎn)。

實(shí)時(shí)計(jì)算在業(yè)界的使用場景

場景1:阿里JStorm

???雙11實(shí)時(shí)交易數(shù)據(jù)

場景2:360Storm

???搶票軟件驗(yàn)證碼自動(dòng)識別:大家用360瀏覽器在12306上買票的時(shí)候,驗(yàn)證碼自動(dòng)識別是在Storm上計(jì)算完成的。

???網(wǎng)盤圖片縮略圖生成:360網(wǎng)盤的縮略圖也是實(shí)時(shí)生成出來的,這樣可以節(jié)約大量的文件數(shù)量和存儲空間。

???實(shí)時(shí)入侵檢測

???搜索熱詞推薦

場景3:騰訊TDProcess

分布式K/V存儲引擎TDEngine和支持?jǐn)?shù)據(jù)流計(jì)算的TDProcess,TDProcess是基于Storm的計(jì)算引擎,提供了通用的計(jì)算模型,如Sum、Count、PV/UV計(jì)算和TopK統(tǒng)計(jì)等。

場景4:京東Samza

整個(gè)業(yè)務(wù)主要應(yīng)用訂單處理,實(shí)時(shí)分析統(tǒng)計(jì)出待定區(qū)域中訂單各個(gè)狀態(tài)的量:待定位、待派工、待揀貨、待發(fā)貨、待配送、待妥投等。

點(diǎn)評如何構(gòu)建實(shí)時(shí)計(jì)算平臺

點(diǎn)評的實(shí)時(shí)計(jì)算平臺是一個(gè)端到端的方案,從下面的平臺架構(gòu)圖,可以看出整體架構(gòu)是一個(gè)比較長的過程,包括了數(shù)據(jù)源、數(shù)據(jù)的傳輸通道、計(jì)算、存儲和對外服務(wù)等。

實(shí)時(shí)計(jì)算平臺首先解決的問題是,數(shù)據(jù)怎么獲取,如何拿到那些數(shù)據(jù)。現(xiàn)在做到了幾乎所有點(diǎn)評線上產(chǎn)生的數(shù)據(jù)都可以毫秒級拿到,封裝對應(yīng)的數(shù)據(jù)輸入源Spout。

通過Blackhole支持日志類實(shí)時(shí)獲取,包括打點(diǎn)日志、業(yè)務(wù)Log、Nginx日志等。 整合Puma Client第一時(shí)間獲取數(shù)據(jù)庫數(shù)據(jù)變更。整合Swallow獲取應(yīng)用消息。Blackhole是團(tuán)隊(duì)開發(fā)的類Kafka系統(tǒng),主要目標(biāo)是批量從業(yè)務(wù)方拉取日志時(shí)做到數(shù)據(jù)的完整性和一致性,然后也提供了實(shí)時(shí)的消費(fèi)能力。Puma是以MySQL binlog為基礎(chǔ)開發(fā)的,這樣可以實(shí)時(shí)拿到數(shù)據(jù)庫的update、delete、insert操作。?

Swallow是點(diǎn)評的MQ系統(tǒng)。通過整合各種傳輸通道,并且封裝相應(yīng)的Spout,做業(yè)務(wù)開發(fā)的同學(xué)就完全不用關(guān)心數(shù)據(jù)怎樣可靠獲取,只需要寫自己的業(yè)務(wù)邏輯就可以了。解決了數(shù)據(jù)和傳輸問題后,計(jì)算過程則在Storm中完成。

如果在Storm計(jì)算過程中或計(jì)算出結(jié)果后,需要與外部存儲系統(tǒng)交互,也提供了一個(gè)data-service服務(wù) ,通過點(diǎn)評的RPC框架提供接口,用戶不用關(guān)心實(shí)際Redis/HBase這些系統(tǒng)的細(xì)節(jié)和部署情況, 以及這個(gè)數(shù)據(jù)到底是在Redis還是HBase中的,可以根據(jù)SLA來做自動(dòng)切換;

同時(shí)計(jì)算的結(jié)果也是通過data-service服務(wù),再反饋到線上系統(tǒng)。就拿剛剛搜索結(jié)果的例子,搜索業(yè)務(wù)在用戶再次搜索的時(shí)候會根據(jù)userId請求一次data-service,然后拿到這個(gè)用戶的最近瀏覽記錄,并重新排序結(jié)果,返回給用戶。

這樣的好處就是實(shí)時(shí)計(jì)算業(yè)務(wù)和線上其他業(yè)務(wù)完全解耦,實(shí)時(shí)計(jì)算這邊出現(xiàn)問題,不會導(dǎo)致線上業(yè)務(wù)出現(xiàn)問題。

Storm基礎(chǔ)知識簡單介紹

Apache Storm( http://storm.apache.org/)是由Twitter開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。Storm可以非常容易、可靠地處理無限的數(shù)據(jù)流。對比Hadoop的批處理,Storm是個(gè)實(shí)時(shí)的、分布式以及具備高容錯(cuò)的計(jì)算系統(tǒng)。Storm可以使用何編程語言進(jìn)行開發(fā)。

Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面運(yùn)行的是MapReduce的Job,而在Storm上面運(yùn)行的是Topology。

Storm和Hadoop一個(gè)非常關(guān)鍵的區(qū)別是Hadoop的MapReduce Job最終會結(jié)束,而Storm的Topology會一直運(yùn)行(除非顯式地殺掉)。

Storm基本概念:

Nimbus和Supervisor之間的通訊是依靠ZooKeeper來完成,并且Nimbus進(jìn)程和Supervisor都是快速失敗(fail-fast)和無狀態(tài)的。可以用kill-9來殺死Nimbus和Supervisor進(jìn)程,然后再重啟它們,它們可以繼續(xù)工作。

在Storm中,Spout是Topology中產(chǎn)生源數(shù)據(jù)流的組件。通常Spout獲取從Kafka、MQ等的數(shù)據(jù),然后調(diào)用nextTuple函數(shù),發(fā)射數(shù)據(jù)出去供Bolt消費(fèi)。

圖中的Spout就發(fā)射出去了兩條數(shù)據(jù)流。而Bolt是在Topology中接受Spout的數(shù)據(jù),然后執(zhí)行處理的組件。Bolt在接收到消息后會調(diào)用execute函數(shù),用戶可以在其中執(zhí)行自己想要的操作。

為什么用Storm呢,因?yàn)镾torm有它的優(yōu)點(diǎn):

易用性

只要遵守Topology,Spout,Bolt的編程規(guī)范即可開發(fā)出一個(gè)擴(kuò)展性極好的應(yīng)用,像底層RPC,Worker之間冗余,數(shù)據(jù)分流之類的操作,開發(fā)者完全不用考慮。

擴(kuò)展性

當(dāng)某一級處理單元速度不夠時(shí),直接配置一下并發(fā)數(shù),即可線性擴(kuò)展性能。

健壯性

當(dāng)Worker失效或機(jī)器出現(xiàn)故障時(shí), 自動(dòng)分配新的Worker替換失效Worker。

準(zhǔn)確性

采用Acker機(jī)制,保證數(shù)據(jù)不丟失。采用事務(wù)機(jī)制,保證數(shù)據(jù)準(zhǔn)確性。剛剛介紹了一些Storm的基礎(chǔ)概念和特性,再用一張比較完整的圖來回顧一下整個(gè)Storm的體系架構(gòu):

Storm提交一個(gè)作業(yè)的時(shí)候,是通過Thrift的Client執(zhí)行相應(yīng)的命令來完成。Nimbus針對該Topology建立本地的目錄,Nimbus中的調(diào)度器根據(jù)Topology的配置計(jì)算Task,并把Task分配到不同的Worker上,調(diào)度的結(jié)果寫入Zookeeper中。

Zookeeper上建立assignments節(jié)點(diǎn),存儲Task和Supervisor中Worker的對應(yīng)關(guān)系。在Zookeeper上創(chuàng)建workerbeats節(jié)點(diǎn)來監(jiān)控Worker的心跳。Supervisor去Zookeeper上獲取分配的Tasks信息,啟動(dòng)一個(gè)或者多個(gè)Worker來執(zhí)行。

每個(gè)Worker上運(yùn)行多個(gè)Task,Task由Executor來具體執(zhí)行。Worker根據(jù)Topology信息初始化建立Task之間的連接,相同Worker內(nèi)的Task通過DisrupterQueue來通信,不同Worker間默認(rèn)采用Netty來通信,然后整個(gè)Topology就運(yùn)行起來了。

如何保證業(yè)務(wù)運(yùn)行可靠性

首先Storm自身有很多容錯(cuò)機(jī)制,也加了很多監(jiān)控信息,方便業(yè)務(wù)同學(xué)監(jiān)控自己的業(yè)務(wù)狀態(tài)。

在Storm上,遇到的一個(gè)很基本的問題就是,各個(gè)業(yè)務(wù)是運(yùn)行的Worker會跑在同一臺物理機(jī)上。曾經(jīng)有位同學(xué)就在自己的Worker中起了200多個(gè)線程來處理json,結(jié)果就是這臺機(jī)器的CPU都被他的Worker吃光了,其他的業(yè)務(wù)也跟著倒霉。

因此也使用CGroup做了每個(gè)Worker的資源隔離,主要限制了CPU和Memory的使用。相對而言JStorm在很多方面要完善一些,JStorm自己就帶資源隔離。對應(yīng)監(jiān)控來說,基本的主機(jī)維度的監(jiān)控在ganglia上可以看見,比如現(xiàn)在集群的運(yùn)行狀況。下圖是現(xiàn)在此時(shí)的集群的網(wǎng)絡(luò)和負(fù)載:

這些信息并不能保證業(yè)務(wù)就OK,因此將Storm上的很多監(jiān)控信息和點(diǎn)評的開源監(jiān)控系統(tǒng)Cat集成在了一起,從Cat上可以看見更多的業(yè)務(wù)運(yùn)行狀態(tài)信息。

比如在Cat中我可以看見整個(gè)集群的TPS,現(xiàn)在已經(jīng)從30多萬降下來了。 然后我可以設(shè)置若干的報(bào)警規(guī)則, 如:連續(xù)N分鐘降低了50%可以報(bào)警。然后也監(jiān)控了各個(gè)業(yè)務(wù)Topology的TPS、Spout輸入、Storm的可用Slot等的變化。

這個(gè)圖就是某個(gè)業(yè)務(wù)的TPS信息, 如果TPS同比或者環(huán)比出現(xiàn)問題,也可以報(bào)警給業(yè)務(wù)方。

Storm使用經(jīng)驗(yàn)分享

1.使用組件的并行度代替線程池

Storm自身是一個(gè)分布式、多線程的框架,對每個(gè)Spout和Bolt,都可以設(shè)置其并發(fā)度;它也支持通過rebalance命令來動(dòng)態(tài)調(diào)整并發(fā)度,把負(fù)載分?jǐn)偟蕉鄠€(gè)Worker上。

如果自己在組件內(nèi)部采用線程池做一些計(jì)算密集型的任務(wù),比如JSON解析,有可能使得某些組件的資源消耗特別高,其他組件又很低,導(dǎo)致Worker之間資源消耗不均衡,這種情況在組件并行度比較低的時(shí)候更明顯。

比如某個(gè)Bolt設(shè)置了1個(gè)并行度,但在Bolt中又啟動(dòng)了線程池,這樣導(dǎo)致的一種后果就是,集群中分配了這個(gè)Bolt的Worker進(jìn)程可能會把機(jī)器的資源都給消耗光了,影響到其他Topology在這臺機(jī)器上的任務(wù)的運(yùn)行。如果真有計(jì)算密集型的任務(wù),可以把組件的并發(fā)度設(shè)大,Worker的數(shù)量也相應(yīng)提高,讓計(jì)算分配到多個(gè)節(jié)點(diǎn)上。

為了避免某個(gè)Topology的某些組件把整個(gè)機(jī)器的資源都消耗光的情況,除了不在組件內(nèi)部啟動(dòng)線程池來做計(jì)算以外,也可以通過CGroup控制每個(gè)Worker的資源使用量。

2.不要用DRPC批量處理大數(shù)據(jù)

RPC提供了應(yīng)用程序和StormTopology之間交互的接口,可供其他應(yīng)用直接調(diào)用,使用Storm的并發(fā)性來處理數(shù)據(jù),然后將結(jié)果返回給調(diào)用的客戶端。這種方式在數(shù)據(jù)量不大的情況下,通常不會有問題,而當(dāng)需要處理批量大數(shù)據(jù)的時(shí)候,問題就比較明顯了。

(1)處理數(shù)據(jù)的Topology在超時(shí)之前可能無法返回計(jì)算的結(jié)果。

(2)批量處理數(shù)據(jù),可能使得集群的負(fù)載短暫偏高,處理完畢后,又降低回來,負(fù)載均衡性差。

批量處理大數(shù)據(jù)不是Storm設(shè)計(jì)的初衷,Storm考慮的 是時(shí)效性和批量之間的均衡,更多地看中前者。需要準(zhǔn)實(shí)時(shí)地處理大數(shù)據(jù)量,可以考慮Spark Stream等批量框架。

3.不要在Spout中處理耗時(shí)的操作

Spout中nextTuple方法會發(fā)射數(shù)據(jù)流,在啟用Ack的情況下,fail方法和ack方法會被觸發(fā)。

需要明確一點(diǎn),在Storm中Spout是單線程(JStorm的Spout分了3個(gè)線程,分別執(zhí)行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗時(shí),某個(gè)消息被成功執(zhí)行完畢后,Acker會給Spout發(fā)送消息,Spout若無法及時(shí)消費(fèi),可能造成ACK消息超時(shí)后被丟棄,然后Spout反而認(rèn)為這個(gè)消息執(zhí)行失敗了,造成邏輯錯(cuò)誤。反之若fail方法或者ack方法的操作耗時(shí)較多,則會影響Spout發(fā)射數(shù)據(jù)的量,造成Topology吞吐量降低。

4.注意fieldsGrouping的數(shù)據(jù)均衡性

fieldsGrouping是根據(jù)一個(gè)或者多個(gè)Field對數(shù)據(jù)進(jìn)行分組,不同的目標(biāo)Task收到不同的數(shù)據(jù),而同一個(gè)Task收到的數(shù)據(jù)會相同。

假設(shè)某個(gè)Bolt根據(jù)用戶ID對數(shù)據(jù)進(jìn)行fieldsGrouping,如果某一些用戶的數(shù)據(jù)特別多,而另外一些用戶的數(shù)據(jù)又比較少,那么就可能使得下一級處理Bolt收到的數(shù)據(jù)不均衡,整個(gè)處理的性能就會受制于某些數(shù)據(jù)量大的節(jié)點(diǎn)。可以加入更多的分組條件或者更換分組策略,使得數(shù)據(jù)具有均衡性。

5.優(yōu)先使用localOrShuffleGrouping

localOrShuffleGrouping是指如果目標(biāo)Bolt中的一個(gè)或者多個(gè)Task和當(dāng)前產(chǎn)生數(shù)據(jù)的Task在同一個(gè)Worker進(jìn)程里面,那么就走內(nèi)部的線程間通信,將Tuple直接發(fā)給在當(dāng)前Worker進(jìn)程的目的Task。否則,同shuffleGrouping。

localOrShuffleGrouping的數(shù)據(jù)傳輸性能優(yōu)于shuffleGrouping,因?yàn)樵赪orker內(nèi)部傳輸,只需要通過Disruptor隊(duì)列就可以完成,沒有網(wǎng)絡(luò)開銷和序列化開銷。因此在數(shù)據(jù)處理的復(fù)雜度不高,而網(wǎng)絡(luò)開銷和序列化開銷占主要地位的情況下,可以優(yōu)先使用localOrShuffleGrouping來代替shuffleGrouping。

6.設(shè)置合理的MaxSpoutPending值

在啟用Ack的情況下,Spout中有個(gè)RotatingMap用來保存Spout已經(jīng)發(fā)送出去,但還沒有等到Ack結(jié)果的消息。RotatingMap的最大個(gè)數(shù)是有限制的,為p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通過setMaxSpoutPending方法來設(shè)定),num-tasks是Spout的Task數(shù)。如果不設(shè)置MaxSpoutPending的大小或者設(shè)置得太大,可能消耗掉過多的內(nèi)存導(dǎo)致內(nèi)存溢出,設(shè)置太小則會影響Spout發(fā)射Tuple的速度。

7.設(shè)置合理的Worker數(shù)

Worker數(shù)越多,性能越好?先看一張Worker數(shù)量和吞吐量對比的曲線(來源于JStorm文檔:

https://github.com/alibaba/jstorm/tree/master/docs/ 0.9.4.1jstorm性能測試.docx)。

從圖可以看出,在12個(gè)Worker的情況下,吞吐量最大,整體性能最優(yōu)。這是由于一方面,每新增加一個(gè)Worker進(jìn)程,都會將一些原本線程間的內(nèi)存通信變?yōu)檫M(jìn)程間的網(wǎng)絡(luò)通信,這些進(jìn)程間的網(wǎng)絡(luò)通信還需要進(jìn)行序列化與反序列化操作,這些降低了吞吐率。

另一方面,每新增加一個(gè)Worker進(jìn)程,都會額外地增加多個(gè)線程(Netty發(fā)送和接收線程、心跳線程、SystemBolt線程以及其他系統(tǒng)組件對應(yīng)的線程等),這些線程切換消耗了不少CPU,sys系統(tǒng)CPU消耗占比增加,在CPU總使用率受限的情況下,降低了業(yè)務(wù)線程的使用效率。

8.平衡吞吐量和時(shí)效性

Storm的數(shù)據(jù)傳輸默認(rèn)使用Netty。在數(shù)據(jù)傳輸性能方面,有如下的參數(shù)可以調(diào)整:

storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分別為接收消息線程和發(fā)送消息線程的數(shù)量。

netty.transfer.batch.size是指每次 Netty Client向 Netty Server發(fā)送的數(shù)據(jù)的大小,如果需要發(fā)送的Tuple消息大于netty.transfer.batch.size,則Tuple消息會按照netty.transfer.batch.size進(jìn)行切分,然后多次發(fā)送。

storm.messaging.netty.buffer_size為每次批量發(fā)送的Tuple序列化之后的TaskMessage消息的大storm.messaging.netty.flush.check.interval.ms表示當(dāng)有TaskMessage需要發(fā)送的時(shí)候, Netty Client檢查可以發(fā)送數(shù)據(jù)的頻率。

降低storm.messaging.netty.flush.check.interval.ms的值,可以提高時(shí)效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升網(wǎng)絡(luò)傳輸?shù)耐峦塘?,使得網(wǎng)絡(luò)的有效載荷提升(減少TCP包的數(shù)量,并且TCP包中的有效數(shù)據(jù)量增加),通常時(shí)效性就會降低一些。因此需要根據(jù)自身的業(yè)務(wù)情況,合理在吞吐量和時(shí)效性直接的平衡。

除了這些參數(shù),怎么找到Storm中性能的瓶頸,可以通過如下的一些途徑來進(jìn)行:

在Storm的UI中,對每個(gè)Topology都提供了相應(yīng)的統(tǒng)計(jì)信息,其中有3個(gè)參數(shù)對性能來說參考意義比較明顯,包括Execute latency、Process latency和Capacity。

分別看一下這3個(gè)參數(shù)的含義和作用。

(1)Execute latency:消息的平均處理時(shí)間,單位為毫秒。

(2)Process latency:消息從收到到被ack掉所花的時(shí)間,單位為毫秒。如果沒有啟用Acker機(jī)制,那么Process latency的值為0。

(3)Capacity:計(jì)算公式為Capacity = Bolt或者Executor調(diào)用execute方法處理的消息數(shù)量 * 消息平均執(zhí)行時(shí)間 /時(shí)間區(qū)間。這個(gè)值越接近1,說明Bolt或者Executor基本一直在調(diào)用execute方法,因此并行度不夠,需要擴(kuò)展這個(gè)組件的Executor數(shù)量。為了在Storm中達(dá)到高性能,在設(shè)計(jì)和開發(fā)Topology的時(shí)候,需要注意以下原則。

(1)模塊和模塊之間解耦,模塊之間的層次清晰,每個(gè)模塊可以獨(dú)立擴(kuò)展,并且符合流水線的原則。

(2)無狀態(tài)設(shè)計(jì),無鎖設(shè)計(jì),水平擴(kuò)展支持。

(3)為了達(dá)到高的吞吐量,延遲會加大;為了低延遲,吞吐量可能降低,需要在二者之間平衡。

(4)性能的瓶頸永遠(yuǎn)在熱點(diǎn),解決熱點(diǎn)問題。

(5)優(yōu)化的前提是測量,而不是主觀臆測。收集相關(guān)數(shù)據(jù),再動(dòng)手,事半功倍。

關(guān)于計(jì)算框架的后續(xù)問題

目前Hadoop/Hive專注于離線分析業(yè)務(wù),每天點(diǎn)評有1.6萬個(gè)離線分析任務(wù)。Storm專注于實(shí)時(shí)業(yè)務(wù),實(shí)時(shí)每天會處理100億+條的數(shù)據(jù)。

在這兩個(gè)框架目前有很大的gap,一個(gè)是天級別,一個(gè)是秒級別,然后有大量的業(yè)務(wù)是準(zhǔn)實(shí)時(shí)的,比如分鐘級別。因此會使用Spark來做中間的補(bǔ)充。

Spark Streaming + Spark SQL也能夠降低很大的開發(fā)難度。相對而言,目前Storm的學(xué)習(xí)和開發(fā)成本還是偏高。要做一個(gè)10萬+TPS的業(yè)務(wù)在Storm上穩(wěn)定運(yùn)行,需要對Storm了解比較深入才能做到,不然會發(fā)現(xiàn)有這樣或者那樣的問題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容