主要涉及到的技術(shù)框架:flume(日志收集及傳輸)、kafka(消息隊(duì)列)、storm(流式計(jì)算)、hadoop(離線分析),這幾項(xiàng)技術(shù)也是大數(shù)據(jù)方面較為成熟和常用的技術(shù)方案。大數(shù)據(jù)是未來的一個(gè)熱點(diǎn)方向,涉及的技術(shù)和思想也十分豐富。本文僅對此次服務(wù)監(jiān)控中應(yīng)用到的主要技術(shù)框架的基本原理和核心概念做一介紹
Flume:
1. 簡介:
Flume ng是cloudera提供的一個(gè)分布式、可靠、高可用的系統(tǒng),它能夠?qū)⒉煌瑪?shù)據(jù)源的海量日志數(shù)據(jù)進(jìn)行高效收集、聚合、移動,最后存儲到一個(gè)數(shù)據(jù)存儲系統(tǒng)中(flume 的核心就是把數(shù)據(jù)從數(shù)據(jù)源收集過來,再送到目的地)。由原來的flume og到現(xiàn)在的flume ng,進(jìn)行了架構(gòu)重構(gòu)。改動的另一原因是將 flume 納入 apache 旗下,cloudera flume 改名為 apache flume。
2. 基本架構(gòu):

? Event:
代表著一個(gè)數(shù)據(jù)流的最小完整單元,通過source從外部數(shù)據(jù)源輸入,流經(jīng)channel,最終通過sink向外部輸出,比如我們?nèi)罩局械囊粭l日志
? Flow:
Event從源點(diǎn)到達(dá)目的點(diǎn)的遷移的抽象
? Agent:
一個(gè)獨(dú)立的flume進(jìn)程,包含三個(gè)核心組件source、channel、sink,通過這些組件,event可以從一個(gè)地方流向下一個(gè)地方
? Source:
用來消費(fèi)傳遞到該組件的event,可以接收外部源發(fā)送過來的數(shù)據(jù)。不同的 source可以接受不同的數(shù)據(jù)格式。flume支持avro、exec、spool、http等source類型
1. Exec source:以運(yùn)行 linux 命令的方式,持續(xù)的輸出最新的數(shù)據(jù),如 tail -f 指令。 exec source 可以實(shí)現(xiàn)對日志的實(shí)時(shí)收集,但是存在flume不運(yùn)行或者指令執(zhí)行出錯(cuò)時(shí),將無法收集到日志數(shù)據(jù),無法保證日志數(shù)據(jù)的完整性
2. Spooling directory source:監(jiān)測配置的目錄下新增的文件,并將文件中的數(shù)據(jù)讀取出來。spool source 雖然無法實(shí)現(xiàn)實(shí)時(shí)的收集數(shù)據(jù),但是可以使用以分鐘的方式分割文件,趨近于實(shí)時(shí)。
更多其他的source類型參考官方文檔http://flume.apache.org/flumeuserguide.html#flume-sources
? Channel:
中轉(zhuǎn)event的一個(gè)臨時(shí)存儲,保存有source組件傳遞過來的event,可以理解為一個(gè)隊(duì)列。flume支持memory、jdbc、file、kafka等channel類型
1. Memory channel:可以實(shí)現(xiàn)高速的吞吐,但是無法保證數(shù)據(jù)的完整性,如果 java 進(jìn)程死掉,任何存儲在內(nèi)存的事件將會丟失。另外,內(nèi)存的空間受到分配內(nèi)存大小的限制
2. File channel:是一個(gè)持久化的channel,它持久化所有的event,并將其存儲到磁盤中。因此,即使 java 虛擬機(jī)當(dāng)?shù)?,也不會造成?shù)據(jù)丟失
3. Kafka channel(1.6版本新增):event被存儲在kafka集群中,速度慢于memory channel,但是高于file channel,并且可靠性較高。另外此種方式相比之前的source->channel->sink->kafka也更加便捷
更多其他的channel類型參考官方文檔http://flume.apache.org/flumeuserguide.html#flume-channels
? Sink:
從channel中讀取并移除event,將event傳遞到下一個(gè)節(jié)點(diǎn),可以是storage,也可以是下一個(gè)agent(如果有的話)。flume支持hdfs、logger、file roll、elastic search、kafka等sink類型
更多其他的sink類型參考官方文檔http://flume.apache.org/flumeuserguide.html#flume-sinks
Kafka:
1. 簡介:
Kafka是由linkedin開發(fā)的一個(gè)分布式的消息(發(fā)布/訂閱)系統(tǒng),使用scala編寫,作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。主要特性如下:
? 以時(shí)間復(fù)雜度為o(1)的方式提供消息持久化能力,即使對tb級以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問性能(得益于其對數(shù)據(jù)文件的順序?qū)懭耄╝ppend))
? 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100k條以上消息的傳輸
? 支持kafka server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸
? 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理(group機(jī)制)
? Scale out:支持在線水平擴(kuò)展
2. 基本架構(gòu):

