在創(chuàng)建Storm的Topology時,我們通常使用如下代碼:
builder.setBolt("cpp",new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name);
Config conf = new Config();
conf.setNumWorkers(3);
參數(shù)1:bolt名稱 "cpp"
參數(shù)2:bolt類型 CppBolt
參數(shù)3:bolt的并行數(shù),parallelismNum,即運(yùn)行topology時,該bolt的線程數(shù)
setNumTasks() 設(shè)置bolt的task數(shù)
noneGrouping() 設(shè)置輸入流方式及字段
conf.setNumWorkers()設(shè)置worker數(shù)據(jù)。
經(jīng)過多次試驗(yàn)總結(jié),得出如下結(jié)論:
1)Topology的worker數(shù)通過config設(shè)置,即執(zhí)行該topology的worker(java)進(jìn)程數(shù)。它可以通過storm rebalance 命令任意調(diào)整。
- Topology中某個bolt的executor數(shù),即parallelismNum,即執(zhí)行該bolt的線程數(shù),在setBolt時由第三個參數(shù)指定。它可以通過storm rebalance 命令調(diào)整,但最大不能超過該bolt的task數(shù);
- bolt的task數(shù),通過setNumTasks()設(shè)置。(也可不設(shè)置,默認(rèn)取bolt的executor數(shù)),無法在運(yùn)行時調(diào)整。
4)Bolt實(shí)例數(shù),這個比較特別,它和task數(shù)相等。有多少個task就會new 多少個Bolt對象。而這些Bolt對象在運(yùn)行時由Bolt的thread進(jìn)行調(diào)度。
??也即是說
builder.setBolt("cpp",newCppBolt(),3).setNumTasks(5).noneGrouping(pre_name);
會創(chuàng)建3個線程,但有內(nèi)存中會5個CppBolt對象,三個線程調(diào)度5個對象。
一個運(yùn)行中的拓?fù)涞睦?br>
下 面的圖表展示了1個簡單拓?fù)湓趯?shí)際操作中看起來是怎樣的。這個拓?fù)浒?個組件:
1個spout叫做BlueSpout,
2個bolt分別叫 GreenBolt和YellowBolt。BlueSpout發(fā)送它的輸出到GreenBolt,GreenBolt又把它的輸出發(fā)到 YellowBolt。

下面是對上圖的簡要分析:
3個組件的并發(fā)度加起來是10,就是說拓?fù)湟还灿?0個executor,一共有2個worker,每個worker產(chǎn)生10 / 2 = 5條線程。
綠色的bolt配置成2個executor和4個task。為此每個executor為這個bolt運(yùn)行2個task。
下面的代碼配置了這3個組件,相關(guān)代碼如下:
Config conf = new Config();
conf.setNumWorkers(2); // 使用2個worker進(jìn)程
topologyBuilder.setSpout(“blue-spout”, new BlueSpout(), 2); // parallelism hint為2
topologyBuilder.setBolt(“green-bolt”, new GreenBolt(), 2) .setNumTasks(4) .shuffleGrouping(“blue-spout”);
topologyBuilder.setBolt(“yellow-bolt”, new YellowBolt(), 6) .shuffleGrouping(“green-bolt”);
StormSubmitter.submitTopology( “mytopology”, conf, topologyBuilder.createTopology() );
And of course Storm comes with additional configuration settings to control the parallelism of a topology, including:
此外還有其他的配置來控制拓?fù)涞牟l(fā)度,包括了:
TOPOLOGY_MAX_TASK_PARALLELISM: 這個設(shè)置指定了1個單獨(dú)的組件的executor的數(shù)量的上限。當(dāng)在測試階段使用本地模式運(yùn)行1個拓?fù)鋾r,用來限制生成的線程的數(shù)量。你可以像下面這樣來使用:
Config#setMaxTaskParallelism()
如何改變1個正在運(yùn)行的拓?fù)涞牟l(fā)度
Storm有一個不錯的特性,你可以在不需要重啟集群或拓?fù)?,來增加或減少worker進(jìn)程和executor的數(shù)量。這樣行為成為rebalancing。
你有兩個選項(xiàng)可以rebalance1個拓?fù)洌?/p>
- 使用Storm的web UI來rebalance。
- 像下面描述的那樣,使用命令行工具來做:
# 重新配置拓?fù)?“mytopology” 使用5個worker進(jìn)程。
# spout “blue-spout” 使用3個executor
# bolt “yellow-bolt” 使用10個executor
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10