本文主要介紹storm中的基本概念,從基礎(chǔ)上了解strom的體系結(jié)構(gòu),便于后續(xù)編程過程中作為基礎(chǔ)指導(dǎo)。主要的概念包括:
- topology(拓?fù)洌?/li>
- stream(數(shù)據(jù)流)
- spout(水龍頭、數(shù)據(jù)源)
- bolt(螺栓,數(shù)據(jù)篩選處理)
- stream group(數(shù)據(jù)流分組)
- reliability(可靠性)
- task(任務(wù))
- worker(執(zhí)行者)
因?yàn)樯鲜龈拍钪谐丝煽啃詒eliability翻譯起來(lái)比較合適,其他幾個(gè)詞實(shí)在找不到合適的對(duì)應(yīng)詞語(yǔ),就直接使用原詞。
另外一點(diǎn)需要注意的是,本文使用的storm-core版本是0.10.0,包路徑為backtype.storm。因?yàn)榘⒗锇桶烷_源了jstorm,據(jù)說(shuō)strom2.0之后使用jstorm作為master主干,從github上可以看到包路徑修改為了org.apache.storm,如果發(fā)現(xiàn)有包路徑錯(cuò)誤的地方,請(qǐng)對(duì)應(yīng)修改。
topology
Storm實(shí)時(shí)運(yùn)行應(yīng)用包邏輯上成為一個(gè)topology,一個(gè)Storm的topology相當(dāng)于MapReduce的job。關(guān)鍵的不同是MapReduce的job有明確的起始和結(jié)束,而Storm的topology會(huì)一直運(yùn)行下去(除非進(jìn)程被殺死或取消部署)。一個(gè)topology是有多個(gè)spout、bolt通過數(shù)據(jù)流分組連接起來(lái)的圖結(jié)構(gòu)。

