運行中的topology由這些組成:worker processes, executors and tasks
Storm集群的topology主要由以下實體組成:
- Worker processes
- Executors (threads)
-
Tasks
它們的關(guān)系如下:
The relationships of worker processes, executors (threads) and tasks in Storm
Worker processes執(zhí)行topology的一個子集,它屬于某個topology,可以運行一個或多個executors。在集群中運行的topology一般都有多個Worker processes跑在多個機(jī)器上。
executor 是Worker 創(chuàng)建的線程,它可能會運行同一個組件的(spout或bolt)一個或多個任務(wù)。
task真正負(fù)責(zé)數(shù)據(jù)處理。你代碼中實現(xiàn)的spout或bolt在集群中以task對形式執(zhí)行。
在topology的整個生命周期中,task的數(shù)量都不會變,但是executors(threads)的數(shù)量可能會經(jīng)常變化。下面這個等式總是正確的:#threads ≤ #tasks
默認(rèn)情況下,task和executors數(shù)量相同,也就是說,Storm會每個線程執(zhí)行一個任務(wù)。
配置topology的并行度
Storm的并行度指的是組件最開始的executor數(shù)量。在這篇文章中,并行度是一個更寬泛的概念,你不僅可以配置executor的數(shù)量,也可以配置worker和task的數(shù)量。
下面的章節(jié)給出了多種配置選項以及如何在代碼中設(shè)置它們。Storm目前的配置獲取順序:defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration。
Number of worker processes
- 配置選項:TOPOLOGY_WORKERS
- 如何在代碼中設(shè)置(例子):Config#setNumWorkers
Number of executors (threads)
- 配置選項:無(給setSpout或setBolt傳parallelism_hint參數(shù))
- 如何在代碼中設(shè)置:
TopologyBuilder#setSpout()
TopologyBuilder#setBolt()
Number of tasks
- 配置選項:TOPOLOGY_TASKS
- 如何在代碼中設(shè)置:
ComponentConfigurationDeclarer#setNumTasks()
代碼片段:
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");
上面的代碼配置Storm以2個executors (threads)和4個tasks來運行GreenBolt,也就是每個線程跑兩個任務(wù)。
一個正在運行的topology的例子
例子中但topology包含三個組件:BlueSpout、GreenBolt和YellowBolt。BlueSpout的輸出交給GreenBolt,GreenBolt的輸出交給YellowBolt。

如下是相關(guān)配置代碼:
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
"mytopology",
conf,
topologyBuilder.createTopology()
);
當(dāng)然,Storm也有其他的配置設(shè)置來控制topology的并發(fā)度,包括:
- TOPOLOGY_MAX_TASK_PARALLELISM: 這個配置限制了每個組件的最大executors數(shù)量。一般在本地模式下運行topology時,會用這個配置來限制創(chuàng)建的線程數(shù)??梢赃@樣設(shè)置:Config#setMaxTaskParallelism()。
如何更改運行中的topology的并發(fā)度
Storm的一大優(yōu)點就是可以不用重啟集群或topology就更改worker和executors的數(shù)量。這種操作稱為再平衡。
有兩種再平衡topology的方式:
- Storm web UI
- CLI
這是使用CLI的例子:
//Reconfigure the topology "mytopology" to use worker processes,
//the spout "blue-spout" to use 3 executors and
//the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