作為一個(gè)訂閱/發(fā)布系統(tǒng),kafka最基本的結(jié)構(gòu)類似上圖,主要包含三類角色:producer(消息發(fā)布(數(shù)據(jù)寫入))、kafka cluster(數(shù)據(jù)存儲)、consumer(訂閱消費(fèi)(數(shù)據(jù)讀取))。更為全面的拓?fù)浣Y(jié)構(gòu)類似下圖:

一個(gè)典型的kafka集群中包含若干producer,若干broker(kafka集群,支持水平擴(kuò)展),若干consumer group,以及一個(gè)zookeeper集群。kafka通過zookeeper管理集群配置。producer使用push模式將消息發(fā)布到broker,consumer使用pull模式從broker訂閱并消費(fèi)消息
3. 核心概念:
? Topic:
一個(gè)topic可以認(rèn)為是一類消息,topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消息都必須指定它所屬的topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里,consumer也要指明消費(fèi)哪個(gè)topic的數(shù)據(jù)
? Partition:
1. 每個(gè)topic將被分成一個(gè)或多個(gè)partition,每個(gè)partition在物理上對應(yīng)一個(gè)目錄,該目錄下存儲這個(gè)partition的所有消息文件和索引文件。任何發(fā)布到此partition的消息都會被直接append到log文件的尾部,屬于順序?qū)懘疟P,因此效率非常高,這是kafka高吞吐率的一個(gè)很重要的保證,邏輯結(jié)構(gòu)如下圖:

2. Partitions的設(shè)計(jì)目的有多個(gè):最根本原因是kafka基于文件存儲,通過分區(qū),可以將日志內(nèi)容分散到多個(gè)kafka實(shí)例(broker)上,來避免文件尺寸達(dá)到單機(jī)磁盤的上限,也能分散讀寫壓力;此外越多的partition意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力(原因見下文)
? Log:
Kafka底層通過文件系統(tǒng)存儲數(shù)據(jù),這里所說的文件在kafka里稱作log,如下圖:

1. 日志文件中保存了一序列“l(fā)og entries”(日志條目)。每個(gè)日志都有一個(gè)offset來唯一的標(biāo)記一條消息,每個(gè)partition在物理存儲層面有多個(gè)log file組成(稱為segment)。當(dāng)segment文件尺寸達(dá)到一定閥值時(shí),將會創(chuàng)建一個(gè)新的文件(控制單個(gè)文件大?。.?dāng)buffer中消息的條數(shù)達(dá)到閥值時(shí)將會觸發(fā)數(shù)據(jù)flush到數(shù)據(jù)文件中,同時(shí)如果“距離最近一次flush的時(shí)間差”達(dá)到閾值時(shí)也會觸發(fā)flush到日志文件(批量寫入,避免頻繁io操作)
2. 對于傳統(tǒng)的mq而言,一般會刪除已經(jīng)被消費(fèi)的消息,而kafka集群會保留所有的消息,無論其是否被消費(fèi)(這樣做的一點(diǎn)好處是當(dāng)有需要的時(shí)候可以重復(fù)消費(fèi)已消費(fèi)過的數(shù)據(jù))。當(dāng)然,因?yàn)榇疟P限制,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒必要),因此kafka提供兩種策略刪除舊數(shù)據(jù)。一是基于時(shí)間,二是基于partition文件大小
? Producer:
Producer將消息發(fā)布到指定的topic中,同時(shí)producer也能通過paritition策略選擇將其存儲到哪一個(gè)partition,如果一個(gè)topic只有一個(gè)partition,那這個(gè)partition所在的機(jī)器的io將有可能成為這個(gè)topic的性能瓶頸,而有了多partition后,不同的消息可以并行寫入不同broker的不同partition里,極大的提高了吞吐率
? Consumer group:
1. 本質(zhì)上kafka只支持topic,每個(gè)consumer屬于一個(gè)consumer group。反過來說,每個(gè)group中可以有多個(gè)consumer。發(fā)送到topic的一條消息,只會被訂閱此topic的每個(gè)group中的一個(gè)consumer消費(fèi),如下圖:

