storm筆記:storm基本概念

本文主要介紹storm中的基本概念,從基礎(chǔ)上了解strom的體系結(jié)構(gòu),便于后續(xù)編程過程中作為基礎(chǔ)指導(dǎo)。主要的概念包括:

  1. topology(拓?fù)洌?/li>
  2. stream(數(shù)據(jù)流)
  3. spout(水龍頭、數(shù)據(jù)源)
  4. bolt(螺栓,數(shù)據(jù)篩選處理)
  5. stream group(數(shù)據(jù)流分組)
  6. reliability(可靠性)
  7. task(任務(wù))
  8. worker(執(zhí)行者)

因?yàn)樯鲜龈拍钪谐丝煽啃詒eliability翻譯起來(lái)比較合適,其他幾個(gè)詞實(shí)在找不到合適的對(duì)應(yīng)詞語(yǔ),就直接使用原詞。
另外一點(diǎn)需要注意的是,本文使用的storm-core版本是0.10.0,包路徑為backtype.storm。因?yàn)榘⒗锇桶烷_源了jstorm,據(jù)說(shuō)strom2.0之后使用jstorm作為master主干,從github上可以看到包路徑修改為了org.apache.storm,如果發(fā)現(xiàn)有包路徑錯(cuò)誤的地方,請(qǐng)對(duì)應(yīng)修改。

topology

Storm實(shí)時(shí)運(yùn)行應(yīng)用包邏輯上成為一個(gè)topology,一個(gè)Storm的topology相當(dāng)于MapReduce的job。關(guān)鍵的不同是MapReduce的job有明確的起始和結(jié)束,而Storm的topology會(huì)一直運(yùn)行下去(除非進(jìn)程被殺死或取消部署)。一個(gè)topology是有多個(gè)spout、bolt通過數(shù)據(jù)流分組連接起來(lái)的圖結(jié)構(gòu)。

storm topology

本地調(diào)試

本地調(diào)試模擬了集群模式運(yùn)行方式,對(duì)于開發(fā)和調(diào)試topology很有用。而且本地模式下運(yùn)行topology與集群模式下類似,只是使用backtype.storm.LocalCluster來(lái)模擬集群狀態(tài)。使用backtype.storm.LocalCluster#submitTopology方法提交topology,定義topology唯一名字、topology的配置(使用的是backtype.storm.Config對(duì)象)、以及topology對(duì)象(通過backtype.storm.topology.TopologyBuilder#createTopology方法創(chuàng)建)。通過backtype.storm.LocalCluster#killTopology殺掉指定topology,通過backtype.storm.LocalCluster#shutdown停止運(yùn)行的本地集群模式。比如:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();

本地模式常用的配置如下:

  1. Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個(gè)配置項(xiàng)主要用來(lái)設(shè)置每個(gè)組件線程數(shù)的上限。在生產(chǎn)環(huán)境中,每個(gè)topology中有很多并行線程,但是在本地調(diào)試過程中,沒有必要存在這么多并行線程,可以通過這個(gè)配置來(lái)進(jìn)行設(shè)置。
  2. Config.TOPOLOGY_DEBUG:設(shè)置為true,Storm將記錄每個(gè)tuple提交后的日志信息,對(duì)于調(diào)試程序很有用。

集群模式運(yùn)行

集群模式下運(yùn)行topology與本地模式下類似,具體步驟如下:

  1. 定義topology(java下使用backtype.storm.topology.TopologyBuilder#createTopology創(chuàng)建)
  2. 通過backtype.storm.StormSubmitter#submitTopology提交topology到集群。StormSubmitter需要的參數(shù)與LocalCluster`的參數(shù)一致:topology名、topology配置、topology對(duì)象。比如:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
  1. 將自己的代碼與依賴的代碼打成jar包(除了storm自己的代碼,storm自己的代碼已經(jīng)在classpath下了)。
    如果使用的是Mava,可以使用Maven Assembly Plugin打包,在pom.xml中加入如下代碼:
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>  
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.path.to.main.Class</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
  1. 使用storm客戶端將topology提交到集群,需要指定jar包路徑、類名、以及提交到main方法的參數(shù)列表:
./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
  1. 可以使用storm kill命令停止一個(gè)topology:
./bin/storm kill topologyName

數(shù)據(jù)流

數(shù)據(jù)流是Storm核心定義的抽象概念,由無(wú)限制的tuple組成的序列,tuple包含一個(gè)或多個(gè)鍵值對(duì)列表,可以包含java自帶的類型或者自定義的可序列化的類型。

每個(gè)數(shù)據(jù)流可以在定義時(shí)通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法指定id。默認(rèn)的id是“default”(直接使用declare將使用默認(rèn)id)。

在上面的topology圖中,每個(gè)藍(lán)色、綠色、紅色的條帶是一個(gè)數(shù)據(jù)流,每個(gè)數(shù)據(jù)流內(nèi)部由tuple組成。

spout

spout是topology中數(shù)據(jù)流的數(shù)據(jù)入口,充當(dāng)數(shù)據(jù)采集器功能,通常spout從外部數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)化為tuple,然后將它們發(fā)送到topology中。spout可以是可靠的或不可靠的??煽康膕pout能夠保證在storm處理tuple出現(xiàn)異常情況下,能夠重新發(fā)送該tuple,而不可靠的spout不再處理已發(fā)送的tuple。

