Strom流式處理框架的基本概念

前言

Strom特點
進程常駐內(nèi)存,數(shù)據(jù)不經(jīng)過磁盤,在內(nèi)存中處理,速度非???,可以達到毫秒(秒)級別

Storm數(shù)據(jù)傳輸
采用Netty,基于NIO,更加高效,早期采用ZMQ,但是ZMQ和Strom的license不兼容。

Storm可靠性

  • 異常處理
  • 消息可靠性的保證機制ACK

Strom和其他框架的對比
1.Spark-Streaming 微批處理框架 秒級,不是純流式計算框架和Spark核心之上的計算模型,和Spark的其他組件兼容較好。
2.MR 批處理框架,分鐘級別,MR模型,反復(fù)啟停。
3.Strom 秒(毫秒)級別,流式處理,DAG模型有向無環(huán)圖,常駐運行,不關(guān)閉,獨立系統(tǒng)專為流式處理設(shè)計。

一.Storm編程模型

  • Spout:用于持續(xù)不斷的發(fā)送數(shù)據(jù)

  • Tuple:數(shù)據(jù)都被封裝進tuple容器,進行傳輸,Storm中發(fā)送數(shù)據(jù)的基本單元,Tuple不斷的向后傳輸,像水滴一樣

  • Bolt:用于接受并處理數(shù)據(jù)

  • Stream:從Spout中源源不斷傳遞數(shù)據(jù)給Blot,以及上一個Blog傳遞數(shù)據(jù)給下一個Blot,所組成的數(shù)據(jù)通道叫做Stream,Stream默認的名稱為default,可以為其指定id。

  • 并行度:可以使用多線程模型,充分利用CPU,可以有效應(yīng)對高并發(fā),高數(shù)據(jù)量的應(yīng)用場景,還可以多臺服務(wù)器,多節(jié)點,多線程運行任務(wù)

  • 有向無環(huán)圖(Directed Acyclic Graph):對于Storm實時計算邏輯的封裝,即通過一系列由數(shù)據(jù)流相互關(guān)聯(lián)的Spout、Blot所組成的拓撲結(jié)構(gòu)

二.Storm的分發(fā)策略

1、Shuffle Grouping
隨機分發(fā),隨機派發(fā)Stream里面的tuple,保證每個bolt task接收到的tuple的數(shù)量大致相同

2、Field Grouping
根據(jù)字段分發(fā),例如根據(jù)OutputFieldsDeclarer中聲明的一個field屬性進行分發(fā),field的對應(yīng)的tuple的值相同,就會分發(fā)到同一bolt中去進行處理,filed不同可能就會被分發(fā)到不同的task

3、All Grouping
廣播模式分發(fā),每一個tuple都會被分發(fā)到下一個階段的所有的bolt中

4、Global Grouping分發(fā)
將tuple分發(fā)到后續(xù)bolt中taskid最小的task中去執(zhí)行,可以看做是高可用,其他的bolt作為備用,一旦taskid最小的task對應(yīng)的bolt掛了,還有bolt可以使用,可以保證Storm集群的可用性

5、None Grouping
類似于Shuffle Grouping,不同的是None Grouping采用的是輪訓(xùn)的形式,Shuffle采用的是隨機分發(fā)有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執(zhí)行

6、Direct Grouping
指向性分發(fā),這種分發(fā)策略意味著消息(tuple)的發(fā)送者指定由消息接受者的哪個task來處理消息,只有被聲明為Direct Stream的消息流可以聲明這種分組方法,而且這種消息必須由emitDirect方法來發(fā)送??梢允褂肨opologyContext獲取taskId,還有outputCollector.emit()方法也可以返回taskid

7、Local or Shuffle Grouping
本地隨機分組,如果目標bolt有一個或者多個task與源bolt的task在同進程中,則隨機分發(fā)到同進程的task中,否則和Shuffle Grouping一樣

8、自定義分發(fā) customGrouping

三.Storm 架構(gòu)模型

1.Nimbus 主節(jié)點的守護進程

  • 負責(zé)資源調(diào)度
  • 任務(wù)分配
  • 接受Client上傳的jar包

2.Supervisor 從節(jié)點的守護進程,具體完成計算工作
接受NimBus分配的任務(wù)->監(jiān)視ZK節(jié)點看是否有分配給自己的任務(wù)

