1. Storm介紹:
Storm是實(shí)時(shí)流計(jì)算框架。企業(yè)中典型實(shí)時(shí)分析框架搭建模式: Flume + Kafka + Storm + Hbase ,對(duì)這類要求分析結(jié)果能妙級(jí)甚至毫秒級(jí)反饋的場(chǎng)景,需要用專門的實(shí)時(shí)分析框架,例如Storm和Spark Streaming。
最初由BackType公司研發(fā)出來(lái),2011年7月被Twitter公司收購(gòu),Twitter公司將Storm應(yīng)用到他們的海量數(shù)據(jù)實(shí)時(shí)處理上。Storm核心部分是由Clojure語(yǔ)言編寫的,Clojure是面向函數(shù)式編程語(yǔ)言,運(yùn)行在JVM上。阿里將Storm核心部分用java改寫,同時(shí)對(duì)原先Storm性能不好的地方做了優(yōu)化,這就是JStorm,目前Strom和JStorm都由Apache來(lái)維護(hù)。
1. 使用場(chǎng)景:
1.信息流的實(shí)時(shí)處理:例如實(shí)時(shí)抓拍汽車行車速度,計(jì)算判斷是否超速并實(shí)時(shí)處罰。
2.實(shí)時(shí)日志分析
3.分布式RPC(遠(yuǎn)程過(guò)程調(diào)用) DRPC: 查詢請(qǐng)求 --> storm處理 -->實(shí)時(shí)返回結(jié)果。
2.特性
1.分布式框架。
2.可靠性高、容錯(cuò)性高。
3.計(jì)算結(jié)果可靠:消息可靠性保障機(jī)制。
4.性能高、處理速度快。為了提高性能,可以不使用storm的消息可靠性保障機(jī)制。
3.與hadoop的區(qū)別和聯(lián)系
Storm并不屬于Hadoop生態(tài)系統(tǒng)的框架。MapReduce是離線數(shù)據(jù)分析,批次處理數(shù)據(jù),數(shù)據(jù)處理完成后程序就會(huì)終止;Storm是實(shí)時(shí)數(shù)據(jù)分析,任務(wù)提交后會(huì)一直循環(huán)運(yùn)行。
Storm其實(shí)也可以進(jìn)行批量處理數(shù)據(jù),但是不會(huì)這么用,可以讀取HDFS上的文件,一行一行處理,直到文件處理結(jié)束。
2. Storm架構(gòu):
Storm是Master-Slaves 主從架構(gòu)(與hadoop等一致)。主節(jié)點(diǎn)是Nimbus,從節(jié)點(diǎn)是Supervisor。主節(jié)點(diǎn)和從節(jié)點(diǎn)借助Zookeeper集群來(lái)溝通。
Storm集群的各個(gè)組件本身都是無(wú)狀態(tài)的(不保存狀態(tài)信息),例如nimbus、supervisor、worker等,所有的狀態(tài)信息都保存在zookeeper集群中。這樣的話,集群的可靠性非常高,某個(gè)Nimbus(或者Supervisor)宕機(jī)后,只需要重啟一個(gè)Nimbus節(jié)點(diǎn)(或者Supervisor節(jié)點(diǎn)),再?gòu)腪ookeeper集群中讀取狀態(tài)信息就可以了。

