【譯】Apache Storm系列 之一(核心概念)

本文列出 Storm 的幾個(gè)主要概念,并會(huì)給出相關(guān)資源的鏈接以便你獲取更多信息,概念主要如下:

  • Topologies
  • Streams
  • Spouts
  • Bolts
  • Stream groupings
  • Reliability
  • Tasks
  • Workers

拓?fù)洌═opologies)

實(shí)時(shí)應(yīng)用程序的邏輯被打包到 Storm 拓?fù)渲?。一個(gè) Storm 拓?fù)漕?lèi)似于一個(gè) MapReduce 任務(wù)。關(guān)鍵的區(qū)別在于 MapReduce 任務(wù)最終會(huì)結(jié)束,而拓?fù)鋾?huì)一直運(yùn)行(當(dāng)然,除非你強(qiáng)制 kill 掉拓?fù)湎嚓P(guān)的進(jìn)程)。拓?fù)淇梢岳斫鉃橥ㄟ^(guò)數(shù)據(jù)流(Stream Grouping)將 Spout 和 Bolt 相互連接而組成的圖狀結(jié)構(gòu)的程序。spouts 和 bolts 的概念會(huì)在下文介紹。

相關(guān)資源:

流(Streams)

流是 Storm 的核心抽象。Storm中,一個(gè)流指的是在分布式環(huán)境中被并行創(chuàng)建以及處理的元組(tuple)序列集。流是無(wú)限的元組(tuple)序列,以分布式方式并行創(chuàng)建和處理。流往往有固定的模式(我們稱(chēng)之為“fields”),不同模式由不同的元組(tuple)類(lèi)型以一定的方式組成。通常,元組(tuple)可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, 以及 byte arrays。當(dāng)然,你也可以通過(guò)定義可序列化的對(duì)象來(lái)實(shí)現(xiàn)自定義的元組類(lèi)型。

相關(guān)資源:

數(shù)據(jù)源(Spouts)

在拓?fù)渲校?spout 是流的來(lái)源。通常情況,Spouts 會(huì)從外部源(例如消息隊(duì)列或者 Twitter API)讀取數(shù)據(jù)并將數(shù)據(jù)發(fā)送到拓?fù)渲?。Spouts 既可以是可靠的,也可以是不可靠的。可靠的情況是如果數(shù)據(jù)流沒(méi)有被 Storm 處理,Spouts 將重新發(fā)送數(shù)據(jù)。不可靠的情況則是對(duì)發(fā)送過(guò)的數(shù)據(jù)不予確認(rèn)。

Spouts 一次可以發(fā)送多個(gè)流。為了實(shí)現(xiàn)多流發(fā)送,我們可以使用(實(shí)現(xiàn)) OutputFieldsDeclarer 接口中的 declareStream 方法來(lái)指定多個(gè)流,并使用(實(shí)現(xiàn)) SpoutOutputCollector 接口中的 emit 方法進(jìn)行發(fā)送。

nextTuple 是 Spouts 中的主要方法。nextTuple 方法要么發(fā)送一個(gè)新的元組到 topology 中,要么直接返回(如果沒(méi)有新的元組需要發(fā)送)。需要注意的是,nextTuple 不應(yīng)該被 Spout 的任何其他方法所阻塞,否則會(huì)導(dǎo)致數(shù)據(jù)流的停止接入,這是因?yàn)?Spout 的所有方法是在一個(gè)線(xiàn)程中執(zhí)行。

ack 和 fail 是 Spouts 中另外兩個(gè)重要的方法。Spouts 為可靠模式時(shí),Storm 會(huì)檢測(cè)每一個(gè)從 Spouts 發(fā)送出去的元組是否成功,成功調(diào)用 ack,失敗調(diào)用 fail。當(dāng)然,在不可靠模式下,是不會(huì)調(diào)用這兩個(gè)方法的。

相關(guān)資源:

處理組件(Bolts)

topologies 所有的處理都是在 bolts 中進(jìn)行。bolts 可以做很多事情,例如:過(guò)濾流、邏輯處理、聚合、連接、數(shù)據(jù)庫(kù)交互等等。