啟動、關(guān)閉自己管理的Worker進程,可以有多個Worker進程,Work進程的數(shù)量由配置文件設(shè)定,配置文件是Client上傳jar包的配置文件,即由Client指定。

3.Worker 從節(jié)點的工作進程
由Supervisor控制啟動關(guān)閉,專門用于計算,所有的拓撲作業(yè)在Worker上運行。

運行具體運算組件的進程,一個topology可能會在一個或者多個worker(工作進程)里面執(zhí)行。

Worker任務(wù)的類型只有兩種, Spout和Bolt。

Executor是Worker JVM內(nèi)部的一個線程,一般每個Executor負責(zé)運行一個或多個任務(wù),但僅用于特定的spout或bolt。但一般默認
每個executor只執(zhí)行一個task。

Worker中可以啟動多個線程Executor,執(zhí)行特定的spout對應(yīng)的多個task任務(wù),提高并行度,但是注意每個task的taskid不一致,taskid起到了區(qū)分作用。

4.Zookeeper
替代了部分Nimbus的作用,也是目前storm是不支持nimbus高可用的但能保證系統(tǒng)不受太大影響的一個支撐。

Zookeeper本身已經(jīng)是按至少三臺部署的HA架構(gòu)了。Supervisor進程和Nimbus進程,需要用Daemon程序如monit來啟動,失效時自動重新啟動。因為它們在進程內(nèi)都不保存狀態(tài),狀態(tài)都保存在本地文件和ZooKeeper,因此進程可以隨便殺。

如果Nimbus進程所在的機器都直接倒了,需要在其他機器上重新啟動,Storm目前沒有自建支持,需要自己寫腳本實現(xiàn)。即使Nimbus進程不在了,也只是不能部署新任務(wù),有節(jié)點失效時不能重新分配而已,不影響已有的線程。同樣,如果Supervisor進程失效,不影響已存在的Worker進程。

目前storm官方或許是出于nimbus宕機對集群影響不大的考慮,并沒有在這方面有所進展。

四.Strom 任務(wù)提交流程

image.png

1.Client提交topology作業(yè)的相關(guān)jar包到Nimbus

2.提交的jar包會被上傳到Nimbus服務(wù)器下的Store-local/nimbus/inbox目錄下

3.submitTopology方法負責(zé)對這個topology進行處理。

  • 首先對storm本身和topology進行一些校驗,檢查storm狀態(tài)是否是active的
  • 檢查是否存在同名的topology已經(jīng)在storm中運行了
  • 檢查topology中的spout和bolt是否使用了相同的id,以及id是否規(guī)范,不能以開頭,是系統(tǒng)保留的命名方式

4.建立topology的本地文件目錄 /nimbus/stormdist/topology-uuid

該目錄包括三個文件

  • stormjar.jar :包含這個topology的所有代碼的jar包
  • stomecode.ser --這個topology對象的序列化
  • stomeconf.ser --運行這個topology的配置

5.numbus分配任務(wù),獲取空閑的work,根據(jù)topology定義中給的numworker參數(shù)、parallelism參數(shù)和numTask數(shù)量,給spout和bolt設(shè)置task數(shù)目,并且分配相應(yīng)的task-id,分配worker

6.numbus在ZK上創(chuàng)建taskbeat目錄,要求每個task每隔一定時間就要發(fā)送心跳匯報狀態(tài)

7.將分配好的任務(wù)寫入到zk中,此刻任務(wù)才算提交完畢,zk上節(jié)點為assignment/topology-uuid

8.將topology的信息寫入到zookeeper/storms目錄

9.Supervisor監(jiān)聽zookeeper上的storms目錄,看看是否有所屬的新的任務(wù),有就根據(jù)任務(wù)信息,啟動worker,下載下來jar包

10.刪除本地不再運行的topology代碼

11.supervisor根據(jù)nimbus指定的任務(wù)信息啟動worker進行工作

12.work根據(jù)taskid分辨出spout和blot

13.計算出所代表的spout/blot會給哪些task發(fā)送消息

14.根據(jù)ip和端口號創(chuàng)建響應(yīng)的網(wǎng)絡(luò)連接用來發(fā)送消息

Storm本地目錄樹

Storm本地目錄樹

Zookeeper目錄樹

Zookeeper目錄樹