2. 這是kafka用來實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer group)和單播(發(fā)給某一個(gè)consumer group)的手段。多個(gè)consumer group可以訂閱同一個(gè)topic。如果需要實(shí)現(xiàn)廣播,只要consumer歸屬于不同的group就可以了(同一份數(shù)據(jù)采用不同的處理方式,比如實(shí)時(shí)計(jì)算和離線存儲)。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)group里
? Push vs pull
在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動去pull消息。這種模式有其優(yōu)點(diǎn),consumer端可以根據(jù)自己的消費(fèi)能力適時(shí)的去pull消息并處理,且可以控制消息消費(fèi)的進(jìn)度,消費(fèi)者可以良好的控制消息消費(fèi)的數(shù)量
? 核心概念總結(jié):
1. 一個(gè)partition中的消息只會被group中的一個(gè)consumer消費(fèi)
2. 每個(gè)group中consumer消費(fèi)互相獨(dú)立
3. 我們可以認(rèn)為一個(gè)group是一個(gè)“訂閱”者,一個(gè)topic中的每個(gè)partition,只會被一個(gè)group中的一個(gè)consumer消費(fèi),不過一個(gè)consumer可以消費(fèi)多個(gè)partition中的消息
4. Kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí)是有序的
? Replication:
1. Kafka將每個(gè)partition數(shù)據(jù)復(fù)制到多個(gè)server上,每個(gè)partition有一個(gè)leader和多個(gè)follower
2. Leader處理所有的讀寫請求,follower需要和leader保持同步。leader負(fù)責(zé)跟蹤所有follower的狀態(tài),如果follower"落后"太多或者斷開連接,leader將會把它從同步列表中刪除
3. 當(dāng)所有的follower都將一條消息保存成功,此消息才被認(rèn)為是"committed",那么此時(shí)consumer才能消費(fèi)它
4. 即使只有一個(gè)實(shí)例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可
Storm:
1. 簡介:
Storm是一個(gè)分布式的、高容錯(cuò)的實(shí)時(shí)計(jì)算系統(tǒng)。類似于hadoop的map和reduce原語,storm提供了spout和bolt原語,使得實(shí)時(shí)計(jì)算的編程模型變得非常簡單。目前主要的應(yīng)用場景包括:實(shí)時(shí)分析(計(jì)算)、在線機(jī)器學(xué)習(xí)等
2. 基本架構(gòu):

? Nimbus:負(fù)責(zé)資源分配和任務(wù)調(diào)度
? Supervisor:負(fù)責(zé)接受nimbus分配的任務(wù),啟動和停止屬于自己管理的worker進(jìn)程
? Worker:運(yùn)行具體處理組件邏輯的進(jìn)程
? Task:worker中運(yùn)行的具體的組件(spout/bolt)
數(shù)據(jù)流:

太抽象?再來張具體點(diǎn)兒的!