bolts 可以從事簡(jiǎn)單的數(shù)據(jù)流轉(zhuǎn)換。處理復(fù)雜的數(shù)據(jù)流轉(zhuǎn)換通常需要將流程分成多步,這也就意味著我們可以使用多類(lèi)(個(gè)) bolt。例如,從微博數(shù)據(jù)流中得出一個(gè)趨勢(shì)圖,實(shí)現(xiàn)這個(gè)需求我們至少需要兩步:第一個(gè) bolt 計(jì)算每個(gè)圖片的點(diǎn)擊數(shù),第二個(gè) bolt 在第一個(gè)基礎(chǔ)上得出 TOP X 的圖片(當(dāng)然為了流程可擴(kuò)展,我們可以使用更多的 bolt,不僅限于兩個(gè))。

bolts 一次可以發(fā)送多個(gè)流。為了實(shí)現(xiàn)多流發(fā)送,我們可以使用(實(shí)現(xiàn)) OutputFieldsDeclarer 接口中的 declareStream 方法來(lái)指定多個(gè)流,并使用(實(shí)現(xiàn)) OutputCollector 接口中的 emit 方法進(jìn)行發(fā)送(跟 spout 類(lèi)似)。

在定義 Bolt 的輸入數(shù)據(jù)流時(shí),你需要從其他的 Storm 組件中訂閱指定的數(shù)據(jù)流。如果你需要從其他所有的組件中訂閱數(shù)據(jù)流,你就必須要在定義 Bolt 時(shí)分別注冊(cè)每一個(gè)組件。對(duì)于聲明為默認(rèn) id 的數(shù)據(jù)流,InputDeclarer 接口有訂閱此類(lèi)數(shù)據(jù)流的語(yǔ)法糖。調(diào)用 <font color=orange size=2 >declarer.shuffleGrouping("1") </font> 將訂閱來(lái)自 id 為“1” 的組件(spout/bolt)產(chǎn)生的數(shù)據(jù)流,其等價(jià)于調(diào)用 <font color=orange size=2 >declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)</font>

execute 是 bolt 的主要方法,它接收新的元組作為輸入。bolt 使用 OutputCollector 對(duì)象來(lái)發(fā)送新的元組。bolt 必須為每個(gè)經(jīng)由它處理的元組調(diào)用 OutputCollector 中的 ack 方法,這樣以便 Storm 知道這些元組什么時(shí)候被處理完成(最終判斷對(duì)原始 spout 元組的響應(yīng)是否合適)。處理元組的一般情況是,我們可以發(fā)送多個(gè)元組或者直接不發(fā)送,然后響應(yīng)下一個(gè)輸入元組,我們可以實(shí)現(xiàn) IBasicBolt 接口來(lái)完成 bolt 操作。

我們可以在 bolt 任務(wù)中開(kāi)啟一個(gè)新的線(xiàn)程來(lái)完成異步操作。OutputCollector 線(xiàn)程安全并且可以隨時(shí)被調(diào)用。

相關(guān)資源:

流分組(Stream groupings)

定義一個(gè) topology 的重要一部分是指定每個(gè) bolt 應(yīng)該接收哪些流作為輸入。流分組(stream grouping)定義了流如何分發(fā)到各個(gè) bolt 中。