Storm程序的并發(fā)機制

并發(fā)機制

從官網(wǎng)的解釋中,我們可以得出以下幾點。

1.Workers (JVMs):

  • 一個Topology拓撲可以在一個或多個Worker上運行,(每個Worker進程只能從屬于一個特定的Topology)
  • 這些Worker進程會并行跑在集群中不同的服務(wù)器上,即一個Topology拓撲其實是由并行運行在Storm集群中多臺服務(wù)器上的進程所組成。

2.Executors (threads):

  • Executor是由Worker進程中生成的一個線程
  • 每個Worker進程中會運行拓撲中的一個或多個Executor線程。
  • 一個Executor線程線程可以執(zhí)行一個或者多個Task任務(wù),默認只運行一個任務(wù),但是這些任務(wù)都對應(yīng)相同的Component(Spout/Blot)。

3.Tasks(bolt/spout instances):

  • 實際執(zhí)行數(shù)據(jù)處理的最小單元
  • 每個task即為一個Spout或者一個Bolt。
  • Task數(shù)量在整個Topology生命周期中保持不變,worker,Executor數(shù)量可以變化或手動調(diào)整。
  • 默認情況下,Task數(shù)量和Executor是相同的,即每個Executor線程中默認運行一個Task任務(wù)
  • 可以調(diào)用TopologyBuilder.setSpout和TopBuilder.setBolt來設(shè)置并行度,也就是有多少個task。

4.對應(yīng)關(guān)系總結(jié)
Topology與Woker的關(guān)系

  • 1:n
  • 1:1

Executor與Task的關(guān)系

  • 1:1
  • 1:n(相同的Component)

image.png

對應(yīng)上圖的代碼示例

Config conf = newConfig();
//用2個worker
conf.setNumWorkers(2); 
//設(shè)置2個并發(fā)度
topologyBuilder.setSpout("blue-spout", newBlueSpout(), 2); 
////設(shè)置2個并發(fā)度,4個任務(wù)
topologyBuilder.setBolt("green-bolt", newGreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); 
 //設(shè)置6個并發(fā)度
topologyBuilder.setBolt("yellow-bolt", newYellowBolt(), 6).shuffleGrouping("green-bolt");
StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());

由上可知,3個組件的并發(fā)度加起來是10,就是說拓撲一共有10個executor,一共有2個worker,每個worker產(chǎn)生10 / 2 = 5條線程。

綠色的bolt配置成2個executor和4個task。為此每個executor為這個bolt運行2個task。黃色bolt和藍色的spout都是一個executor運行一個task。

對于并發(fā)度的配置, 在storm里面可以在多個地方進行配置, 優(yōu)先級為:
defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration

Storm的Rebalance – 再平衡

動態(tài)調(diào)整Topology拓撲的Worker進程數(shù)量、以及Executor線程數(shù)量。支持兩種調(diào)整方式:

1、通過Storm UI

2、通過Storm CLI
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
將mytopology拓撲worker進程數(shù)量調(diào)整為5個,“ blue-spout ” 所使用的線程數(shù)量調(diào)整為3個,“ yellow-bolt ”所使用的線程數(shù)量調(diào)整為10個。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、Storm簡介 Storm是一個免費并開源的分布式實時計算系統(tǒng)。利用Storm可以很容易做到可靠地處理無限的數(shù)...
    達微閱讀 960評論 0 3
  • 本文借鑒官文,添加了一些解釋和看法,其中有些理解,寫的比較粗糙,有問題的地方希望大家指出。寫這篇文章,是想把一些官...
    達微閱讀 1,086評論 0 0
  • Storm是一個免費并開源的分布式實時計算系統(tǒng)。利用Storm可以很容易做到可靠地處理無限的數(shù)據(jù)流,像Hadoop...
    timothyue1閱讀 740評論 0 0
  • 1 Storm概述 Storm 是一個實時的、分布式的、可靠的流式數(shù)據(jù)處理系統(tǒng)。它的工作就是委派各種組件分別 獨立...
    lgh1008閱讀 1,726評論 0 0
  • 前言 前一段時間參與哨兵流式監(jiān)控功能設(shè)計,調(diào)研了兩個可以做流式計算的框架:storm和spark streamin...
    Java大生閱讀 803評論 0 0

友情鏈接更多精彩內(nèi)容