圖中可以看到一些詞匯:spout(數(shù)據(jù)來源),bolt(數(shù)據(jù)處理),stream grouping(通過分組決定數(shù)據(jù)具體流向),而這一切最終繪制出一幅圖(topology),上圖展示了storm中的數(shù)據(jù)流,也看到了其中最核心的一些組件,下面具體介紹
3. 核心概念:
? Topology:
一個(gè)實(shí)時(shí)計(jì)算應(yīng)用程序的邏輯在storm里面被封裝到topology對象里面, 翻譯過來可以叫做計(jì)算拓補(bǔ)。storm里面的topology相當(dāng)于hadoop里面的一個(gè)mapreduce job,它們的關(guān)鍵區(qū)別是:一個(gè)mapreduce job最終總是會結(jié)束的, 然而一個(gè)storm的topology會一直運(yùn)行,除非你顯式的kill。一個(gè)topology是spouts和bolts組成的圖狀結(jié)構(gòu),而連接spouts和bolts的則是stream groupings
? Tuple:
一次消息傳遞的基本單元,本來應(yīng)該是一個(gè)key-value的map,但是由于各個(gè)組件間傳遞的tuple的字段名稱已經(jīng)事先定義好,所以tuple中只要按序填入各個(gè)value就行了,所以就是一個(gè)value list
? Stream:
Stream是storm里面的關(guān)鍵抽象。一個(gè)stream是一個(gè)沒有邊界的tuple序列,也就是流式計(jì)算里的流
? Spout:
在一個(gè)topology中產(chǎn)生源數(shù)據(jù)流的組件。通常情況下spout會從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為topology內(nèi)部的源數(shù)據(jù)。spout接口中有個(gè)nexttuple()方法,storm框架會不停地調(diào)用此函數(shù),用戶只要在其中生成源數(shù)據(jù)即可
? Bolt:
在一個(gè)topology中接受數(shù)據(jù)然后執(zhí)行處理的組件。bolt可以執(zhí)行過濾、合并、寫數(shù)據(jù)庫等任何操作。bolt接口中有個(gè)execute(tuple input) 方法,在接受到數(shù)據(jù)后會調(diào)用此方法,用戶可以在其中執(zhí)行自己想要的操作
? Stream grouping:
定義了一個(gè)流在bolt任務(wù)間該如何被切分,說白了就是誰來處理哪些數(shù)據(jù),數(shù)據(jù)該流向哪里(消息分發(fā)策略),下面列舉幾種storm中常用的stream grouping類型:
1. Shuffle grouping:隨機(jī)分組,隨機(jī)派發(fā)stream里面的tuple給后續(xù)的bolt,保證每個(gè)bolt接收到的tuple數(shù)目基本相同
2. Fields grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分發(fā)到相同的bolt,而不同的userid則會被分配到不同的bolt
3. All grouping:廣播發(fā)送,對于每一個(gè)tuple,所有的bolt都會收到
4. Global grouping:全局分組,tuple被分配到storm中的一個(gè)bolt,再具體一點(diǎn)就是分配給id值最低的那個(gè)task
更多grouping方式詳見:http://storm.apache.org/documentation/concepts.html
? 并發(fā)度:

影響storm topology并發(fā)度的因素(遇到性能問題時(shí)通過這三個(gè)方面進(jìn)行調(diào)整):
1. Worker進(jìn)程數(shù)
2. Task線程數(shù)
3. Task數(shù)
下面看個(gè)具體的例子:

// use two worker processes
Conf.setnumworkers(2);
// set parallelism hint to 2
Topologybuilder.setspout("blue-spout", new bluespout(), 2);
Topologybuilder.setbolt("green-bolt", new greenbolt(), 2).setnumtasks(4);
Topologybuilder.setbolt("yellow-bolt", new yellowbolt(), 6);
Hadoop:
1. 簡介:
Hadoop是一個(gè)開源的可運(yùn)行于大規(guī)模集群上的分布式文件系統(tǒng)和運(yùn)行處理基礎(chǔ)框架,擅長于在廉價(jià)機(jī)器搭建的集群上進(jìn)行海量數(shù)據(jù)的存儲與離線處理,具有可靠、高效、可伸縮的特點(diǎn)。Hadoop是一個(gè)生態(tài)系統(tǒng),其下包含了很多相關(guān)技術(shù)組件和框架,主要包括hdfs(分布式文件系統(tǒng))、mapreduce(分布式計(jì)算框架)、hive(基于hadoop的數(shù)據(jù)倉庫)、hbase(分布式列存數(shù)據(jù)庫)zookeeper(分布式協(xié)作服務(wù))、avro(序列化系統(tǒng))等。今天我們主要介紹一下我們在實(shí)際使用中接觸較多的mapreduce
2. 基本架構(gòu):
那么究竟什么是mapreduce?舉個(gè)簡單的例子來說明,你想數(shù)出一摞牌中有多少張黑桃,直觀方式是自己一張一張檢查并且數(shù)出有多少張是黑桃。mapreduce方法則是:
? 把牌分給其他幾個(gè)人
? 讓每個(gè)人數(shù)自己手中的牌有幾張是黑桃,然后把這個(gè)數(shù)字匯報(bào)給你(通過把牌分給多個(gè)玩家并且讓他們各自數(shù)數(shù),你就在并行執(zhí)行運(yùn)算,因?yàn)槊總€(gè)玩家都在同時(shí)計(jì)數(shù),這同時(shí)把這項(xiàng)工作變成了分布式的)——map
? 你把所有玩家告訴你的數(shù)字加起來,得到最后的結(jié)論——reduce
當(dāng)然這只是打個(gè)比方,實(shí)際上并不嚴(yán)謹(jǐn),我們來看下面的圖:

Mapreduce可以理解為是一套編程模型,更具體一點(diǎn)可以理解為一個(gè)過程。在這個(gè)過程中主要包括了以下幾個(gè)階段:map、shuffle(分map端及reduce端)、reduce,下面通過wordcount(假設(shè)我們有n篇文章,我們需要對這n篇文章中出現(xiàn)的單詞分別統(tǒng)計(jì)出現(xiàn)的次數(shù))為例(此例被譽(yù)為hadoop上的hello world)分別對這幾個(gè)階段作一介紹
3. 關(guān)鍵步驟:

