18. Storm基礎(chǔ)

1. Storm介紹:

Storm是實(shí)時(shí)流計(jì)算框架。企業(yè)中典型實(shí)時(shí)分析框架搭建模式: Flume + Kafka + Storm + Hbase ,對(duì)這類要求分析結(jié)果能妙級(jí)甚至毫秒級(jí)反饋的場(chǎng)景,需要用專門的實(shí)時(shí)分析框架,例如Storm和Spark Streaming。
最初由BackType公司研發(fā)出來(lái),2011年7月被Twitter公司收購(gòu),Twitter公司將Storm應(yīng)用到他們的海量數(shù)據(jù)實(shí)時(shí)處理上。Storm核心部分是由Clojure語(yǔ)言編寫的,Clojure是面向函數(shù)式編程語(yǔ)言,運(yùn)行在JVM上。阿里將Storm核心部分用java改寫,同時(shí)對(duì)原先Storm性能不好的地方做了優(yōu)化,這就是JStorm,目前Strom和JStorm都由Apache來(lái)維護(hù)。

1. 使用場(chǎng)景:

1.信息流的實(shí)時(shí)處理:例如實(shí)時(shí)抓拍汽車行車速度,計(jì)算判斷是否超速并實(shí)時(shí)處罰。
2.實(shí)時(shí)日志分析
3.分布式RPC(遠(yuǎn)程過(guò)程調(diào)用) DRPC: 查詢請(qǐng)求 --> storm處理 -->實(shí)時(shí)返回結(jié)果。

2.特性

1.分布式框架。
2.可靠性高、容錯(cuò)性高。
3.計(jì)算結(jié)果可靠:消息可靠性保障機(jī)制。
4.性能高、處理速度快。為了提高性能,可以不使用storm的消息可靠性保障機(jī)制。

3.與hadoop的區(qū)別和聯(lián)系

Storm并不屬于Hadoop生態(tài)系統(tǒng)的框架。MapReduce是離線數(shù)據(jù)分析,批次處理數(shù)據(jù),數(shù)據(jù)處理完成后程序就會(huì)終止;Storm是實(shí)時(shí)數(shù)據(jù)分析,任務(wù)提交后會(huì)一直循環(huán)運(yùn)行。
Storm其實(shí)也可以進(jìn)行批量處理數(shù)據(jù),但是不會(huì)這么用,可以讀取HDFS上的文件,一行一行處理,直到文件處理結(jié)束。

2. Storm架構(gòu):

Storm是Master-Slaves 主從架構(gòu)(與hadoop等一致)。主節(jié)點(diǎn)是Nimbus,從節(jié)點(diǎn)是Supervisor。主節(jié)點(diǎn)和從節(jié)點(diǎn)借助Zookeeper集群來(lái)溝通。
Storm集群的各個(gè)組件本身都是無(wú)狀態(tài)的(不保存狀態(tài)信息),例如nimbus、supervisor、worker等,所有的狀態(tài)信息都保存在zookeeper集群中。這樣的話,集群的可靠性非常高,某個(gè)Nimbus(或者Supervisor)宕機(jī)后,只需要重啟一個(gè)Nimbus節(jié)點(diǎn)(或者Supervisor節(jié)點(diǎn)),再?gòu)腪ookeeper集群中讀取狀態(tài)信息就可以了。

Storm整體架構(gòu).png

1.Nimbus主節(jié)點(diǎn):

1.監(jiān)控從節(jié)點(diǎn)Supervisor的狀態(tài):Supervisor啟動(dòng)和運(yùn)行過(guò)程中會(huì)定時(shí)將心跳狀態(tài)信息發(fā)送到Zookeeper上,也就是在Zookeeper集群上創(chuàng)建Znode節(jié)點(diǎn),Nimbus監(jiān)聽(tīng)該Znode節(jié)點(diǎn)來(lái)監(jiān)控Supervisor節(jié)點(diǎn)的狀態(tài)。
2.接收客戶端任務(wù)的提交Topology
3.進(jìn)行任務(wù)代碼的分發(fā)、任務(wù)Task的分配協(xié)調(diào),異常任務(wù)的重新分配。