1.Nimbus主節(jié)點(diǎn):
1.監(jiān)控從節(jié)點(diǎn)Supervisor的狀態(tài):Supervisor啟動(dòng)和運(yùn)行過(guò)程中會(huì)定時(shí)將心跳狀態(tài)信息發(fā)送到Zookeeper上,也就是在Zookeeper集群上創(chuàng)建Znode節(jié)點(diǎn),Nimbus監(jiān)聽(tīng)該Znode節(jié)點(diǎn)來(lái)監(jiān)控Supervisor節(jié)點(diǎn)的狀態(tài)。
2.接收客戶端任務(wù)的提交Topology
3.進(jìn)行任務(wù)代碼的分發(fā)、任務(wù)Task的分配協(xié)調(diào),異常任務(wù)的重新分配。
2.Supervisor從節(jié)點(diǎn):
Supervisor是真正的工作節(jié)點(diǎn)。Supervisor接收到任務(wù)分配后,啟動(dòng)Worker節(jié)點(diǎn)(一個(gè)或者多個(gè))。
1.獲取任務(wù):Nimbus分配任務(wù)的時(shí)候,將任務(wù)信息發(fā)送到Zookeeper上,Supervisor監(jiān)聽(tīng)相應(yīng)的Znode節(jié)點(diǎn)信息,拿到分配給他的任務(wù)。
2.獲取到任務(wù)后,啟動(dòng)相應(yīng)的Worker進(jìn)程,Worker進(jìn)程數(shù)根據(jù)具體的Topology來(lái)決定的,之后監(jiān)控Worker進(jìn)程狀態(tài)(Worker進(jìn)程啟動(dòng)好之后,在運(yùn)行過(guò)程中也會(huì)將心跳狀態(tài)信息發(fā)送到zookeeper上,supervisor也是通過(guò)監(jiān)聽(tīng)znode節(jié)點(diǎn)信息來(lái)監(jiān)控worker進(jìn)程的)
3.Worker進(jìn)程(JVM):
Worker進(jìn)程不是常駐進(jìn)程(Nimbus和Supervisor是Storm框架的常駐進(jìn)程),并且跟具體的Topology相關(guān)。Worker進(jìn)程上具體運(yùn)行相應(yīng)的Topology的Task任務(wù)。
Worker進(jìn)程啟動(dòng)Executor線程。Task運(yùn)行時(shí)真正地跑在線程上,Executor線程真正執(zhí)行Task。
4.Zookeeper集群:
用來(lái)存儲(chǔ)各節(jié)點(diǎn)、各組件的狀態(tài)信息。
5.topology任務(wù)執(zhí)行整體流程:
Client端向Storm集群提交Topology程序,Nimbus接收到Topology程序后進(jìn)行任務(wù)分配,將執(zhí)行代碼以及相關(guān)的配置信息分發(fā)到各個(gè)Supervisor上(注意,這里不通過(guò)zookeeper集群。通過(guò)Thrift直接分發(fā)到Supervisor節(jié)點(diǎn)。)。將任務(wù)分配信息發(fā)送到Zookeeper集群上,Supervisor從Zookeeper上獲取相應(yīng)的任務(wù),根據(jù)任務(wù)的要求啟動(dòng)Worker進(jìn)程,Worker進(jìn)程啟動(dòng)后,Worker進(jìn)程會(huì)啟動(dòng)一些Executor線程(也是根據(jù)任務(wù)的要求啟動(dòng))。Executor線程才是最終真正執(zhí)行Task邏輯的組件。
3. Storm集群環(huán)境搭建:
集群環(huán)境的搭建,需要根據(jù)官網(wǎng)文檔指導(dǎo),選用的版本是比較穩(wěn)定的0.9.6版本。官網(wǎng)文檔:http://storm.apache.org/releases/0.9.6/Setting-up-a-Storm-cluster.html
1.創(chuàng)建zookeeper集群;
2.在Nimbus和worker機(jī)器上安裝前置依賴(java和Python);
3.在Nimbus和worker機(jī)器上下載和解壓Strom安裝包;
4.在配置文件storm.yaml中增加必要配置;
5.啟動(dòng)Storm進(jìn)程。
前置安裝內(nèi)容:網(wǎng)絡(luò)、IP、主機(jī)名、主機(jī)名和IP的映射關(guān)系、免密碼登陸、zookeeper集群、JDK、Python安裝。
1. 搭建Zookeeper集群:
Zookeeper集群的搭建方法,前面文章已有介紹。在zookeeper配置中需要做些修改,刪掉以下兩項(xiàng)的注釋:
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
$ scp /opt/modules/zookeeper-3.4.5-cdh5.3.6/conf/zoo.cfg natty@hadoop-senior02.pmpa.com:/opt/modules/zookeeper-3.4.5-cdh5.3.6/conf/
autopurge.snapRetainCount是Zookeeper的快照的保存的配置,如果不開(kāi)啟此項(xiàng),zookeeper的快照會(huì)很快占用很大的空間。自動(dòng)合并,每隔1小時(shí)自動(dòng)合并zookeeper上的數(shù)據(jù),相當(dāng)于日志清理。
Zookeeper的批量啟動(dòng)腳本:
#!/bin/bash
if [ $# -ne 1 ]
then
echo "Useage: sh zkServer_batch.sh [start|status|stop]"
exit 2
fi
for node in 192.168.8.128 192.168.8.129 192.168.8.130
do
echo "$1 in $node"
ssh $node "source /etc/profile && /opt/modules/zookeeper-3.4.5-cdh5.3.6/bin/zkServer.sh $1"
done
免密登陸,telnet XXX:3888 如何開(kāi)通3888端口的訪問(wèn)。
2. 安裝Storm集群:
1.檢查python的版本,一般情況下CentOS都會(huì)安裝python。對(duì)于該版本storm,python不能是3.x版本,必須是2.x版本。
$ python --version
2.下載并解壓storm release文件:
$ tar zxf apache-storm-0.9.6.tar.gz -C /opt/modules/
3.修改配置文件conf/storm.yaml:
storm.zookeeper.servers:
- "hadoop-senior01.pmpa.com"
- "hadoop-senior02.pmpa.com"
- "hadoop-senior03.pmpa.com"
nimbus.host: "hadoop-senior01.pmpa.com"
storm.local.dir: "/opt/modules/apache-storm-0.9.6/local"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
ui.port: 8081
默認(rèn)日志會(huì)寫到根目錄下:
01:13:54,021 |-ERROR in ch.qos.logback.core.rolling.RollingFileAppender[A1] - openFile(/nimbus.log,true) call failed. java.io.FileNotFoundException: /nimbus.log (Permission denied)
啟動(dòng)Storm 的 nimbus和 supervisor:
$ bin/storm nimbus > ./logs/nimbus.out 2>&1 &
$ bin/storm supervisor > ./logs/supervisor.out 2>&1 &
啟動(dòng)UI:
$ bin/storm ui > ./logs/ui.out 2>&1 &
測(cè)試提交topology:
$ bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount


4. Storm集群?jiǎn)⑼D_本:
Nimbus topology任務(wù)提交后,程序是運(yùn)行在supervisor節(jié)點(diǎn)上,nimbus不參與程序的運(yùn)行。
如果nimbus出現(xiàn)故障,不能提交topology,但是已經(jīng)提交了的topology還是正常運(yùn)行在集群上的。已經(jīng)運(yùn)行在集群上的topology,如果這時(shí)候某些task出現(xiàn)異常則無(wú)法重新分配節(jié)點(diǎn)。
1.查看topology的日志:
上邊我們啟動(dòng)了例子的wordcount topology,如下圖:

為了查看Topology(wordcount)的運(yùn)行日志, 需要啟動(dòng)進(jìn)程logviewer,需要在每個(gè)supervisor節(jié)點(diǎn)上啟動(dòng),不用在nimbus節(jié)點(diǎn)啟動(dòng)。
$ bin/storm logviewer > ./logs/logviewer.out 2>&1 &
在supervisor上啟動(dòng)了logviewer后,就可以在Storm UI界面上查看wordcount的日志了。在ui界面上點(diǎn)擊“wordcount”(topology名稱)。

上邊可以看到topology的組件 Spouts、Bolts等等。
2.停止Topology:
1.可以在ui界面上停止topology:

Activate : 激活
Deactivate :暫停
Rebalance : 當(dāng)動(dòng)態(tài)添加了一個(gè)Supervisor節(jié)點(diǎn),想要讓Topology部分任務(wù)能使用該新增的Supervisor,那就可以通過(guò)Rebalance實(shí)現(xiàn)。
Kill : 將Topology從Storm集群上移除。
2.通過(guò)命令行停止:
可以使用下命令kill掉topology:
$ bin/storm kill wordcount
其中wordcount是你啟動(dòng)topology時(shí),所指定的名稱。
3.在Zookeeper查看存儲(chǔ)的Storm節(jié)點(diǎn)信息:
為了查看zookeeper存儲(chǔ)的節(jié)點(diǎn)信息,需要打開(kāi)zookeeper的客戶端。選擇任何一個(gè)zookeeper節(jié)點(diǎn),執(zhí)行命令:
$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /
[storm, hbase, zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls /storm
[workerbeats, errors, supervisors, storms, assignments]
在zookeeper的根節(jié)點(diǎn)下,有個(gè)Storm子節(jié)點(diǎn)。也就是,在zookeeper中有一個(gè)storm Znode,而在storm Znode下有5個(gè)子Znode:workerbeats, errors, supervisors, storms, assignments。
/workerbeats : worker的心跳信息;
/errors : topology運(yùn)行過(guò)程中Task運(yùn)行異常信息(Task在哪個(gè)Supervisor上運(yùn)行失敗的,nimbus需要對(duì)異常任務(wù)重新分配);
/supervisors :記錄supervisor狀態(tài)心跳信息;
/storms :記錄topology任務(wù)信息(哪個(gè)jar包,jar包位置等)
/assignments :記錄的是topology任務(wù)的分配信息。
4.storm的nimubs、supervisor、ui、logviewer進(jìn)程的關(guān)閉:
不像Hadoop,Storm沒(méi)有提供這些進(jìn)程的stop腳本??梢酝ㄟ^(guò)kill -9 方式來(lái)關(guān)閉這些進(jìn)程。查看進(jìn)程pid的方式:
$ ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1
在關(guān)閉進(jìn)程前,一定記得先要關(guān)閉topology。 下面腳本(在主節(jié)點(diǎn)上執(zhí)行)完成storm的批量啟停。
5.storm的批量啟動(dòng)腳本:
先創(chuàng)建一個(gè)文件,來(lái)保存supervisor節(jié)點(diǎn)的主機(jī)名,supervisor_cluster.conf :
hadoop-senior02.pmpa.com
hadoop-senior03.pmpa.com
Storm批量啟動(dòng)腳本,start_storm.sh
#!/bin/bash
source /etc/profile
STROM_HOME=/opt/modules/apache-storm-0.9.6
##主節(jié)點(diǎn):?jiǎn)?dòng)nimubs和ui
${STORM_HOME}/bin/storm nimbus > /dev/null 2>&1 &
${STORM_HOME}/bin/storm ui > /dev/null 2>&1 &
##從節(jié)點(diǎn),啟動(dòng)supervisor和logviewer:
for supervisor in `cat supervisor_cluster.conf`
do
ssh ${supervisor} "source /etc/profile && ${STORM_HOME}/bin/storm supervisor> /dev/null 2>&1 &" &
ssh ${supervisor} "source /etc/profile && ${STORM_HOME}/bin/storm logviewer> /dev/null 2>&1 &" &
done
6.storm的批量停止腳本:
在執(zhí)行該腳本之前,必須要保證所有的topology關(guān)閉。storm的關(guān)閉需要?dú)⒌鬾imbus和supervisor上的所有的進(jìn)程。
#!/bin/bash
# author: natty date:2017-08-22
# Storm batch close. Kill these processes: nimubs,supervisor,ui,logviewer
source /etc/profile
kill -9 `ps -ef | grep nimbus.daemon | awk '{print $2}'`
kill -9 `ps -ef | grep ui.daemon | awk '{print $2}'`
for supervisor in `cat supervisor_cluster.conf`
do
ssh ${supervisor} "source /etc/profile && kill -9 `ps -ef | grep supervisor.daemon | awk '{print $2}'`" &
ssh ${supervisor} "source /etc/profile && kill -9 `ps -ef | grep logviewer.daemon | awk '{print $2}'`" &
done