Date: Nov 17-24, 2017
1. 目的
- 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)技術(shù)
- 積累快捷的Storm部署、開發(fā)方式,例如Python和Java。
2. 閱讀資料
- Apache Storm官網(wǎng)Tutorial
- 阿里巴巴JStorm文檔
- intsmaze's blog
- Java 基礎(chǔ) Serializable 的使用
- Java 高級(jí) Serializable 序列化的源碼分析
- ITindex Storm 系列
3. 閱讀筆記
3.1 Apache Storm官網(wǎng)
3.1.1 Storm主要結(jié)構(gòu)概覽

如上圖所示,Storm是一個(gè)流數(shù)據(jù)處理平臺(tái)。它與Hadoop相近,采用Map-Reduce的計(jì)算框架,區(qū)別在于Hadoop的worker在完成工作后被釋放,而Storm的worker在完成工作后進(jìn)入等待狀態(tài)——等待“上級(jí)”分配下一個(gè)任務(wù)。
Storm的本質(zhì)是定義一個(gè)計(jì)算的過程,類似于設(shè)計(jì)中的數(shù)據(jù)流圖,即先定義數(shù)據(jù)處理的流程,再分模塊實(shí)現(xiàn)數(shù)據(jù)處理的細(xì)節(jié),結(jié)果由末端的節(jié)點(diǎn)返回或輸出。
Storm的核心是Clojure編寫、提供Java開發(fā)接口,核心離工業(yè)解主流編程語言(Java、C/C++)相對(duì)遙遠(yuǎn),阿里巴巴的工程師團(tuán)隊(duì)用Java重寫了Storm的核心,即為JStorm。
3.2 阿里巴巴JStorm
3.2.1 JStorm定位
JStorm 是一個(gè)分布式實(shí)時(shí)計(jì)算引擎。
JStorm 是一個(gè)類似Hadoop MapReduce的系統(tǒng), 用戶按照指定的接口實(shí)現(xiàn)一個(gè)任務(wù),然后將這個(gè)任務(wù)遞交給JStorm系統(tǒng),JStorm將這個(gè)任務(wù)跑起來,并且按7 * 24小時(shí)運(yùn)行起來,一旦中間一個(gè)Worker 發(fā)生意外故障, 調(diào)度器立即分配一個(gè)新的Worker替換這個(gè)失效的Worker。
因此,從應(yīng)用的角度,JStorm應(yīng)用是一種遵守某種編程規(guī)范的分布式應(yīng)用。從系統(tǒng)角度, JStorm是一套類似MapReduce的調(diào)度系統(tǒng)。 從數(shù)據(jù)的角度,JStorm是一套基于流水線的消息處理機(jī)制。
| JStorm | Hadoop | |
|---|---|---|
| 角色 | Nimubs | JobTracker |
| Supervisor | TaskTracker | |
| Worker | Child | |
| 應(yīng)用名稱 | Topology | Job |
| 編程接口 | Spout/Bolt | Mapper/Reducer |
| “設(shè)計(jì)模式” | 資本主義 | 恐怖主義 |
3.2.2 優(yōu)點(diǎn)
在Storm和JStorm出現(xiàn)以前,市面上出現(xiàn)很多實(shí)時(shí)計(jì)算引擎,但自Storm和JStorm出現(xiàn)后,基本上可以說一統(tǒng)江湖: 究其優(yōu)點(diǎn):
- 開發(fā)非常迅速:接口簡(jiǎn)單,容易上手,只要遵守Topology、Spout和Bolt的編程規(guī)范即可開發(fā)出一個(gè)擴(kuò)展性極好的應(yīng)用,底層RPC、Worker之間冗余,數(shù)據(jù)分流之類的動(dòng)作完全不用考慮
- 擴(kuò)展性極好:當(dāng)一級(jí)處理單元速度,直接配置一下并發(fā)數(shù),即可線性擴(kuò)展性能
- 健壯強(qiáng):當(dāng)Worker失效或機(jī)器出現(xiàn)故障時(shí), 自動(dòng)分配新的Worker替換失效Worker
- 數(shù)據(jù)準(zhǔn)確性:可以采用Ack機(jī)制,保證數(shù)據(jù)不丟失。 如果對(duì)精度有更多一步要求,采用事務(wù)機(jī)制,保證數(shù)據(jù)準(zhǔn)確。
- 實(shí)時(shí)性高: JStorm 的設(shè)計(jì)偏向單行記錄,因此,在時(shí)延較同類產(chǎn)品更低
3.2.3 應(yīng)用場(chǎng)景
JStorm處理數(shù)據(jù)的方式是基于消息的流水線處理, 因此特別適合無狀態(tài)計(jì)算,也就是計(jì)算單元的依賴的數(shù)據(jù)全部在接受的消息中可以找到, 并且最好一個(gè)數(shù)據(jù)流不依賴另外一個(gè)數(shù)據(jù)流。
因此,常常用于:
- 日志分析,從日志中分析出特定的數(shù)據(jù),并將分析的結(jié)果存入外部存儲(chǔ)器如數(shù)據(jù)庫。目前,主流日志分析技術(shù)就使用JStorm或Storm
- 管道系統(tǒng), 將一個(gè)數(shù)據(jù)從一個(gè)系統(tǒng)傳輸?shù)搅硗庖粋€(gè)系統(tǒng), 比如將數(shù)據(jù)庫同步到Hadoop
- 消息轉(zhuǎn)化器, 將接受到的消息按照某種格式進(jìn)行轉(zhuǎn)化,存儲(chǔ)到另外一個(gè)系統(tǒng)如消息中間件
- 統(tǒng)計(jì)分析器, 從日志或消息中,提煉出某個(gè)字段,然后做count或sum計(jì)算,最后將統(tǒng)計(jì)值存入外部存儲(chǔ)器。中間處理過程可能更復(fù)雜。
- 實(shí)時(shí)推薦系統(tǒng), 將推薦算法運(yùn)行在jstorm中,達(dá)到秒級(jí)的推薦效果
3.2.4 基本概念
[站外圖片上傳中...(image-96ea41-1511406796108)]
- Spout (中文意為水龍頭)即數(shù)據(jù)的來源、出水口,來源可以是Kafka、DB、HBase、HDFS等。
- Bolt(中文意為插銷)即數(shù)據(jù)流向過程中的關(guān)鍵點(diǎn)、數(shù)據(jù)流處理點(diǎn)。
- Topology(中文意為拓?fù)浣Y(jié)構(gòu))即上述圖中所示的數(shù)據(jù)處理流程形成的數(shù)據(jù)流網(wǎng)絡(luò)結(jié)構(gòu)。
3.2.5 組件接口
- Spout組件接口:
nextTuple拉取下一條消息,執(zhí)行時(shí)JStorm框架回不停調(diào)用該接口從數(shù)據(jù)源拉取數(shù)據(jù)發(fā)往Bolt。 - Bolt組件接口:
execute執(zhí)行處理邏輯
3.2.6 調(diào)度和執(zhí)行
對(duì)于一個(gè)Topology,JStorm調(diào)度一個(gè)/多個(gè)Worker (每個(gè)Worker對(duì)應(yīng)操作系統(tǒng)的進(jìn)程),分布到集群的一臺(tái)或多臺(tái)機(jī)器上并行執(zhí)行。
在一個(gè)Worker (進(jìn)程) 中,分為多個(gè)Task (線程),每個(gè)線程對(duì)應(yīng)于Spout/Bolt的實(shí)現(xiàn)。
工作流程:
- 根據(jù)業(yè)務(wù)設(shè)計(jì)Topology
- 根據(jù)業(yè)務(wù)流程實(shí)現(xiàn)Spout的
nextTuple接口中的數(shù)據(jù)輸入- 根據(jù)業(yè)務(wù)細(xì)節(jié)實(shí)現(xiàn)Bolt的
execute接口中的處理邏輯- 提交Topology開始執(zhí)行
3.2.6.1 提交Topology時(shí)的參數(shù)
總Worker數(shù)目
if #worker <= 10 then
_topology_master 以Task形式存在,不獨(dú)占Worker
else
_topology_master 以Task形式存在,獨(dú)占Worker
end
每個(gè)component的并行度
并行度(parallelism) 代表有多少個(gè)Task線程來執(zhí)行這個(gè)Spout/Bolt。
同一個(gè)Component中的Task id一定是連續(xù)的。
每個(gè)Component之間的關(guān)系
聲明Spout和Bolt之間的對(duì)應(yīng)關(guān)系,JStorm使用均勻調(diào)度算法,奇偶不同數(shù)目的Spout/Bolt會(huì)存在某個(gè)進(jìn)程只有Spout或只有Bolt的情形。若topology運(yùn)行過程中掛掉,JStorm會(huì)不斷嘗試重啟進(jìn)程。
3.2.7 消息通信
Spout發(fā)消息
-
JStorm 計(jì)算消息目標(biāo) Task Id列表
if Task_id 在本進(jìn)程 then 直接將消息放入目標(biāo)Task執(zhí)行隊(duì)列 else netty跨進(jìn)程發(fā)送至目標(biāo)Task中 end
3.2.8 實(shí)時(shí)計(jì)算結(jié)果輸出
JStorm的Spout或Bolt中會(huì)有一個(gè)定時(shí)往外部存儲(chǔ)寫計(jì)算結(jié)果的邏輯,將數(shù)據(jù)按照業(yè)務(wù)需求被實(shí)時(shí)或近似實(shí)時(shí)地輸出。
3.2.9 小結(jié)
JStorm是阿里巴巴平臺(tái)的產(chǎn)品,相對(duì)來說適用于大量數(shù)據(jù)集群的情況,目前我現(xiàn)有的資源很難使用。因此,選擇Python系的streamparser進(jìn)行閱讀。
3.3 折騰Storm平臺(tái)部署
3.3.1 部署storm平臺(tái)
下載[Java 8/9][1]、maven、zookeeper、storm、[lein][2]的release并依次安裝。(以上庫除lein外為storm運(yùn)行所必須,由于服務(wù)器在國外,下載時(shí)間較長(zhǎng))
-
將 JDK、maven、zookeeper、storm 等拷貝至
/opt目錄下,在~/.bash_profile中將相應(yīng)目錄加入PATH:export JAVA_HOME="/opt/jdk8" export MAVEN_HOME="/opt/maven" export ZOO_KEEPER_HOME="/opt/zookeeper" export STORM_HOME="/opt/storm" PATH=$STORM_HOME/bin:$ZOO_KEEPER_HOME/bin:$MAVEN_HOME/bin:JAVA_HOME/bin:$PATH export PATH?- 進(jìn)入
/opt/zookeeper/conf目錄,編輯zoo.cfg配置文件,如下:tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/zookeeper # 注意需要對(duì)該目錄有寫權(quán)限 clientPort=2181 -
進(jìn)入
/opt/storm/conf目錄,編輯storm.yaml配置文件,如下:storm.zookeeper.servers: # 注意此處有空格 - "10.211.55.37" # 填入配置機(jī)器的IP,若為集群則在下一行以同樣格式列出 # - "other server ip" # 此處為Nimbus服務(wù)器地址,單機(jī)運(yùn)行時(shí)無效,系統(tǒng)自動(dòng)使用本地hostname,[原因待求證] # nimbus.seeds:["host1","host2","host3"] storm.local.dir: "/var/storm" # 需要保證該目錄有寫權(quán)限,此處使用root賬戶所以不考慮。 # 設(shè)置supervisor slots supervisor.slots.ports: # 注意此處有空格 - 6700 - 6701 - 6702 - 6702 # 此處在storm 1.1.1版的配置模板文件中未提及,但配置后在集群中能看到,[原因待求證]?
-
啟動(dòng)zookeeper集群
bin/zkServer.sh start
?
-
在[Master服務(wù)器][3]上啟動(dòng)storm nimbus服務(wù)
bin/storm nimbus >> /dev/null & -
在[Worker服務(wù)器][3]上啟動(dòng)storm supervisor服務(wù)
bin/storm supervisor >> /dev/null & -
在[Master服務(wù)器][3]上啟動(dòng)storm UI工具
bin/storm ui & 在[Master服務(wù)器][3]上采用
jps查看服務(wù)的啟動(dòng)情況,若顯示config_value則表示服務(wù)正在初始化;若顯示nimbus、supervisor、core、jps、QuorumPeerMain則說明初始化完畢,打開瀏覽器輸入http://server_host:8080即可進(jìn)入Storm UI查看相關(guān)信息。
[1] Java8/9 推薦安裝Oracle官網(wǎng)下載的完整版JDK,因?yàn)楹罄m(xù)的 lein 需要完整的JDK。解壓JDK之后配置系統(tǒng)變量即可。(本次Linux機(jī)器采用 Java 8)
[2] lein全稱為leiningen,是自動(dòng)化管理Clojure腳本的工具,類似于Cargo。lein目前的腳本下載會(huì)出現(xiàn)證書不匹配的問題,解決方案為export HTTP_CLIENT="wget --no-check-certificate -O"。而且,上述設(shè)置后,下載release依舊很慢、需要代理,可以直接wget下載對(duì)應(yīng)的release,放到~/.lein/self-installs/leiningen-2.5.3-standalone.jar即可,參考這里。lein是一個(gè)可執(zhí)行腳本,需要放到/usr/bin或者/usr/local/bin下面,然后命令行中運(yùn)行./lein和lein repl完成安裝。
[3] 本地測(cè)試則僅僅在本機(jī)即可
3.3.2 案例工程 WordCount
主要參照《Get Started with Storm》一書,網(wǎng)上有中文版,此處參照為英文原版。
3.3.2.1 前提準(zhǔn)備
-
maven編譯工具,建立pom.xml來聲明該工程的編譯結(jié)構(gòu),包括注明編譯需要的maven版本、編譯所需的storm依賴庫在線地址、以及依賴的storm版本。<repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories>
3.3.2.2 編寫對(duì)應(yīng)代碼文件
1. 建立文件結(jié)構(gòu)
建立對(duì)應(yīng)的文件結(jié)構(gòu)src/main/java/{spouts,bolts}、/src/main/resources等,其中resources文件夾要存放相應(yīng)的資源文件。
2. 編寫spouts實(shí)例
package spouts;
import ....;
public class WordReader implements IRichSpout {
private .....;
public void ack(Object msgId) {...;}
public void fail(Object msgId) {...;}
public void nextTuple() {...;}
// first method called in ANY spout
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {...;}
public void close() {}
public void declareOutputFields(OutputFieldsDeclarer declarer) {...;}
}
3. 編寫bolts實(shí)例
package bolts;
import ...;
public class WordNormalizer implements IRichBolt {
private ...;
public void cleanup(){}
public void execute(Tuple input) {...;}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {...;}
public void declareOutputFields(OutputFieldsDeclarer declarer) {...;}
}
4. 編寫topology結(jié)構(gòu)
import ...;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
// Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout();
builder.setBolt().shuffleGrouping();
builder.setBolt().fieldGrouping();
// Configuration
Config conf = new Config();
conf.put("xxx", args[0]);
conf.setDebug(false);
// Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("xxxx",conf, builder.createTopology());
Thread.sleep(1000); // sleep to reduce server load
cluster.shutdown();
}
}
5. 使用mvm帶好相應(yīng)參數(shù)運(yùn)行
mvn clean install # maven會(huì)自動(dòng)下載相關(guān)的包
cd target # 注意目錄下有 `pom.xml` 中標(biāo)識(shí)的輸出的jar包
storm jar output-jar.jar path.to.your.topology # LocalCluster 執(zhí)行,然后關(guān)閉
storm jar output-jar.jar path.to.your.topology name-of-storm # 提交jar至storm集群,循環(huán)執(zhí)行,可在UI中查看
3.4 運(yùn)行Storm例子程序的問題記錄
3.4.1 存在問題以及解答記錄
1. 程序中的collector是指的什么?
collector是用來追蹤處理邏輯上每個(gè)emit的數(shù)據(jù)是否在下游bolt中被成功處理。collector是與storm通信的工具,反饋每個(gè)任務(wù)的處理情況。
2. 程序中collector最后emit的Value(…)是什么結(jié)構(gòu)?
官方文檔解釋:A convenience class for making tuple values using new Values("field1", 2, 3) syntax.
Value是構(gòu)建Tuple的一個(gè)元組類,該類實(shí)現(xiàn)了Serializable, Cloneable, Iterable<Object>, Collection<Object>, List<Object>, RandomAccess等接口,與Bolt中的execute接口相對(duì)應(yīng):public void execute(Tuple input, BasicOutputCollector collector)
3. Storm中的Ack/Fail機(jī)制中對(duì)fail情形的處理?
為了保證數(shù)據(jù)能正確的被處理, 對(duì)于spout產(chǎn)生的每一個(gè)tuple, storm都會(huì)進(jìn)行跟蹤。
這里面涉及到ack/fail的處理,如果一個(gè)tuple處理成功是指這個(gè)Tuple以及這個(gè)Tuple產(chǎn)生的所有Tuple都被成功處理, 會(huì)調(diào)用spout的ack方法;
如果失敗是指這個(gè)Tuple或這個(gè)Tuple產(chǎn)生的所有Tuple中的某一個(gè)tuple處理失敗, 則會(huì)調(diào)用spout的fail方法;
在處理tuple的每一個(gè)bolt都會(huì)通過OutputCollector來告知storm, 當(dāng)前bolt處理是否成功。
另外需要注意的,當(dāng)spout觸發(fā)fail動(dòng)作時(shí),不會(huì)自動(dòng)重發(fā)失敗的tuple,需要我們?cè)趕pout中重新獲取發(fā)送失敗數(shù)據(jù),手動(dòng)重新再發(fā)送一次。
4. Storm中的Ack原理
Storm中有個(gè)特殊的task名叫acker,他們負(fù)責(zé)跟蹤spout發(fā)出的每一個(gè)Tuple的Tuple樹(因?yàn)橐粋€(gè)tuple通過spout發(fā)出了,經(jīng)過每一個(gè)bolt處理后,會(huì)生成一個(gè)新的tuple發(fā)送出去)。當(dāng)acker(框架自啟動(dòng)的task)發(fā)現(xiàn)一個(gè)Tuple樹已經(jīng)處理完成了,它會(huì)發(fā)送一個(gè)消息給產(chǎn)生這個(gè)Tuple的那個(gè)task。
Acker的跟蹤算法是Storm的主要突破之一,對(duì)任意大的一個(gè)Tuple樹,它只需要恒定的20字節(jié)就可以進(jìn)行跟蹤。
Acker跟蹤算法的原理:acker對(duì)于每個(gè)spout-tuple保存一個(gè)ack-val的校驗(yàn)值,它的初始值是0,然后每發(fā)射一個(gè)Tuple或Ack一個(gè)Tuple時(shí),這個(gè)Tuple的id就要跟這個(gè)校驗(yàn)值異或一下,并且把得到的值更新為ack-val的新值。那么假設(shè)每個(gè)發(fā)射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根據(jù)ack-val是否為0來判斷是否完全處理,如果為0則認(rèn)為已完全處理。
要實(shí)現(xiàn)ack機(jī)制:
- spout發(fā)射tuple的時(shí)候指定messageId
- spout要重寫B(tài)aseRichSpout的fail和ack方法
- spout對(duì)發(fā)射的tuple進(jìn)行緩存(否則spout的fail方法收到acker發(fā)來的messsageId,spout也無法獲取到發(fā)送失敗的數(shù)據(jù)進(jìn)行重發(fā)),看看系統(tǒng)提供的接口,只有msgId這個(gè)參數(shù),這里的設(shè)計(jì)不合理,其實(shí)在系統(tǒng)里是有cache整個(gè)msg的,只給用戶一個(gè)messageid,用戶如何取得原來的msg貌似需要自己cache,然后用這個(gè)msgId去查詢,太坑爹了
- spout根據(jù)messageId對(duì)于ack的tuple則從緩存隊(duì)列中刪除,對(duì)于fail的tuple可以選擇重發(fā)。
- 設(shè)置acker數(shù)至少大于0;Config.setNumAckers(conf, ackerParal);
Storm的Bolt有BasicBolt和RichBolt:
在BasicBolt中,BasicOutputCollector在emit數(shù)據(jù)的時(shí)候,會(huì)自動(dòng)和輸入的tuple相關(guān)聯(lián),而在execute方法結(jié)束的時(shí)候那個(gè)輸入tuple會(huì)被自動(dòng)ack。
使用RichBolt需要在emit數(shù)據(jù)的時(shí)候,顯式指定該數(shù)據(jù)的源tuple要加上第二個(gè)參數(shù)anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);并且需要在execute執(zhí)行成功后調(diào)用OutputCollector.ack(tuple),當(dāng)失敗處理時(shí),執(zhí)行OutputCollector.fail(tuple)。
由一個(gè)tuple產(chǎn)生一個(gè)新的tuple稱為:anchoring,你發(fā)射一個(gè)tuple的同時(shí)也就完成了一次anchoring。
ack機(jī)制即,spout發(fā)送的每一條消息,在規(guī)定的時(shí)間內(nèi),spout收到Acker的ack響應(yīng),即認(rèn)為該tuple被后續(xù)bolt成功處理;在規(guī)定的時(shí)間內(nèi)(默認(rèn)是30秒),沒有收到Acker的ack響應(yīng)tuple,就觸發(fā)fail動(dòng)作,即認(rèn)為該tuple處理失敗,timeout時(shí)間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設(shè)定;或者收到Acker發(fā)送的fail響應(yīng)tuple,也認(rèn)為失敗,觸發(fā)fail動(dòng)作
注意,如果繼承BaseBasicBolt那么程序拋出異常,程序直接異常停止了,不會(huì)讓spout進(jìn)行重發(fā)。
5. Fail注意點(diǎn)小結(jié)
- 若某個(gè)task節(jié)點(diǎn)處理的tuple一直失敗,會(huì)導(dǎo)致spout節(jié)點(diǎn)存儲(chǔ)的tuple數(shù)據(jù)越來越多,直至內(nèi)存溢出
- 在某個(gè)tuple的眾多子tuple中,若某一個(gè)子tuple處理fail,但是其他子tuple仍會(huì)執(zhí)行。即當(dāng)所有子tuple都執(zhí)行數(shù)據(jù)存儲(chǔ)操作,其中一個(gè)子tuple出現(xiàn)fail,即使整個(gè)處理是fail,但是成功的子tuple仍會(huì)執(zhí)行而不會(huì)滾。
- Tuple的追蹤只要是spout開始,可以在任意層次bolt停止追蹤并作出應(yīng)答。
acker的數(shù)量可以通過Ackertask組件來設(shè)置。 - 一個(gè)Topology并不需要太多
acker,除非storm吞吐量不正常。 - 若不需要保證可靠性,即不追蹤tuple樹的執(zhí)行情況,則系統(tǒng)里的消息數(shù)量會(huì)減少一半。
- 關(guān)閉消息可靠性的三種方法:
config.Topology_ACKERS=0- Spout發(fā)送消息時(shí)不指定消息的
msgId - 在
emit方法中不指定輸入消息
6. Anchoring 錨定概念
拓?fù)涫且粋€(gè)消息(Tuple)沿著一個(gè)或多個(gè)分支的樹節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)將ack(tuple)或者fail(tuple),這樣當(dāng)消息失敗時(shí)Storm就會(huì)知道,并通知Spout重發(fā)消息。因?yàn)橐粋€(gè)Storm拓?fù)溥\(yùn)行在一個(gè)高度并發(fā)的環(huán)境中,跟蹤原始Spout示例的最好辦法是在消息Tuple中包含一個(gè)原始Spout的引用,這種行為(技術(shù))被稱為Anchoring(錨定)。
錨點(diǎn)發(fā)生的語句在collector.emit(tuple, new Values(word))中,傳遞元組(emit方法)使Storm能夠跟蹤原始Spout。collector.ack(tuple)和collector.fail(tuple)告訴Spout知道每個(gè)消息的處理結(jié)果。當(dāng)消息樹上的每個(gè)消息已經(jīng)被處理,Storm認(rèn)為來自Spout的元組被完全處理。當(dāng)消息樹在一個(gè)可配置的超時(shí)內(nèi)處理失敗,一個(gè)元組被認(rèn)為是失敗的。處理的每一個(gè)元組必須是確認(rèn)或者失敗,Storm會(huì)使用內(nèi)存來追蹤每個(gè)元組,如果不對(duì)每個(gè)元組進(jìn)行確認(rèn)/失敗,最終會(huì)耗盡內(nèi)存。
為了簡(jiǎn)化編碼,Storm為Bolt提供了一個(gè)IBasicBolt接口,它會(huì)在調(diào)用execute()方法之后正確調(diào)用ack()方法,BaseBasicBolt類是該接口的一個(gè)實(shí)現(xiàn),用來實(shí)現(xiàn)自動(dòng)確認(rèn)。
7. Storm組件與編程時(shí)遇到的概念
| 名稱 | 解釋 |
|---|---|
| Nimbus | 負(fù)責(zé)資源分配和任務(wù)調(diào)度,Nimbus分配任務(wù)到Zookeeper指定目錄。 |
| Supervisor | 去Zookeeper指定目錄接受Nimbusf分配的任務(wù),啟停自己的Worker進(jìn)程。 |
| Worker | 運(yùn)行具體處理組件邏輯的進(jìn)程(process),Worker的任務(wù)分為Spout和Bolt兩種。 |
| Task | Worker啟動(dòng)相應(yīng)的物理線程(Executor),Worker執(zhí)行的每一個(gè)Spout/Bolt線程成為一個(gè)Task,0.8版本后Spout/Bolt的Task可能共享一個(gè)Executor。 |
| Topology | 拓?fù)?,Storm集群,即定義的數(shù)據(jù)流處理的DAG。 |
| Spout | Storm集群的數(shù)據(jù)源 |
| Bolt | Storm任務(wù)的處理邏輯單元,在集群多個(gè)機(jī)器上并發(fā)執(zhí)行。 |
| Tuple | 消息元組,Spout、Bolt用來與Storm集群通信、反饋任務(wù)處理成功與否的載體。恒定為20Bit。 |
| Stream groupings | 數(shù)據(jù)流的分組策略,分7種,常見為shuffleGrouping()、fieldsGrouping()。 |
| Executor | Worker啟動(dòng)的實(shí)際物理線程,一般一個(gè)Executor執(zhí)行一個(gè)Task,但也能執(zhí)行多個(gè)Task。 |
| Configuration | Topology的配置 |
8. 序列化與反序列化
由于博客上特別提到了Java虛擬機(jī)序列化的性能極其辣雞,所以在此記錄。
把對(duì)象轉(zhuǎn)換為字節(jié)序列的過程稱為對(duì)象的序列化;把字節(jié)序列恢復(fù)為對(duì)象的過程稱為對(duì)象的反序列化。
對(duì)象的序列化主要有兩種用途:
把對(duì)象的字節(jié)序列永久地保存到硬盤上,通常存放在一個(gè)文件中;
在網(wǎng)絡(luò)上傳送對(duì)象的字節(jié)序列。
在很多應(yīng)用中,需要對(duì)某些對(duì)象進(jìn)行序列化,讓它們離開內(nèi)存空間,入住物理硬盤,以便長(zhǎng)期保存。比如最常見的是Web服務(wù)器中的Session對(duì)象,當(dāng)有 10萬用戶并發(fā)訪問,就有可能出現(xiàn)10萬個(gè)Session對(duì)象,內(nèi)存可能吃不消,于是Web容器就會(huì)把一些seesion先序列化到硬盤中,等要用了,再把保存在硬盤中的對(duì)象還原到內(nèi)存中。
當(dāng)兩個(gè)進(jìn)程在進(jìn)行遠(yuǎn)程通信時(shí),彼此可以發(fā)送各種類型的數(shù)據(jù)。無論是何種類型的數(shù)據(jù),都會(huì)以二進(jìn)制序列的形式在網(wǎng)絡(luò)上傳送。發(fā)送方需要把這個(gè)Java對(duì)象轉(zhuǎn)換為字節(jié)序列,才能在網(wǎng)絡(luò)上傳送;接收方則需要把字節(jié)序列再恢復(fù)為Java對(duì)象。
在Java/Storm中,可以理解為toString()函數(shù)的自定義實(shí)現(xiàn)。注意使用transient 修飾的對(duì)象無法序列化。
9. declareOutputFields() 函數(shù)的具體作用
該Spout代碼里面最核心的部分有兩個(gè):
- 用
collector.emit()方法發(fā)射tuple。我們不用自己實(shí)現(xiàn)tuple,我們只需要定義tuple的value,Storm會(huì)幫我們生成tuple。Values對(duì)象接受變長(zhǎng)參數(shù)。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,…)的參數(shù)的index,例如我們emit(new Values("v1", "v2")), 那么Tuple的屬性即為:{ [ "v1" ], [ "V2" ] } -
declarer.declare方法用來給我們發(fā)射的value在整個(gè)Stream中定義一個(gè)別名??梢岳斫鉃?code>key。該值必須在整個(gè)topology定義中唯一。
3.5 Windows 平臺(tái)部署本地測(cè)試環(huán)境的注意事項(xiàng)
3.5.1 所需安裝包
- Java SE Development Kit 8/9,安裝到非
C:\Program Files\目錄下,否則storm將無法啟動(dòng)。 - Apache-maven,解壓到本地目錄,推薦非系統(tǒng)盤
- Zookeeper,解壓到本地目錄,推薦非系統(tǒng)盤
- Storm,解壓到本地目錄,推薦非系統(tǒng)盤
3.5.2 環(huán)境變量配置
-
JAVA_HOME:path\to\your\jdk-file -
Path:path\to\your-storm\bin;path\to\your-zookeeper\bin;path\to\your-maven\bin;%JAVA_HOME%\bin
3.5.3 配置文件設(shè)定
- 配置
zoo.cfg,參照3.1 - 配置
storm.yaml,參照3.1,注意storm.local.dirs中的目錄使用\\來表示\。
3.5.6 啟動(dòng)集群
zkServer
storm nimbus
storm supervisor
storm ui
# 打開瀏覽器 http://127.0.0.1:8080/index.html