2.Supervisor從節(jié)點(diǎn):

Supervisor是真正的工作節(jié)點(diǎn)。Supervisor接收到任務(wù)分配后,啟動(dòng)Worker節(jié)點(diǎn)(一個(gè)或者多個(gè))。
1.獲取任務(wù):Nimbus分配任務(wù)的時(shí)候,將任務(wù)信息發(fā)送到Zookeeper上,Supervisor監(jiān)聽(tīng)相應(yīng)的Znode節(jié)點(diǎn)信息,拿到分配給他的任務(wù)。
2.獲取到任務(wù)后,啟動(dòng)相應(yīng)的Worker進(jìn)程,Worker進(jìn)程數(shù)根據(jù)具體的Topology來(lái)決定的,之后監(jiān)控Worker進(jìn)程狀態(tài)(Worker進(jìn)程啟動(dòng)好之后,在運(yùn)行過(guò)程中也會(huì)將心跳狀態(tài)信息發(fā)送到zookeeper上,supervisor也是通過(guò)監(jiān)聽(tīng)znode節(jié)點(diǎn)信息來(lái)監(jiān)控worker進(jìn)程的)

3.Worker進(jìn)程(JVM):

Worker進(jìn)程不是常駐進(jìn)程(Nimbus和Supervisor是Storm框架的常駐進(jìn)程),并且跟具體的Topology相關(guān)。Worker進(jìn)程上具體運(yùn)行相應(yīng)的Topology的Task任務(wù)。
Worker進(jìn)程啟動(dòng)Executor線程。Task運(yùn)行時(shí)真正地跑在線程上,Executor線程真正執(zhí)行Task。

4.Zookeeper集群:

用來(lái)存儲(chǔ)各節(jié)點(diǎn)、各組件的狀態(tài)信息。

5.topology任務(wù)執(zhí)行整體流程:

Client端向Storm集群提交Topology程序,Nimbus接收到Topology程序后進(jìn)行任務(wù)分配,將執(zhí)行代碼以及相關(guān)的配置信息分發(fā)到各個(gè)Supervisor上(注意,這里不通過(guò)zookeeper集群。通過(guò)Thrift直接分發(fā)到Supervisor節(jié)點(diǎn)。)。將任務(wù)分配信息發(fā)送到Zookeeper集群上,Supervisor從Zookeeper上獲取相應(yīng)的任務(wù),根據(jù)任務(wù)的要求啟動(dòng)Worker進(jìn)程,Worker進(jìn)程啟動(dòng)后,Worker進(jìn)程會(huì)啟動(dòng)一些Executor線程(也是根據(jù)任務(wù)的要求啟動(dòng))。Executor線程才是最終真正執(zhí)行Task邏輯的組件。

3. Storm集群環(huán)境搭建:

集群環(huán)境的搭建,需要根據(jù)官網(wǎng)文檔指導(dǎo),選用的版本是比較穩(wěn)定的0.9.6版本。官網(wǎng)文檔:http://storm.apache.org/releases/0.9.6/Setting-up-a-Storm-cluster.html
1.創(chuàng)建zookeeper集群;
2.在Nimbus和worker機(jī)器上安裝前置依賴(java和Python);
3.在Nimbus和worker機(jī)器上下載和解壓Strom安裝包;
4.在配置文件storm.yaml中增加必要配置;
5.啟動(dòng)Storm進(jìn)程。

前置安裝內(nèi)容:網(wǎng)絡(luò)、IP、主機(jī)名、主機(jī)名和IP的映射關(guān)系、免密碼登陸、zookeeper集群、JDK、Python安裝。

1. 搭建Zookeeper集群:

Zookeeper集群的搭建方法,前面文章已有介紹。在zookeeper配置中需要做些修改,刪掉以下兩項(xiàng)的注釋:

autopurge.snapRetainCount=3
autopurge.purgeInterval=1
$ scp /opt/modules/zookeeper-3.4.5-cdh5.3.6/conf/zoo.cfg natty@hadoop-senior02.pmpa.com:/opt/modules/zookeeper-3.4.5-cdh5.3.6/conf/