spout通過backtype.storm.topology.OutputFieldsDeclarerdeclareStream方法定義數(shù)據(jù)流,通過backtype.storm.spout.SpoutOutputCollectoremit方法發(fā)送tream。

backtype.storm.spout.ISpout#nextTuple方法是spout的主要方法,可以發(fā)送用于發(fā)送新的tuple,或直接return(不需要發(fā)送新的tuple時(shí),可以直接return)。

當(dāng)Storm檢測(cè)到由某一spout發(fā)送的tuple成功處理后,將調(diào)用backtype.storm.spout.ISpout#ack方法;當(dāng)調(diào)用失敗,將調(diào)用backtype.storm.spout.ISpout#fail方法。具體可以查看后面的可靠性。

bolt

在topology中所有操作都是在bolt中執(zhí)行的,它可以進(jìn)行過濾、計(jì)算、連接、聚合、數(shù)據(jù)庫(kù)讀寫,以及其他操作??梢詫⒁粋€(gè)或多個(gè)spout作為輸入,對(duì)數(shù)據(jù)進(jìn)行運(yùn)算后,選擇性的輸出一個(gè)或多個(gè)數(shù)據(jù)流。一個(gè)bolt可以做一些簡(jiǎn)單的數(shù)據(jù)變換,復(fù)雜的數(shù)據(jù)處理需要多個(gè)步驟或多個(gè)bolt。

bolt可以訂閱一個(gè)或多個(gè)spout或bolt的數(shù)據(jù),通過backtype.storm.topology.OutputFieldsDeclarer#declareStream方法定義輸出的數(shù)據(jù)流,通過backtype.storm.topology.BasicOutputCollector#emit方法提交數(shù)據(jù)。

bolt通過backtype.storm.topology.InputDeclarer類的shuffleGrouping方法指定需要訂閱的數(shù)據(jù)流,比如:declarer.shuffleGrouping("1", "stream_id"),同時(shí)InputDeclarer也提供了接收所有數(shù)據(jù)流的語(yǔ)法糖,比如:declarer.shuffleGrouping("1"),相當(dāng)于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。這個(gè)地方有點(diǎn)亂,簡(jiǎn)單的說(shuō),bolt B前面有一個(gè)spout A或bolt A,從A中發(fā)送一個(gè)id為a_id的數(shù)據(jù)流,如果B向只訂閱id為a_id的數(shù)據(jù)流,就使用第一個(gè)方法,如果可以接收所有id類型的數(shù)據(jù)流,就用第二個(gè)方法。

該類型中主要執(zhí)行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute,用來(lái)獲取新的tuple,并進(jìn)行處理。同樣使用backtype.storm.topology.BasicOutputCollector#emit方法發(fā)送新的tuple。bolt可以調(diào)用backtype.storm.task.OutputCollector#ack方法來(lái)通知Storm該tuple已經(jīng)處理完成。

數(shù)據(jù)流分組