本地調(diào)試
本地調(diào)試模擬了集群模式運(yùn)行方式,對(duì)于開發(fā)和調(diào)試topology很有用。而且本地模式下運(yùn)行topology與集群模式下類似,只是使用backtype.storm.LocalCluster來(lái)模擬集群狀態(tài)。使用backtype.storm.LocalCluster#submitTopology方法提交topology,定義topology唯一名字、topology的配置(使用的是backtype.storm.Config對(duì)象)、以及topology對(duì)象(通過backtype.storm.topology.TopologyBuilder#createTopology方法創(chuàng)建)。通過backtype.storm.LocalCluster#killTopology殺掉指定topology,通過backtype.storm.LocalCluster#shutdown停止運(yùn)行的本地集群模式。比如:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();
本地模式常用的配置如下:
- Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個(gè)配置項(xiàng)主要用來(lái)設(shè)置每個(gè)組件線程數(shù)的上限。在生產(chǎn)環(huán)境中,每個(gè)topology中有很多并行線程,但是在本地調(diào)試過程中,沒有必要存在這么多并行線程,可以通過這個(gè)配置來(lái)進(jìn)行設(shè)置。
- Config.TOPOLOGY_DEBUG:設(shè)置為true,Storm將記錄每個(gè)tuple提交后的日志信息,對(duì)于調(diào)試程序很有用。
集群模式運(yùn)行
集群模式下運(yùn)行topology與本地模式下類似,具體步驟如下:
- 定義topology(java下使用
backtype.storm.topology.TopologyBuilder#createTopology創(chuàng)建) - 通過
backtype.storm.StormSubmitter#submitTopology提交topology到集群。StormSubmitter需要的參數(shù)與LocalCluster`的參數(shù)一致:topology名、topology配置、topology對(duì)象。比如:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
- 將自己的代碼與依賴的代碼打成jar包(除了storm自己的代碼,storm自己的代碼已經(jīng)在classpath下了)。
如果使用的是Mava,可以使用Maven Assembly Plugin打包,在pom.xml中加入如下代碼:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
- 使用storm客戶端將topology提交到集群,需要指定jar包路徑、類名、以及提交到main方法的參數(shù)列表:
./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
- 可以使用
storm kill命令停止一個(gè)topology:
./bin/storm kill topologyName
數(shù)據(jù)流
數(shù)據(jù)流是Storm核心定義的抽象概念,由無(wú)限制的tuple組成的序列,tuple包含一個(gè)或多個(gè)鍵值對(duì)列表,可以包含java自帶的類型或者自定義的可序列化的類型。
每個(gè)數(shù)據(jù)流可以在定義時(shí)通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法指定id。默認(rèn)的id是“default”(直接使用declare將使用默認(rèn)id)。
在上面的topology圖中,每個(gè)藍(lán)色、綠色、紅色的條帶是一個(gè)數(shù)據(jù)流,每個(gè)數(shù)據(jù)流內(nèi)部由tuple組成。
spout
spout是topology中數(shù)據(jù)流的數(shù)據(jù)入口,充當(dāng)數(shù)據(jù)采集器功能,通常spout從外部數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)化為tuple,然后將它們發(fā)送到topology中。spout可以是可靠的或不可靠的??煽康膕pout能夠保證在storm處理tuple出現(xiàn)異常情況下,能夠重新發(fā)送該tuple,而不可靠的spout不再處理已發(fā)送的tuple。
spout通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法定義數(shù)據(jù)流,通過backtype.storm.spout.SpoutOutputCollector的emit方法發(fā)送tream。
backtype.storm.spout.ISpout#nextTuple方法是spout的主要方法,可以發(fā)送用于發(fā)送新的tuple,或直接return(不需要發(fā)送新的tuple時(shí),可以直接return)。
當(dāng)Storm檢測(cè)到由某一spout發(fā)送的tuple成功處理后,將調(diào)用backtype.storm.spout.ISpout#ack方法;當(dāng)調(diào)用失敗,將調(diào)用backtype.storm.spout.ISpout#fail方法。具體可以查看后面的可靠性。
bolt
在topology中所有操作都是在bolt中執(zhí)行的,它可以進(jìn)行過濾、計(jì)算、連接、聚合、數(shù)據(jù)庫(kù)讀寫,以及其他操作??梢詫⒁粋€(gè)或多個(gè)spout作為輸入,對(duì)數(shù)據(jù)進(jìn)行運(yùn)算后,選擇性的輸出一個(gè)或多個(gè)數(shù)據(jù)流。一個(gè)bolt可以做一些簡(jiǎn)單的數(shù)據(jù)變換,復(fù)雜的數(shù)據(jù)處理需要多個(gè)步驟或多個(gè)bolt。
bolt可以訂閱一個(gè)或多個(gè)spout或bolt的數(shù)據(jù),通過backtype.storm.topology.OutputFieldsDeclarer#declareStream方法定義輸出的數(shù)據(jù)流,通過backtype.storm.topology.BasicOutputCollector#emit方法提交數(shù)據(jù)。
bolt通過backtype.storm.topology.InputDeclarer類的shuffleGrouping方法指定需要訂閱的數(shù)據(jù)流,比如:declarer.shuffleGrouping("1", "stream_id"),同時(shí)InputDeclarer也提供了接收所有數(shù)據(jù)流的語(yǔ)法糖,比如:declarer.shuffleGrouping("1"),相當(dāng)于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。這個(gè)地方有點(diǎn)亂,簡(jiǎn)單的說(shuō),bolt B前面有一個(gè)spout A或bolt A,從A中發(fā)送一個(gè)id為a_id的數(shù)據(jù)流,如果B向只訂閱id為a_id的數(shù)據(jù)流,就使用第一個(gè)方法,如果可以接收所有id類型的數(shù)據(jù)流,就用第二個(gè)方法。
該類型中主要執(zhí)行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute,用來(lái)獲取新的tuple,并進(jìn)行處理。同樣使用backtype.storm.topology.BasicOutputCollector#emit方法發(fā)送新的tuple。bolt可以調(diào)用backtype.storm.task.OutputCollector#ack方法來(lái)通知Storm該tuple已經(jīng)處理完成。
數(shù)據(jù)流分組
定義topology的很重要的一部分就是定義數(shù)據(jù)流數(shù)據(jù)流應(yīng)該發(fā)送到那些bolt中。數(shù)據(jù)流分組就是將數(shù)據(jù)流進(jìn)行分組,按需要進(jìn)入不同的bolt中??梢允褂肧torm提供的分組規(guī)則,也可以實(shí)現(xiàn)backtype.storm.grouping.CustomStreamGrouping自定義分組規(guī)則。Storm定義了8種內(nèi)置的數(shù)據(jù)流分組方法:
- Shuffle grouping(隨機(jī)分組):隨機(jī)分發(fā)tuple給bolt的各個(gè)task,每個(gè)bolt實(shí)例接收到相同數(shù)量的tuple;
- Fields grouping(按字段分組):根據(jù)指定字段的值進(jìn)行分組。比如,一個(gè)數(shù)據(jù)流按照"user-id"分組,所有具有相同"user-id"的tuple將被路由到同一bolt的task中,不同"user-id"可能路由到不同bolt的task中;
- Partial Key grouping(部分key分組):數(shù)據(jù)流根據(jù)field進(jìn)行分組,類似于按字段分組,但是將在兩個(gè)下游bolt之間進(jìn)行均衡負(fù)載,當(dāng)資源發(fā)生傾斜的時(shí)候能夠更有效率的使用資源。The Power of Both Choices: Practical Load
Balancing for Distributed Stream Processing Engines提供了更加詳細(xì)的說(shuō)明; - All grouping(全復(fù)制分組):將所有tuple復(fù)制后分發(fā)給所有bolt的task。小心使用。
- Global grouping(全局分組):將所有的tuple路由到唯一一個(gè)task上。Storm按照最小的task ID來(lái)選取接收數(shù)據(jù)的task;(注意,當(dāng)時(shí)用全局分組是,設(shè)置bolt的task并發(fā)是沒有意義的,因?yàn)樗衪uple都轉(zhuǎn)發(fā)到一個(gè)task上。同時(shí)需要注意的是,所有tuple轉(zhuǎn)發(fā)到一個(gè)jvm實(shí)例上,可能會(huì)引起storm集群某個(gè)jvm或服務(wù)器出現(xiàn)性能瓶頸或崩潰)
- None grouping(不分組):這種分組方式指明不需要關(guān)心分組方式。實(shí)際上,不分組功能與隨機(jī)分組相同。預(yù)留功能。
- Direct grouping(指向型分組):數(shù)據(jù)源會(huì)調(diào)用emitDirect來(lái)判斷一個(gè)tuple應(yīng)該由哪個(gè)storm組件接收,只能在聲明了指向型的數(shù)據(jù)流上使用。
- Local or shuffle grouping(本地或隨機(jī)分組):當(dāng)同一個(gè)worker進(jìn)程中有目標(biāo)bolt,將把數(shù)據(jù)發(fā)送到這些bolt中。否則,功能將與隨機(jī)分組相同。該方法取決與topology的并發(fā)度,本地或隨機(jī)分組可以減少網(wǎng)絡(luò)傳輸,降低IO,提高topology性能。
可靠行
storm可以保證每一個(gè)spout發(fā)出的tuple能夠被完整處理,通過跟蹤tuple樹上的每個(gè)tuple,檢查是否被成功處理。每個(gè)topology有一個(gè)超時(shí)時(shí)間,如果storm檢查到某個(gè)tuple已經(jīng)超時(shí),將重新發(fā)送該tuple。為了使用這種特性,需要定義tuple的起點(diǎn),以及tuple被成功處理。更多內(nèi)容查看Guaranteeing message processing。
task
task是spout和bolt的實(shí)例,他們的nextTuple()和execute()方法會(huì)被executors線程調(diào)用執(zhí)行。根據(jù)數(shù)據(jù)流分組來(lái)確定如何從某個(gè)task中的tuple發(fā)送到其他的task。
worker
topology運(yùn)行在一個(gè)或多個(gè)worker進(jìn)程上,worker是jvm虛擬機(jī),運(yùn)行topology所有task的一部分。比如,topology的并發(fā)是300,有50個(gè)worker,那每個(gè)worker就有6個(gè)task。Storm會(huì)平衡所有worker的task數(shù)量。通過Config.TOPOLOGY_WORKERS來(lái)設(shè)置topology的worker數(shù)量。
個(gè)人主頁(yè): http://www.howardliu.cn
個(gè)人博文: storm筆記:storm基本概念
CSDN主頁(yè): http://blog.csdn.net/liuxinghao
CSDN博文: storm筆記:storm基本概念