autopurge.snapRetainCount是Zookeeper的快照的保存的配置,如果不開(kāi)啟此項(xiàng),zookeeper的快照會(huì)很快占用很大的空間。自動(dòng)合并,每隔1小時(shí)自動(dòng)合并zookeeper上的數(shù)據(jù),相當(dāng)于日志清理。
Zookeeper的批量啟動(dòng)腳本:

#!/bin/bash

if [ $# -ne 1 ] 
then
   echo "Useage: sh zkServer_batch.sh [start|status|stop]"
   exit 2
fi

for node in 192.168.8.128 192.168.8.129 192.168.8.130
do
    echo "$1 in $node"
    ssh $node "source /etc/profile && /opt/modules/zookeeper-3.4.5-cdh5.3.6/bin/zkServer.sh $1"
done

免密登陸,telnet XXX:3888 如何開(kāi)通3888端口的訪問(wèn)。

2. 安裝Storm集群:

1.檢查python的版本,一般情況下CentOS都會(huì)安裝python。對(duì)于該版本storm,python不能是3.x版本,必須是2.x版本。

$ python --version

2.下載并解壓storm release文件:

$ tar zxf apache-storm-0.9.6.tar.gz -C /opt/modules/

3.修改配置文件conf/storm.yaml:

storm.zookeeper.servers:
     - "hadoop-senior01.pmpa.com"
     - "hadoop-senior02.pmpa.com"
     - "hadoop-senior03.pmpa.com"

nimbus.host: "hadoop-senior01.pmpa.com"

storm.local.dir: "/opt/modules/apache-storm-0.9.6/local"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704

ui.port: 8081

默認(rèn)日志會(huì)寫到根目錄下:

01:13:54,021 |-ERROR in ch.qos.logback.core.rolling.RollingFileAppender[A1] - openFile(/nimbus.log,true) call failed. java.io.FileNotFoundException: /nimbus.log (Permission denied)

啟動(dòng)Storm 的 nimbus和 supervisor:

$ bin/storm nimbus > ./logs/nimbus.out 2>&1 &
$ bin/storm supervisor > ./logs/supervisor.out 2>&1 &

啟動(dòng)UI:

$ bin/storm ui > ./logs/ui.out 2>&1 &

測(cè)試提交topology:

$ bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
Paste_Image.png
topology視圖.png

4. Storm集群?jiǎn)⑼D_本:

Nimbus topology任務(wù)提交后,程序是運(yùn)行在supervisor節(jié)點(diǎn)上,nimbus不參與程序的運(yùn)行。
如果nimbus出現(xiàn)故障,不能提交topology,但是已經(jīng)提交了的topology還是正常運(yùn)行在集群上的。已經(jīng)運(yùn)行在集群上的topology,如果這時(shí)候某些task出現(xiàn)異常則無(wú)法重新分配節(jié)點(diǎn)。

1.查看topology的日志:

上邊我們啟動(dòng)了例子的wordcount topology,如下圖:

Storm UI界面.png

為了查看Topology(wordcount)的運(yùn)行日志, 需要啟動(dòng)進(jìn)程logviewer,需要在每個(gè)supervisor節(jié)點(diǎn)上啟動(dòng),不用在nimbus節(jié)點(diǎn)啟動(dòng)。

$ bin/storm logviewer > ./logs/logviewer.out 2>&1 &

在supervisor上啟動(dòng)了logviewer后,就可以在Storm UI界面上查看wordcount的日志了。在ui界面上點(diǎn)擊“wordcount”(topology名稱)。


Topology日志.png

上邊可以看到topology的組件 Spouts、Bolts等等。

2.停止Topology:

1.可以在ui界面上停止topology:


UI界面控制Topology.png

Activate : 激活
Deactivate :暫停
Rebalance : 當(dāng)動(dòng)態(tài)添加了一個(gè)Supervisor節(jié)點(diǎn),想要讓Topology部分任務(wù)能使用該新增的Supervisor,那就可以通過(guò)Rebalance實(shí)現(xiàn)。
Kill : 將Topology從Storm集群上移除。

