Storm 系統(tǒng)中包含以下幾個基本概念:
拓撲(Topologies)
流(Streams)
數(shù)據(jù)源(Spouts)
數(shù)據(jù)流處理組件(Bolts)
數(shù)據(jù)流分組(Stream groupings)
可靠性(Reliability)
任務(wù)(Tasks)
工作進程(Workers)
譯者注:由于 Storm 的幾個基礎(chǔ)概念無論是直譯還是意譯均不夠清晰,而且還會讓習慣了 Storm 編程模型的讀者感到困惑,因此后文在提及這些概念時大多還會以英文原文出現(xiàn),希望大家能夠諒解。
拓撲(Topologies)
Storm 的拓撲是對實時計算應(yīng)用邏輯的封裝,它的作用與 MapReduce 的任務(wù)(Job)很相似,區(qū)別在于 MapReduce 的一個 Job 在得到結(jié)果之后總會結(jié)束,而拓撲會一直在集群中運行,直到你手動去終止它。拓撲還可以理解成由一系列通過數(shù)據(jù)流(Stream Grouping)相互關(guān)聯(lián)的 Spout 和 Bolt 組成的的拓撲結(jié)構(gòu)。Spout 和 Bolt 稱為拓撲的組件(Component)。我們會在后文中給出這些概念的解釋。
相關(guān)資料
TopologyBuilder:在 Java 中使用此類構(gòu)造拓撲
在生產(chǎn)環(huán)境中運行拓撲
本地模式:通過本文學(xué)習如何在本地模式中開發(fā)、測試拓撲
數(shù)據(jù)流(Streams)
數(shù)據(jù)流(Streams)是 Storm 中最核心的抽象概念。一個數(shù)據(jù)流指的是在分布式環(huán)境中并行創(chuàng)建、處理的一組元組(tuple)的無界序列。數(shù)據(jù)流可以由一種能夠表述數(shù)據(jù)流中元組的域(fields)的模式來定義。在默認情況下,元組(tuple)包含有整型(Integer)數(shù)字、長整型(Long)數(shù)字、短整型(Short)數(shù)字、字節(jié)(Byte)、雙精度浮點數(shù)(Double)、單精度浮點數(shù)(Float)、布爾值以及字節(jié)數(shù)組等基本類型對象。當然,你也可以通過定義可序列化的對象來實現(xiàn)自定義的元組類型。
在聲明數(shù)據(jù)流的時候需要給數(shù)據(jù)流定義一個有效的 id。不過,由于在實際應(yīng)用中使用最多的還是單一數(shù)據(jù)流的 Spout 與 Bolt,這種場景下不需要使用 id 來區(qū)分數(shù)據(jù)流,因此可以直接使用 OutputFieldsDeclarer來定義“無 id”的數(shù)據(jù)流。實際上,系統(tǒng)默認會給這種數(shù)據(jù)流定義一個名為“default”的 id。
相關(guān)資料
元組(Tuple):數(shù)據(jù)流由多個元組構(gòu)成
OutputFieldsDeclarer:用于聲明數(shù)據(jù)流和數(shù)據(jù)流對應(yīng)的模式
序列化(Serialization):關(guān)于 Storm 元組的動態(tài)類型以及聲明自定義序列化模型的相關(guān)內(nèi)容
ISerialization:自定義的序列化模型必須實現(xiàn)該接口
CONFIG.TOPOLOGY_SERIALIZATIONS:自定義的序列化模型可以通過這個配置項實現(xiàn)注冊
數(shù)據(jù)源(Spouts)
數(shù)據(jù)源(Spout)是拓撲中數(shù)據(jù)流的來源。一般 Spout 會從一個外部的數(shù)據(jù)源讀取元組然后將他們發(fā)送到拓撲中。根據(jù)需求的不同,Spout 既可以定義為可靠的數(shù)據(jù)源,也可以定義為不可靠的數(shù)據(jù)源。一個可靠的 Spout 能夠在它發(fā)送的元組處理失敗時重新發(fā)送該元組,以確保所有的元組都能得到正確的處理;相對應(yīng)的,不可靠的 Spout 就不會在元組發(fā)送之后對元組進行任何其他的處理。
一個 Spout 可以發(fā)送多個數(shù)據(jù)流。為了實現(xiàn)這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream
方法來聲明定義不同的數(shù)據(jù)流,然后在發(fā)送數(shù)據(jù)時在 SpoutOutputCollector 的 emit
方法中將數(shù)據(jù)流 id 作為參數(shù)來實現(xiàn)數(shù)據(jù)發(fā)送的功能。
Spout 中的關(guān)鍵方法是 nextTuple
。顧名思義,nextTuple
要么會向拓撲中發(fā)送一個新的元組,要么會在沒有可發(fā)送的元組時直接返回。需要特別注意的是,由于 Storm 是在同一個線程中調(diào)用所有的 Spout 方法,nextTuple
不能被 Spout 的任何其他功能方法所阻塞,否則會直接導(dǎo)致數(shù)據(jù)流的中斷(關(guān)于這一點,阿里的 JStorm 修改了 Spout 的模型,使用不同的線程來處理消息的發(fā)送,這種做法有利有弊,好處在于可以更加靈活地實現(xiàn) Spout,壞處在于系統(tǒng)的調(diào)度模型更加復(fù)雜,如何取舍還是要看具體的需求場景吧——譯者注)。
Spout 中另外兩個關(guān)鍵方法是 ack
和 fail
,他們分別用于在 Storm 檢測到一個發(fā)送過的元組已經(jīng)被成功處理或處理失敗后的進一步處理。注意,ack
和 fail
方法僅僅對上述“可靠的” Spout 有效。
相關(guān)資料
IRichSpout:這是實現(xiàn) Spout 的接口
消息的可靠性處理
數(shù)據(jù)流處理組件(Bolts)
拓撲中所有的數(shù)據(jù)處理均是由 Bolt 完成的。通過數(shù)據(jù)過濾(filtering)、函數(shù)處理(functions)、聚合(aggregations)、聯(lián)結(jié)(joins)、數(shù)據(jù)庫交互等功能,Bolt 幾乎能夠完成任何一種數(shù)據(jù)處理需求。
一個 Bolt 可以實現(xiàn)簡單的數(shù)據(jù)流轉(zhuǎn)換,而更復(fù)雜的數(shù)據(jù)流變換通常需要使用多個 Bolt 并通過多個步驟完成。例如,將一個微博數(shù)據(jù)流轉(zhuǎn)換成一個趨勢圖像的數(shù)據(jù)流至少包含兩個步驟:其中一個 Bolt 用于對每個圖片的微博轉(zhuǎn)發(fā)進行滾動計數(shù),另一個或多個 Bolt 將數(shù)據(jù)流輸出為“轉(zhuǎn)發(fā)最多的圖片”結(jié)果(相對于使用2個Bolt,如果使用3個 Bolt 你可以讓這種轉(zhuǎn)換具有更好的可擴展性)。
與 Spout 相同,Bolt 也可以輸出多個數(shù)據(jù)流。為了實現(xiàn)這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream
方法來聲明定義不同的數(shù)據(jù)流,然后在發(fā)送數(shù)據(jù)時在 OutputCollector 的 emit
方法中將數(shù)據(jù)流 id 作為參數(shù)來實現(xiàn)數(shù)據(jù)發(fā)送的功能。
在定義 Bolt 的輸入數(shù)據(jù)流時,你需要從其他的 Storm 組件中訂閱指定的數(shù)據(jù)流。如果你需要從其他所有的組件中訂閱數(shù)據(jù)流,你就必須要在定義 Bolt 時分別注冊每一個組件。對于聲明為默認 id(即上文中提到的“default”——譯者注)的數(shù)據(jù)流,InputDeclarer支持訂閱此類數(shù)據(jù)流的語法糖。也就是說,如果需要訂閱來自組件“1”的數(shù)據(jù)流,declarer.shuffleGrouping("1")
與 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)
兩種聲明方式是等價的。
Bolt 的關(guān)鍵方法是 execute
方法。execute
方法負責接收一個元組作為輸入,并且使用 OutputCollector 對象發(fā)送新的元組。如果有消息可靠性保障的需求,Bolt 必須為它所處理的每個元組調(diào)用 OutputCollector
的 ack
方法,以便 Storm 能夠了解元組是否處理完成(并且最終決定是否可以響應(yīng)最初的 Spout 輸出元組樹)。一般情況下,對于每個輸入元組,在處理之后可以根據(jù)需要選擇不發(fā)送還是發(fā)送多個新元組,然后再響應(yīng)(ack)輸入元組。IBasicBolt 接口能夠?qū)崿F(xiàn)元組的自動應(yīng)答。
在 Bolt 中啟動新線程來進行異步處理是一種非常好的方式,因為 OutputCollector 是線程安全的對象,可以在任意時刻被調(diào)用(此處譯者保留意見,由于 Storm 的并發(fā)設(shè)計和集群的彈性擴展機制,在 Bolt 中新建的線程可能存在一定的不可控風險——譯者注)。
請注意 OutputCollector 不是線程安全的對象,所有的 emit、ack 和 fail 操作都需要在同一個線程中進行處理。更多信息請參考問題與解決一文。
相關(guān)資料
IRichBolt:用于定義 Bolt 的基本接口
IBasicBolt: 用于定義帶有過濾或者其他簡單的函數(shù)操作功能的 Bolt 的簡便接口
OutputCollector:Bolt 使用此類來發(fā)送數(shù)據(jù)流
消息的可靠性處理
數(shù)據(jù)流分組(Stream groupings)
為拓撲中的每個 Bolt 的確定輸入數(shù)據(jù)流是定義一個拓撲的重要環(huán)節(jié)。數(shù)據(jù)流分組定義了在 Bolt 的不同任務(wù)(tasks)中劃分數(shù)據(jù)流的方式。
在 Storm 中有八種內(nèi)置的數(shù)據(jù)流分組方式(原文有誤,現(xiàn)在已經(jīng)已經(jīng)有八種分組模型——譯者注),而且你還可以通過CustomStreamGrouping 接口實現(xiàn)自定義的數(shù)據(jù)流分組模型。這八種分組分時分別為:
隨機分組(Shuffle grouping):這種方式下元組會被盡可能隨機地分配到 Bolt 的不同任務(wù)(tasks)中,使得每個任務(wù)所處理元組數(shù)量能夠能夠保持基本一致,以確保集群的負載均衡。
域分組(Fields grouping):這種方式下數(shù)據(jù)流根據(jù)定義的“域”來進行分組。例如,如果某個數(shù)據(jù)流是基于一個名為“user-id”的域進行分組的,那么所有包含相同的“user-id”的元組都會被分配到同一個任務(wù)中,這樣就可以確保消息處理的一致性。
部分關(guān)鍵字分組(Partial Key grouping):這種方式與域分組很相似,根據(jù)定義的域來對數(shù)據(jù)流進行分組,不同的是,這種方式會考慮下游 Bolt 數(shù)據(jù)處理的均衡性問題,在輸入數(shù)據(jù)源關(guān)鍵字不平衡時會有更好的性能1
。感興趣的讀者可以參考這篇論文,其中詳細解釋了這種分組方式的工作原理以及它的優(yōu)點。
完全分組(All grouping):這種方式下數(shù)據(jù)流會被同時發(fā)送到 Bolt 的所有任務(wù)中(也就是說同一個元組會被復(fù)制多份然后被所有的任務(wù)處理),使用這種分組方式要特別小心。
全局分組(Global grouping):這種方式下所有的數(shù)據(jù)流都會被發(fā)送到 Bolt 的同一個任務(wù)中,也就是 id 最小的那個任務(wù)。
非分組(None grouping):使用這種方式說明你不關(guān)心數(shù)據(jù)流如何分組。目前這種方式的結(jié)果與隨機分組完全等效,不過未來 Storm 社區(qū)可能會考慮通過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執(zhí)行。
直接分組(Direct grouping):這是一種特殊的分組方式。使用這種方式意味著元組的發(fā)送者可以指定下游的哪個任務(wù)可以接收這個元組。只有在數(shù)據(jù)流被聲明為直接數(shù)據(jù)流時才能夠使用直接分組方式。使用直接數(shù)據(jù)流發(fā)送元組需要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 可以通過 TopologyContext 來獲取它的下游消費者的任務(wù) id,也可以通過跟蹤 OutputCollector 的 emit
方法(該方法會返回它所發(fā)送元組的目標任務(wù)的 id)的數(shù)據(jù)來獲取任務(wù) id。
本地或隨機分組(Local or shuffle grouping):如果在源組件的 worker 進程里目標 Bolt 有一個或更多的任務(wù)線程,元組會被隨機分配到那些同進程的任務(wù)中。換句話說,這與隨機分組的方式具有相似的效果。
相關(guān)資料
TopologyBuilder:使用此類構(gòu)造拓撲
InputDeclarer:在 TopologyBuilder
中調(diào)用 setBolt
方法時會返回這個對象的實例,通過該對象就可以定義 Bolt 的輸入數(shù)據(jù)流以及數(shù)據(jù)流的分組方式
CoordinatedBolt:這個 Bolt 主要用于分布式 RPC 拓撲,其中大量使用了直接數(shù)據(jù)流與直接分組模型
可靠性(Reliability)
Storm 可以通過拓撲來確保每個發(fā)送的元組都能得到正確處理。通過跟蹤由 Spout 發(fā)出的每個元組構(gòu)成的元組樹可以確定元組是否已經(jīng)完成處理。每個拓撲都有一個“消息延時”參數(shù),如果 Storm 在延時時間內(nèi)沒有檢測到元組是否處理完成,就會將該元組標記為處理失敗,并會在稍后重新發(fā)送該元組。
為了充分利用 Storm 的可靠性機制,你必須在元組樹創(chuàng)建新結(jié)點的時候以及元組處理完成的時候通知 Storm。這個過程可以在 Bolt 發(fā)送元組時通過 OutputCollector 實現(xiàn):在 emit
方法中實現(xiàn)元組的錨定(Anchoring),同時使用 ack
方法表明你已經(jīng)完成了元組的處理。
關(guān)于可靠性保障的更多內(nèi)容可以參考這篇文章:消息的可靠性處理。
任務(wù)(Tasks)
在 Storm 集群中每個 Spout 和 Bolt 都由若干個任務(wù)(tasks)來執(zhí)行。每個任務(wù)都與一個執(zhí)行線程相對應(yīng)。數(shù)據(jù)流分組可以決定如何由一組任務(wù)向另一組任務(wù)發(fā)送元組。你可以在 TopologyBuilder 的 setSpout
方法和 setBolt
方法中設(shè)置 Spout/Bolt 的并行度。
工作進程(Workers)
拓撲是在一個或多個工作進程(worker processes)中運行的。每個工作進程都是一個實際的 JVM 進程,并且執(zhí)行拓撲的一個子集。例如,如果拓撲的并行度定義為300,工作進程數(shù)定義為50,那么每個工作進程就會執(zhí)行6個任務(wù)(進程內(nèi)部的線程)。Storm 會在所有的 worker 中分散任務(wù),以便實現(xiàn)集群的負載均衡。
相關(guān)資料
Config.TOPOLOGY_WORKERS:這個配置項用于設(shè)置拓撲的工作進程數(shù)