Storm 提供了 8 種流分組策略。當(dāng)然,你也可以通過(guò)實(shí)現(xiàn) CustomStreamGrouping 接口來(lái)實(shí)現(xiàn)一個(gè)用戶(hù)自定義的流分組:

  • Shuffle grouping : 元組被隨機(jī)分發(fā)到各個(gè) bolt 任務(wù)中,也就是說(shuō)每個(gè) bolt 接收到大致相同數(shù)目的元組。
  • Fields grouping : 根據(jù)指定的 field 進(jìn)行分組 ,同一個(gè) field 的值一定會(huì)被發(fā)送到同一個(gè) task 上。例如,如果流按照 "user-id" 這個(gè) field 進(jìn)行分組,那么相同的 "user-id" 值會(huì)進(jìn)入相同的任務(wù)(task),如果不同,則進(jìn)入不同的任務(wù)。
  • Partial Key grouping : 與 Fields grouping 類(lèi)似,根據(jù)指定的 field 的一部分進(jìn)行分組分發(fā),能夠很好地實(shí)現(xiàn) load balance,將元組發(fā)送給下游的 bolt 對(duì)應(yīng)的 task,特別是在存在數(shù)據(jù)傾斜的場(chǎng)景,使用 Partial Key grouping 能夠更好地提高資源利用率
  • All grouping : 流復(fù)制到所有 bolt task 上。
  • Global grouping: 所有的流都指向一個(gè) bolt 的同一個(gè) task,也就是Task ID最小的。
  • None grouping : 使用這個(gè)分組,用戶(hù)不用關(guān)心流是如何進(jìn)行分組的。目前,這個(gè)分組類(lèi)似于 Shuffle grouping。不過(guò)未來(lái) Storm 可能會(huì)考慮通過(guò)這種分組來(lái)讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個(gè)線(xiàn)程中執(zhí)行。
  • Direct grouping : 由 tupe 的生產(chǎn)者來(lái)決定發(fā)送給下游的哪一個(gè) bolt 的 task ,這個(gè)要在實(shí)際開(kāi)發(fā)編寫(xiě) bolt 代碼的邏輯中進(jìn)行精確控制。
  • Local or shuffle grouping : 如果目標(biāo) bolt 有1個(gè)或多個(gè) task 都在同一個(gè) worker 進(jìn)程對(duì)應(yīng)的 JVM 實(shí)例中,則 tuple 只發(fā)送給這些 task。

可靠性(Reliability)

Storm 保證每個(gè) spout 元組都能在拓?fù)渲斜惶幚怼Mㄟ^(guò)跟蹤由 Spout 發(fā)出的每個(gè)元組構(gòu)成的元組樹(shù)可以確定元組是否已經(jīng)完成處理。每個(gè)拓?fù)涠加信c之相關(guān)的消息超時(shí)。如果在超時(shí)時(shí)間內(nèi)沒(méi)有檢測(cè)到元組是否被完整處理,該原則將會(huì)被標(biāo)記并重新發(fā)送。

想要使用 Storm 這個(gè)可靠性功能,你必須在元組創(chuàng)建以及處理完成時(shí)告訴 Storm。你可以使用用于發(fā)送數(shù)據(jù)流的 OutputCollector 對(duì)象,并使用 ack 方法表明你已經(jīng)完成了元組的處理。

任務(wù)(Tasks)

集群中,每一個(gè) spout 和 bolt 運(yùn)行了多個(gè)任務(wù)。每個(gè)任務(wù)對(duì)應(yīng)一個(gè)執(zhí)行線(xiàn)程,流分組定義如何將元組從一組任務(wù)發(fā)送到另一組任務(wù)。你可以使用 TopologyBuilder 中的 setSpout 和 setBolt 方法來(lái)設(shè)置任務(wù)并行度。

Workers

一個(gè)拓?fù)渲羞\(yùn)行了一個(gè)或多個(gè) worker 進(jìn)程。每個(gè)進(jìn)程都是一個(gè)物理 JVM,并且拓?fù)渲械乃?task 都在這些進(jìn)程中執(zhí)行。例如,如果并行度為 300,我們有 50 個(gè)worker 進(jìn)程,那么每個(gè)進(jìn)程將處理 6 個(gè) task。Storm 有其機(jī)制致力于將所有任務(wù)盡量平均地分配到每個(gè)進(jìn)程中。

相關(guān)資源:
Config.TOPOLOGY_WORKERS: 設(shè)置 worker 數(shù)量的配置


掃碼關(guān)注微信公眾號(hào)"Kooola大數(shù)據(jù)",聊人生|聊技術(shù)
掃碼關(guān)注微信公眾號(hào)"Kooola大數(shù)據(jù)",聊人生|聊技術(shù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容