對讀取的語句進(jìn)行map操作,輸出為形式,其中key為單詞,value為單詞出現(xiàn)1次
?Shuffle:
Shuffle要解決的問題就是怎樣把map的輸出結(jié)果有效地傳送到reduce端。也可以這樣理解, shuffle描述著數(shù)據(jù)從map端輸出到reduce端輸入的這段過程。這段過程在map端大致包含partition、spill、merge,在reduce端包含copy、merge
Map端:
1.Partition:
假設(shè)將來我們的job有3個(gè)reduce task,那么就要決定map之后的數(shù)據(jù)將來交由哪個(gè)reduce后續(xù)處理,這就是partition階段做的工作Mapreduce提供partitioner接口,它的作用就是根據(jù)key及reduce的數(shù)量來決定當(dāng)前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)reduce task處理。默認(rèn)對key hash后再以reduce task數(shù)量取模。默認(rèn)的取模方式只是為了平均reduce的處理能力,如果用戶自己對partition有需求,可以自行定制
2.Spill:
接下來,需要將數(shù)據(jù)寫入內(nèi)存緩沖區(qū)中,緩沖區(qū)的作用是批量收集map結(jié)果,減少磁盤io的影響。當(dāng)緩沖區(qū)使用率到達(dá)一個(gè)閾值(100*0.8),就會將緩沖區(qū)數(shù)據(jù)寫入磁盤,這個(gè)過程就是spill,中文可以翻譯為溢寫,很形象也很好理解。spill過程會啟動一個(gè)新的線程,不會影響map task繼續(xù)輸出結(jié)果。spill過程中會對將要寫入磁盤的數(shù)據(jù)按照key進(jìn)行排序(sort)在wordcount事例中, map的輸出可以看到有很多類似、、、、這樣相同的,如果把這些全部寫入磁盤,第一浪費(fèi)空間,第二增加磁盤io。所以在這里有一個(gè)潛在的優(yōu)化點(diǎn),就是把key相同的數(shù)據(jù)進(jìn)行合并,合并后的數(shù)據(jù)類似、,這個(gè)過程叫做combine。combine過程是可選的,我們可以在job中設(shè)置相應(yīng)的combiner。combiner有助于優(yōu)化中間結(jié)果,但并不是所有場景都適合使用,一個(gè)總的原則就是,combiner絕不能影響最終的計(jì)算結(jié)果(累加這種場景就可以使用)
3.Merge:
每一次spill都會產(chǎn)生一個(gè)溢寫文件,當(dāng)數(shù)據(jù)量使得spill多次發(fā)生時(shí),就會產(chǎn)生多個(gè)溢寫文件。而最終當(dāng)map過程完成的時(shí)候,需要將這些溢寫文件合并為一個(gè),這個(gè)過程就是merge在這個(gè)過程中可能會遇到類似這樣的數(shù)據(jù)、、、,因?yàn)樗麄兙哂邢嗤膋ey,所以會被合并,結(jié)果類似、至此,map端的shuffle過程就完成了,下面再看一下reduce端的shuffle過程
Reduce端:
1. Copy:
在上一階段,map端生成了作為reduce端輸入的數(shù)據(jù)文件,在reduce真正運(yùn)行之前,所有的時(shí)間都是在拉取數(shù)據(jù),做merge。copy過程,就是reduce端啟動一些線程,從map端拉取數(shù)據(jù)的過程
2. Merge:
Copy過來的數(shù)據(jù)首先會存入內(nèi)存緩沖區(qū),這里緩沖區(qū)的大小要比map端更為靈活,它基于jvm的heap。因?yàn)樵趕huffle階段,reduce還沒有開始運(yùn)行,所以可以把更多的內(nèi)存分配給shuffle過程來使用。類似于map端,當(dāng)內(nèi)存緩沖區(qū)達(dá)到一定閾值后,將數(shù)據(jù)寫入磁盤,并最終生成一個(gè)文件,這個(gè)文件就是reduce端數(shù)據(jù)輸出的文件
至此,整個(gè)shuffle過程就完成了
? Reduce:

Reduce的輸入為,經(jīng)過reduce匯總(遍歷累計(jì)合并結(jié)果),輸出,key為單詞,value為單詞出現(xiàn)的總次數(shù)