2.通過(guò)命令行停止:
可以使用下命令kill掉topology:

$ bin/storm kill wordcount

其中wordcount是你啟動(dòng)topology時(shí),所指定的名稱。

3.在Zookeeper查看存儲(chǔ)的Storm節(jié)點(diǎn)信息:

為了查看zookeeper存儲(chǔ)的節(jié)點(diǎn)信息,需要打開(kāi)zookeeper的客戶端。選擇任何一個(gè)zookeeper節(jié)點(diǎn),執(zhí)行命令:

$ bin/zkCli.sh 

[zk: localhost:2181(CONNECTED) 1] ls /
[storm, hbase, zookeeper]

[zk: localhost:2181(CONNECTED) 2] ls /storm
[workerbeats, errors, supervisors, storms, assignments]

在zookeeper的根節(jié)點(diǎn)下,有個(gè)Storm子節(jié)點(diǎn)。也就是,在zookeeper中有一個(gè)storm Znode,而在storm Znode下有5個(gè)子Znode:workerbeats, errors, supervisors, storms, assignments。

/workerbeats : worker的心跳信息;
/errors : topology運(yùn)行過(guò)程中Task運(yùn)行異常信息(Task在哪個(gè)Supervisor上運(yùn)行失敗的,nimbus需要對(duì)異常任務(wù)重新分配);
/supervisors :記錄supervisor狀態(tài)心跳信息;
/storms :記錄topology任務(wù)信息(哪個(gè)jar包,jar包位置等)
/assignments :記錄的是topology任務(wù)的分配信息。

4.storm的nimubs、supervisor、ui、logviewer進(jìn)程的關(guān)閉:

不像Hadoop,Storm沒(méi)有提供這些進(jìn)程的stop腳本??梢酝ㄟ^(guò)kill -9 方式來(lái)關(guān)閉這些進(jìn)程。查看進(jìn)程pid的方式:

$ ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1

在關(guān)閉進(jìn)程前,一定記得先要關(guān)閉topology。 下面腳本(在主節(jié)點(diǎn)上執(zhí)行)完成storm的批量啟停。

5.storm的批量啟動(dòng)腳本:

先創(chuàng)建一個(gè)文件,來(lái)保存supervisor節(jié)點(diǎn)的主機(jī)名,supervisor_cluster.conf :

hadoop-senior02.pmpa.com
hadoop-senior03.pmpa.com

Storm批量啟動(dòng)腳本,start_storm.sh

#!/bin/bash

source /etc/profile
STROM_HOME=/opt/modules/apache-storm-0.9.6

##主節(jié)點(diǎn):?jiǎn)?dòng)nimubs和ui
${STORM_HOME}/bin/storm nimbus > /dev/null 2>&1 &
${STORM_HOME}/bin/storm ui > /dev/null 2>&1 &

##從節(jié)點(diǎn),啟動(dòng)supervisor和logviewer:
for supervisor in `cat supervisor_cluster.conf`
do
      ssh ${supervisor} "source /etc/profile && ${STORM_HOME}/bin/storm supervisor> /dev/null 2>&1 &" & 
      ssh ${supervisor} "source /etc/profile && ${STORM_HOME}/bin/storm logviewer> /dev/null 2>&1 &" &
done

6.storm的批量停止腳本:

在執(zhí)行該腳本之前,必須要保證所有的topology關(guān)閉。storm的關(guān)閉需要?dú)⒌鬾imbus和supervisor上的所有的進(jìn)程。

#!/bin/bash

# author: natty    date:2017-08-22
# Storm batch close. Kill these processes: nimubs,supervisor,ui,logviewer

source /etc/profile
kill -9 `ps -ef | grep nimbus.daemon | awk '{print $2}'`
kill -9 `ps -ef | grep ui.daemon | awk '{print $2}'`

for supervisor in `cat supervisor_cluster.conf`
do
   ssh ${supervisor} "source /etc/profile && kill -9 `ps -ef | grep supervisor.daemon | awk '{print $2}'`" &
   ssh ${supervisor} "source /etc/profile && kill -9 `ps -ef | grep logviewer.daemon | awk '{print $2}'`" &
done
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容