定義topology的很重要的一部分就是定義數(shù)據(jù)流數(shù)據(jù)流應(yīng)該發(fā)送到那些bolt中。數(shù)據(jù)流分組就是將數(shù)據(jù)流進(jìn)行分組,按需要進(jìn)入不同的bolt中??梢允褂肧torm提供的分組規(guī)則,也可以實(shí)現(xiàn)backtype.storm.grouping.CustomStreamGrouping自定義分組規(guī)則。Storm定義了8種內(nèi)置的數(shù)據(jù)流分組方法:

  1. Shuffle grouping(隨機(jī)分組):隨機(jī)分發(fā)tuple給bolt的各個(gè)task,每個(gè)bolt實(shí)例接收到相同數(shù)量的tuple;
  2. Fields grouping(按字段分組):根據(jù)指定字段的值進(jìn)行分組。比如,一個(gè)數(shù)據(jù)流按照"user-id"分組,所有具有相同"user-id"的tuple將被路由到同一bolt的task中,不同"user-id"可能路由到不同bolt的task中;
  3. Partial Key grouping(部分key分組):數(shù)據(jù)流根據(jù)field進(jìn)行分組,類似于按字段分組,但是將在兩個(gè)下游bolt之間進(jìn)行均衡負(fù)載,當(dāng)資源發(fā)生傾斜的時(shí)候能夠更有效率的使用資源。The Power of Both Choices: Practical Load
    Balancing for Distributed Stream Processing Engines
    提供了更加詳細(xì)的說(shuō)明;
  4. All grouping(全復(fù)制分組):將所有tuple復(fù)制后分發(fā)給所有bolt的task。小心使用。
  5. Global grouping(全局分組):將所有的tuple路由到唯一一個(gè)task上。Storm按照最小的task ID來(lái)選取接收數(shù)據(jù)的task;(注意,當(dāng)時(shí)用全局分組是,設(shè)置bolt的task并發(fā)是沒有意義的,因?yàn)樗衪uple都轉(zhuǎn)發(fā)到一個(gè)task上。同時(shí)需要注意的是,所有tuple轉(zhuǎn)發(fā)到一個(gè)jvm實(shí)例上,可能會(huì)引起storm集群某個(gè)jvm或服務(wù)器出現(xiàn)性能瓶頸或崩潰)
  6. None grouping(不分組):這種分組方式指明不需要關(guān)心分組方式。實(shí)際上,不分組功能與隨機(jī)分組相同。預(yù)留功能。
  7. Direct grouping(指向型分組):數(shù)據(jù)源會(huì)調(diào)用emitDirect來(lái)判斷一個(gè)tuple應(yīng)該由哪個(gè)storm組件接收,只能在聲明了指向型的數(shù)據(jù)流上使用。
  8. Local or shuffle grouping(本地或隨機(jī)分組):當(dāng)同一個(gè)worker進(jìn)程中有目標(biāo)bolt,將把數(shù)據(jù)發(fā)送到這些bolt中。否則,功能將與隨機(jī)分組相同。該方法取決與topology的并發(fā)度,本地或隨機(jī)分組可以減少網(wǎng)絡(luò)傳輸,降低IO,提高topology性能。

可靠行

storm可以保證每一個(gè)spout發(fā)出的tuple能夠被完整處理,通過跟蹤tuple樹上的每個(gè)tuple,檢查是否被成功處理。每個(gè)topology有一個(gè)超時(shí)時(shí)間,如果storm檢查到某個(gè)tuple已經(jīng)超時(shí),將重新發(fā)送該tuple。為了使用這種特性,需要定義tuple的起點(diǎn),以及tuple被成功處理。更多內(nèi)容查看Guaranteeing message processing。

task

task是spout和bolt的實(shí)例,他們的nextTuple()和execute()方法會(huì)被executors線程調(diào)用執(zhí)行。根據(jù)數(shù)據(jù)流分組來(lái)確定如何從某個(gè)task中的tuple發(fā)送到其他的task。

worker

topology運(yùn)行在一個(gè)或多個(gè)worker進(jìn)程上,worker是jvm虛擬機(jī),運(yùn)行topology所有task的一部分。比如,topology的并發(fā)是300,有50個(gè)worker,那每個(gè)worker就有6個(gè)task。Storm會(huì)平衡所有worker的task數(shù)量。通過Config.TOPOLOGY_WORKERS來(lái)設(shè)置topology的worker數(shù)量。


個(gè)人主頁(yè): http://www.howardliu.cn

個(gè)人博文: storm筆記:storm基本概念

CSDN主頁(yè): http://blog.csdn.net/liuxinghao

CSDN博文: storm筆記:storm基本概念

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識(shí),包括 storm ...
    zhaif閱讀 3,388評(píng)論 0 17
  • 什么是實(shí)時(shí)流計(jì)算? 主要的處理模式可以分為:流處理,批處理 流處理是直接處理,有時(shí)也分為在線,離線,近線(st...
    Bloo_m閱讀 5,202評(píng)論 1 1
  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,286評(píng)論 0 0
  • Storm 系統(tǒng)中包含以下幾個(gè)基本概念:拓?fù)洌═opologies)流(Streams)數(shù)據(jù)源(Spouts)數(shù)據(jù)...
    發(fā)光的魚閱讀 884評(píng)論 0 0
  • 清凌絕對(duì)是個(gè)有主見的姑娘。她從小就有自己的想法,吃還是不吃,買還是不買。她想要的東西,任憑踏遍千山萬(wàn)水也要尋到,就...
    娑喬閱讀 500評(píng)論 0 0

友情鏈接更多精彩內(nèi)容