Storm 折騰雜記

Date: Nov 17-24, 2017

1. 目的

  • 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)技術(shù)
  • 積累快捷的Storm部署、開發(fā)方式,例如Python和Java。

2. 閱讀資料

  1. Apache Storm官網(wǎng)Tutorial
  2. 阿里巴巴JStorm文檔
  3. intsmaze's blog
  4. Java 基礎(chǔ) Serializable 的使用
  5. Java 高級(jí) Serializable 序列化的源碼分析
  6. ITindex Storm 系列

3. 閱讀筆記

3.1 Apache Storm官網(wǎng)

3.1.1 Storm主要結(jié)構(gòu)概覽

Storm主要結(jié)構(gòu)

如上圖所示,Storm是一個(gè)流數(shù)據(jù)處理平臺(tái)。它與Hadoop相近,采用Map-Reduce的計(jì)算框架,區(qū)別在于Hadoop的worker在完成工作后被釋放,而Storm的worker在完成工作后進(jìn)入等待狀態(tài)——等待“上級(jí)”分配下一個(gè)任務(wù)。

Storm的本質(zhì)是定義一個(gè)計(jì)算的過程,類似于設(shè)計(jì)中的數(shù)據(jù)流圖,即先定義數(shù)據(jù)處理的流程,再分模塊實(shí)現(xiàn)數(shù)據(jù)處理的細(xì)節(jié),結(jié)果由末端的節(jié)點(diǎn)返回或輸出。

Storm的核心是Clojure編寫、提供Java開發(fā)接口,核心離工業(yè)解主流編程語言(Java、C/C++)相對(duì)遙遠(yuǎn),阿里巴巴的工程師團(tuán)隊(duì)用Java重寫了Storm的核心,即為JStorm。

3.2 阿里巴巴JStorm

3.2.1 JStorm定位

JStorm 是一個(gè)分布式實(shí)時(shí)計(jì)算引擎。

JStorm 是一個(gè)類似Hadoop MapReduce的系統(tǒng), 用戶按照指定的接口實(shí)現(xiàn)一個(gè)任務(wù),然后將這個(gè)任務(wù)遞交給JStorm系統(tǒng),JStorm將這個(gè)任務(wù)跑起來,并且按7 * 24小時(shí)運(yùn)行起來,一旦中間一個(gè)Worker 發(fā)生意外故障, 調(diào)度器立即分配一個(gè)新的Worker替換這個(gè)失效的Worker。

因此,從應(yīng)用的角度,JStorm應(yīng)用是一種遵守某種編程規(guī)范的分布式應(yīng)用。從系統(tǒng)角度, JStorm是一套類似MapReduce的調(diào)度系統(tǒng)。 從數(shù)據(jù)的角度,JStorm是一套基于流水線的消息處理機(jī)制。

JStorm Hadoop
角色 Nimubs JobTracker
Supervisor TaskTracker
Worker Child
應(yīng)用名稱 Topology Job
編程接口 Spout/Bolt Mapper/Reducer
“設(shè)計(jì)模式” 資本主義 恐怖主義

3.2.2 優(yōu)點(diǎn)

在Storm和JStorm出現(xiàn)以前,市面上出現(xiàn)很多實(shí)時(shí)計(jì)算引擎,但自Storm和JStorm出現(xiàn)后,基本上可以說一統(tǒng)江湖: 究其優(yōu)點(diǎn):

  • 開發(fā)非常迅速:接口簡(jiǎn)單,容易上手,只要遵守Topology、Spout和Bolt的編程規(guī)范即可開發(fā)出一個(gè)擴(kuò)展性極好的應(yīng)用,底層RPC、Worker之間冗余,數(shù)據(jù)分流之類的動(dòng)作完全不用考慮
  • 擴(kuò)展性極好:當(dāng)一級(jí)處理單元速度,直接配置一下并發(fā)數(shù),即可線性擴(kuò)展性能
  • 健壯強(qiáng):當(dāng)Worker失效或機(jī)器出現(xiàn)故障時(shí), 自動(dòng)分配新的Worker替換失效Worker
  • 數(shù)據(jù)準(zhǔn)確性:可以采用Ack機(jī)制,保證數(shù)據(jù)不丟失。 如果對(duì)精度有更多一步要求,采用事務(wù)機(jī)制,保證數(shù)據(jù)準(zhǔn)確。
  • 實(shí)時(shí)性高: JStorm 的設(shè)計(jì)偏向單行記錄,因此,在時(shí)延較同類產(chǎn)品更低

3.2.3 應(yīng)用場(chǎng)景

JStorm處理數(shù)據(jù)的方式是基于消息的流水線處理, 因此特別適合無狀態(tài)計(jì)算,也就是計(jì)算單元的依賴的數(shù)據(jù)全部在接受的消息中可以找到, 并且最好一個(gè)數(shù)據(jù)流不依賴另外一個(gè)數(shù)據(jù)流。

因此,常常用于:

  • 日志分析,從日志中分析出特定的數(shù)據(jù),并將分析的結(jié)果存入外部存儲(chǔ)器如數(shù)據(jù)庫。目前,主流日志分析技術(shù)就使用JStorm或Storm
  • 管道系統(tǒng), 將一個(gè)數(shù)據(jù)從一個(gè)系統(tǒng)傳輸?shù)搅硗庖粋€(gè)系統(tǒng), 比如將數(shù)據(jù)庫同步到Hadoop
  • 消息轉(zhuǎn)化器, 將接受到的消息按照某種格式進(jìn)行轉(zhuǎn)化,存儲(chǔ)到另外一個(gè)系統(tǒng)如消息中間件
  • 統(tǒng)計(jì)分析器, 從日志或消息中,提煉出某個(gè)字段,然后做count或sum計(jì)算,最后將統(tǒng)計(jì)值存入外部存儲(chǔ)器。中間處理過程可能更復(fù)雜。
  • 實(shí)時(shí)推薦系統(tǒng), 將推薦算法運(yùn)行在jstorm中,達(dá)到秒級(jí)的推薦效果

3.2.4 基本概念

[站外圖片上傳中...(image-96ea41-1511406796108)]

  • Spout (中文意為水龍頭)即數(shù)據(jù)的來源、出水口,來源可以是Kafka、DB、HBase、HDFS等。
  • Bolt(中文意為插銷)即數(shù)據(jù)流向過程中的關(guān)鍵點(diǎn)、數(shù)據(jù)流處理點(diǎn)。
  • Topology(中文意為拓?fù)浣Y(jié)構(gòu))即上述圖中所示的數(shù)據(jù)處理流程形成的數(shù)據(jù)流網(wǎng)絡(luò)結(jié)構(gòu)。

3.2.5 組件接口

  • Spout組件接口:nextTuple 拉取下一條消息,執(zhí)行時(shí)JStorm框架回不停調(diào)用該接口從數(shù)據(jù)源拉取數(shù)據(jù)發(fā)往Bolt。
  • Bolt組件接口:execute 執(zhí)行處理邏輯

3.2.6 調(diào)度和執(zhí)行

對(duì)于一個(gè)Topology,JStorm調(diào)度一個(gè)/多個(gè)Worker (每個(gè)Worker對(duì)應(yīng)操作系統(tǒng)的進(jìn)程),分布到集群的一臺(tái)或多臺(tái)機(jī)器上并行執(zhí)行。

在一個(gè)Worker (進(jìn)程) 中,分為多個(gè)Task (線程),每個(gè)線程對(duì)應(yīng)于Spout/Bolt的實(shí)現(xiàn)。

工作流程

  1. 根據(jù)業(yè)務(wù)設(shè)計(jì)Topology
  2. 根據(jù)業(yè)務(wù)流程實(shí)現(xiàn)Spout的 nextTuple 接口中的數(shù)據(jù)輸入
  3. 根據(jù)業(yè)務(wù)細(xì)節(jié)實(shí)現(xiàn)Bolt的 execute 接口中的處理邏輯
  4. 提交Topology開始執(zhí)行
3.2.6.1 提交Topology時(shí)的參數(shù)
總Worker數(shù)目
if #worker <= 10 then
    _topology_master 以Task形式存在,不獨(dú)占Worker
else
    _topology_master 以Task形式存在,獨(dú)占Worker
end
每個(gè)component的并行度

并行度(parallelism) 代表有多少個(gè)Task線程來執(zhí)行這個(gè)Spout/Bolt。

同一個(gè)Component中的Task id一定是連續(xù)的。

每個(gè)Component之間的關(guān)系

聲明Spout和Bolt之間的對(duì)應(yīng)關(guān)系,JStorm使用均勻調(diào)度算法,奇偶不同數(shù)目的Spout/Bolt會(huì)存在某個(gè)進(jìn)程只有Spout或只有Bolt的情形。若topology運(yùn)行過程中掛掉,JStorm會(huì)不斷嘗試重啟進(jìn)程。

3.2.7 消息通信

  1. Spout發(fā)消息

  2. JStorm 計(jì)算消息目標(biāo) Task Id列表

       if Task_id 在本進(jìn)程 then
         直接將消息放入目標(biāo)Task執(zhí)行隊(duì)列
       else
         netty跨進(jìn)程發(fā)送至目標(biāo)Task中
       end
    

3.2.8 實(shí)時(shí)計(jì)算結(jié)果輸出

JStorm的Spout或Bolt中會(huì)有一個(gè)定時(shí)往外部存儲(chǔ)寫計(jì)算結(jié)果的邏輯,將數(shù)據(jù)按照業(yè)務(wù)需求被實(shí)時(shí)或近似實(shí)時(shí)地輸出。

3.2.9 小結(jié)

JStorm是阿里巴巴平臺(tái)的產(chǎn)品,相對(duì)來說適用于大量數(shù)據(jù)集群的情況,目前我現(xiàn)有的資源很難使用。因此,選擇Python系的streamparser進(jìn)行閱讀。

3.3 折騰Storm平臺(tái)部署

3.3.1 部署storm平臺(tái)

  • 下載[Java 8/9][1]、maven、zookeeper、storm、[lein][2]的release并依次安裝。(以上庫除lein外為storm運(yùn)行所必須,由于服務(wù)器在國外,下載時(shí)間較長(zhǎng))

  • 將 JDK、maven、zookeeper、storm 等拷貝至/opt 目錄下,在~/.bash_profile中將相應(yīng)目錄加入PATH:

    export JAVA_HOME="/opt/jdk8"
    export MAVEN_HOME="/opt/maven"
    export ZOO_KEEPER_HOME="/opt/zookeeper"
    export STORM_HOME="/opt/storm"
    PATH=$STORM_HOME/bin:$ZOO_KEEPER_HOME/bin:$MAVEN_HOME/bin:JAVA_HOME/bin:$PATH
    export PATH
    

    ?- 進(jìn)入/opt/zookeeper/conf目錄,編輯zoo.cfg配置文件,如下:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/zookeeper # 注意需要對(duì)該目錄有寫權(quán)限
    clientPort=2181
    
  • 進(jìn)入/opt/storm/conf目錄,編輯storm.yaml配置文件,如下:

    storm.zookeeper.servers: # 注意此處有空格
      - "10.211.55.37"  # 填入配置機(jī)器的IP,若為集群則在下一行以同樣格式列出
      # - "other server ip"
      
    # 此處為Nimbus服務(wù)器地址,單機(jī)運(yùn)行時(shí)無效,系統(tǒng)自動(dòng)使用本地hostname,[原因待求證]
    # nimbus.seeds:["host1","host2","host3"] 
    
    storm.local.dir: "/var/storm" # 需要保證該目錄有寫權(quán)限,此處使用root賬戶所以不考慮。
    
    # 設(shè)置supervisor slots
    supervisor.slots.ports: # 注意此處有空格
      - 6700
      - 6701
      - 6702
      - 6702
    # 此處在storm 1.1.1版的配置模板文件中未提及,但配置后在集群中能看到,[原因待求證]
    

    ?

  • 啟動(dòng)zookeeper集群

    bin/zkServer.sh start
    

?

  • 在[Master服務(wù)器][3]上啟動(dòng)storm nimbus服務(wù)

    bin/storm nimbus >> /dev/null &
    
  • 在[Worker服務(wù)器][3]上啟動(dòng)storm supervisor服務(wù)

    bin/storm supervisor >> /dev/null &
    
  • 在[Master服務(wù)器][3]上啟動(dòng)storm UI工具

    bin/storm ui &
    
  • 在[Master服務(wù)器][3]上采用jps查看服務(wù)的啟動(dòng)情況,若顯示config_value則表示服務(wù)正在初始化;若顯示nimbussupervisor、core、jps、QuorumPeerMain則說明初始化完畢,打開瀏覽器輸入http://server_host:8080即可進(jìn)入Storm UI查看相關(guān)信息。

[1] Java8/9 推薦安裝Oracle官網(wǎng)下載的完整版JDK,因?yàn)楹罄m(xù)的 lein 需要完整的JDK。解壓JDK之后配置系統(tǒng)變量即可。(本次Linux機(jī)器采用 Java 8)

[2] lein全稱為leiningen,是自動(dòng)化管理Clojure腳本的工具,類似于Cargo。lein目前的腳本下載會(huì)出現(xiàn)證書不匹配的問題,解決方案為export HTTP_CLIENT="wget --no-check-certificate -O"。而且,上述設(shè)置后,下載release依舊很慢、需要代理,可以直接wget下載對(duì)應(yīng)的release,放到~/.lein/self-installs/leiningen-2.5.3-standalone.jar即可,參考這里lein是一個(gè)可執(zhí)行腳本,需要放到/usr/bin或者/usr/local/bin下面,然后命令行中運(yùn)行./leinlein repl完成安裝。

[3] 本地測(cè)試則僅僅在本機(jī)即可

3.3.2 案例工程 WordCount

主要參照《Get Started with Storm》一書,網(wǎng)上有中文版,此處參照為英文原版。

3.3.2.1 前提準(zhǔn)備
  1. maven編譯工具,建立pom.xml來聲明該工程的編譯結(jié)構(gòu),包括注明編譯需要的maven版本、編譯所需的storm依賴庫在線地址、以及依賴的storm版本。

      <repositories>
            <!-- Repository where we can found the storm dependencies  -->
            <repository>
                <id>clojars.org</id>
                <url>http://clojars.org/repo</url>
            </repository>
      </repositories>
    
3.3.2.2 編寫對(duì)應(yīng)代碼文件
1. 建立文件結(jié)構(gòu)

建立對(duì)應(yīng)的文件結(jié)構(gòu)src/main/java/{spouts,bolts}、/src/main/resources等,其中resources文件夾要存放相應(yīng)的資源文件。

2. 編寫spouts實(shí)例
package spouts;
import ....;

public class WordReader implements IRichSpout {
    private .....;
     public void ack(Object msgId) {...;}
     public void fail(Object msgId) {...;}
     public void nextTuple() {...;}
  // first method called in ANY spout    
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {...;}
     public void close() {}
     public void declareOutputFields(OutputFieldsDeclarer declarer) {...;}
}
3. 編寫bolts實(shí)例
package bolts;
import ...;

public class WordNormalizer implements IRichBolt {
    private ...;
     public void cleanup(){}
     public void execute(Tuple input) {...;}
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {...;}
     public void declareOutputFields(OutputFieldsDeclarer declarer) {...;}
}
4. 編寫topology結(jié)構(gòu)
import ...;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout();
         builder.setBolt().shuffleGrouping();
         builder.setBolt().fieldGrouping();
         // Configuration
         Config conf = new Config();
         conf.put("xxx", args[0]);
         conf.setDebug(false);
         // Topology run
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("xxxx",conf, builder.createTopology());
         Thread.sleep(1000); // sleep to reduce server load
         cluster.shutdown();
    }
}
5. 使用mvm帶好相應(yīng)參數(shù)運(yùn)行
mvn clean install # maven會(huì)自動(dòng)下載相關(guān)的包
cd target # 注意目錄下有 `pom.xml` 中標(biāo)識(shí)的輸出的jar包
storm jar output-jar.jar path.to.your.topology # LocalCluster 執(zhí)行,然后關(guān)閉
storm jar output-jar.jar path.to.your.topology name-of-storm # 提交jar至storm集群,循環(huán)執(zhí)行,可在UI中查看

3.4 運(yùn)行Storm例子程序的問題記錄

3.4.1 存在問題以及解答記錄

1. 程序中的collector是指的什么?

collector是用來追蹤處理邏輯上每個(gè)emit的數(shù)據(jù)是否在下游bolt中被成功處理。collector是與storm通信的工具,反饋每個(gè)任務(wù)的處理情況。

2. 程序中collector最后emitValue(…)是什么結(jié)構(gòu)?

官方文檔解釋:A convenience class for making tuple values using new Values("field1", 2, 3) syntax.

Value是構(gòu)建Tuple的一個(gè)元組類,該類實(shí)現(xiàn)了Serializable, Cloneable, Iterable<Object>, Collection<Object>, List<Object>, RandomAccess等接口,與Bolt中的execute接口相對(duì)應(yīng):public void execute(Tuple input, BasicOutputCollector collector)

3. Storm中的Ack/Fail機(jī)制中對(duì)fail情形的處理?

為了保證數(shù)據(jù)能正確的被處理, 對(duì)于spout產(chǎn)生的每一個(gè)tuple, storm都會(huì)進(jìn)行跟蹤。

這里面涉及到ack/fail的處理,如果一個(gè)tuple處理成功是指這個(gè)Tuple以及這個(gè)Tuple產(chǎn)生的所有Tuple都被成功處理, 會(huì)調(diào)用spout的ack方法;

如果失敗是指這個(gè)Tuple或這個(gè)Tuple產(chǎn)生的所有Tuple中的某一個(gè)tuple處理失敗, 則會(huì)調(diào)用spout的fail方法;

在處理tuple的每一個(gè)bolt都會(huì)通過OutputCollector來告知storm, 當(dāng)前bolt處理是否成功。

另外需要注意的,當(dāng)spout觸發(fā)fail動(dòng)作時(shí),不會(huì)自動(dòng)重發(fā)失敗的tuple,需要我們?cè)趕pout中重新獲取發(fā)送失敗數(shù)據(jù),手動(dòng)重新再發(fā)送一次。

4. Storm中的Ack原理

Storm中有個(gè)特殊的task名叫acker,他們負(fù)責(zé)跟蹤spout發(fā)出的每一個(gè)Tuple的Tuple樹(因?yàn)橐粋€(gè)tuple通過spout發(fā)出了,經(jīng)過每一個(gè)bolt處理后,會(huì)生成一個(gè)新的tuple發(fā)送出去)。當(dāng)acker(框架自啟動(dòng)的task)發(fā)現(xiàn)一個(gè)Tuple樹已經(jīng)處理完成了,它會(huì)發(fā)送一個(gè)消息給產(chǎn)生這個(gè)Tuple的那個(gè)task。
Acker的跟蹤算法是Storm的主要突破之一,對(duì)任意大的一個(gè)Tuple樹,它只需要恒定的20字節(jié)就可以進(jìn)行跟蹤。
Acker跟蹤算法的原理:acker對(duì)于每個(gè)spout-tuple保存一個(gè)ack-val的校驗(yàn)值,它的初始值是0,然后每發(fā)射一個(gè)Tuple或Ack一個(gè)Tuple時(shí),這個(gè)Tuple的id就要跟這個(gè)校驗(yàn)值異或一下,并且把得到的值更新為ack-val的新值。那么假設(shè)每個(gè)發(fā)射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根據(jù)ack-val是否為0來判斷是否完全處理,如果為0則認(rèn)為已完全處理。

要實(shí)現(xiàn)ack機(jī)制:

  • spout發(fā)射tuple的時(shí)候指定messageId
  • spout要重寫B(tài)aseRichSpout的fail和ack方法
  • spout對(duì)發(fā)射的tuple進(jìn)行緩存(否則spout的fail方法收到acker發(fā)來的messsageId,spout也無法獲取到發(fā)送失敗的數(shù)據(jù)進(jìn)行重發(fā)),看看系統(tǒng)提供的接口,只有msgId這個(gè)參數(shù),這里的設(shè)計(jì)不合理,其實(shí)在系統(tǒng)里是有cache整個(gè)msg的,只給用戶一個(gè)messageid,用戶如何取得原來的msg貌似需要自己cache,然后用這個(gè)msgId去查詢,太坑爹了
  • spout根據(jù)messageId對(duì)于ack的tuple則從緩存隊(duì)列中刪除,對(duì)于fail的tuple可以選擇重發(fā)。
  • 設(shè)置acker數(shù)至少大于0;Config.setNumAckers(conf, ackerParal);

Storm的Bolt有BasicBoltRichBolt:
  在BasicBolt中,BasicOutputCollector在emit數(shù)據(jù)的時(shí)候,會(huì)自動(dòng)和輸入的tuple相關(guān)聯(lián),而在execute方法結(jié)束的時(shí)候那個(gè)輸入tuple會(huì)被自動(dòng)ack
  使用RichBolt需要在emit數(shù)據(jù)的時(shí)候,顯式指定該數(shù)據(jù)的源tuple要加上第二個(gè)參數(shù)anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);并且需要在execute執(zhí)行成功后調(diào)用OutputCollector.ack(tuple),當(dāng)失敗處理時(shí),執(zhí)行OutputCollector.fail(tuple)。

由一個(gè)tuple產(chǎn)生一個(gè)新的tuple稱為:anchoring,你發(fā)射一個(gè)tuple的同時(shí)也就完成了一次anchoring。

ack機(jī)制即,spout發(fā)送的每一條消息,在規(guī)定的時(shí)間內(nèi),spout收到Acker的ack響應(yīng),即認(rèn)為該tuple被后續(xù)bolt成功處理;在規(guī)定的時(shí)間內(nèi)(默認(rèn)是30秒),沒有收到Acker的ack響應(yīng)tuple,就觸發(fā)fail動(dòng)作,即認(rèn)為該tuple處理失敗,timeout時(shí)間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設(shè)定;或者收到Acker發(fā)送的fail響應(yīng)tuple,也認(rèn)為失敗,觸發(fā)fail動(dòng)作

注意,如果繼承BaseBasicBolt那么程序拋出異常,程序直接異常停止了,不會(huì)讓spout進(jìn)行重發(fā)。

5. Fail注意點(diǎn)小結(jié)
  • 若某個(gè)task節(jié)點(diǎn)處理的tuple一直失敗,會(huì)導(dǎo)致spout節(jié)點(diǎn)存儲(chǔ)的tuple數(shù)據(jù)越來越多,直至內(nèi)存溢出
  • 在某個(gè)tuple的眾多子tuple中,若某一個(gè)子tuple處理fail,但是其他子tuple仍會(huì)執(zhí)行。即當(dāng)所有子tuple都執(zhí)行數(shù)據(jù)存儲(chǔ)操作,其中一個(gè)子tuple出現(xiàn)fail,即使整個(gè)處理是fail,但是成功的子tuple仍會(huì)執(zhí)行而不會(huì)滾。
  • Tuple的追蹤只要是spout開始,可以在任意層次bolt停止追蹤并作出應(yīng)答。acker的數(shù)量可以通過Acker task組件來設(shè)置。
  • 一個(gè)Topology并不需要太多acker,除非storm吞吐量不正常。
  • 若不需要保證可靠性,即不追蹤tuple樹的執(zhí)行情況,則系統(tǒng)里的消息數(shù)量會(huì)減少一半。
  • 關(guān)閉消息可靠性的三種方法:
    • config.Topology_ACKERS=0
    • Spout發(fā)送消息時(shí)不指定消息的msgId
    • emit方法中不指定輸入消息
6. Anchoring 錨定概念

拓?fù)涫且粋€(gè)消息(Tuple)沿著一個(gè)或多個(gè)分支的樹節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)將ack(tuple)或者fail(tuple),這樣當(dāng)消息失敗時(shí)Storm就會(huì)知道,并通知Spout重發(fā)消息。因?yàn)橐粋€(gè)Storm拓?fù)溥\(yùn)行在一個(gè)高度并發(fā)的環(huán)境中,跟蹤原始Spout示例的最好辦法是在消息Tuple中包含一個(gè)原始Spout的引用,這種行為(技術(shù))被稱為Anchoring(錨定)

錨點(diǎn)發(fā)生的語句在collector.emit(tuple, new Values(word))中,傳遞元組(emit方法)使Storm能夠跟蹤原始Spout。collector.ack(tuple)collector.fail(tuple)告訴Spout知道每個(gè)消息的處理結(jié)果。當(dāng)消息樹上的每個(gè)消息已經(jīng)被處理,Storm認(rèn)為來自Spout的元組被完全處理。當(dāng)消息樹在一個(gè)可配置的超時(shí)內(nèi)處理失敗,一個(gè)元組被認(rèn)為是失敗的。處理的每一個(gè)元組必須是確認(rèn)或者失敗,Storm會(huì)使用內(nèi)存來追蹤每個(gè)元組,如果不對(duì)每個(gè)元組進(jìn)行確認(rèn)/失敗,最終會(huì)耗盡內(nèi)存

為了簡(jiǎn)化編碼,Storm為Bolt提供了一個(gè)IBasicBolt接口,它會(huì)在調(diào)用execute()方法之后正確調(diào)用ack()方法,BaseBasicBolt類是該接口的一個(gè)實(shí)現(xiàn),用來實(shí)現(xiàn)自動(dòng)確認(rèn)。

7. Storm組件與編程時(shí)遇到的概念
名稱 解釋
Nimbus 負(fù)責(zé)資源分配和任務(wù)調(diào)度,Nimbus分配任務(wù)到Zookeeper指定目錄。
Supervisor 去Zookeeper指定目錄接受Nimbusf分配的任務(wù),啟停自己的Worker進(jìn)程。
Worker 運(yùn)行具體處理組件邏輯的進(jìn)程(process),Worker的任務(wù)分為Spout和Bolt兩種。
Task Worker啟動(dòng)相應(yīng)的物理線程(Executor),Worker執(zhí)行的每一個(gè)Spout/Bolt線程成為一個(gè)Task,0.8版本后Spout/Bolt的Task可能共享一個(gè)Executor。
Topology 拓?fù)?,Storm集群,即定義的數(shù)據(jù)流處理的DAG。
Spout Storm集群的數(shù)據(jù)源
Bolt Storm任務(wù)的處理邏輯單元,在集群多個(gè)機(jī)器上并發(fā)執(zhí)行。
Tuple 消息元組,Spout、Bolt用來與Storm集群通信、反饋任務(wù)處理成功與否的載體。恒定為20Bit。
Stream groupings 數(shù)據(jù)流的分組策略,分7種,常見為shuffleGrouping()、fieldsGrouping()。
Executor Worker啟動(dòng)的實(shí)際物理線程,一般一個(gè)Executor執(zhí)行一個(gè)Task,但也能執(zhí)行多個(gè)Task。
Configuration Topology的配置
8. 序列化與反序列化

由于博客上特別提到了Java虛擬機(jī)序列化的性能極其辣雞,所以在此記錄。

把對(duì)象轉(zhuǎn)換為字節(jié)序列的過程稱為對(duì)象的序列化;把字節(jié)序列恢復(fù)為對(duì)象的過程稱為對(duì)象的反序列化。

對(duì)象的序列化主要有兩種用途:

  • 把對(duì)象的字節(jié)序列永久地保存到硬盤上,通常存放在一個(gè)文件中;

  • 在網(wǎng)絡(luò)上傳送對(duì)象的字節(jié)序列。

在很多應(yīng)用中,需要對(duì)某些對(duì)象進(jìn)行序列化,讓它們離開內(nèi)存空間,入住物理硬盤,以便長(zhǎng)期保存。比如最常見的是Web服務(wù)器中的Session對(duì)象,當(dāng)有 10萬用戶并發(fā)訪問,就有可能出現(xiàn)10萬個(gè)Session對(duì)象,內(nèi)存可能吃不消,于是Web容器就會(huì)把一些seesion先序列化到硬盤中,等要用了,再把保存在硬盤中的對(duì)象還原到內(nèi)存中。

當(dāng)兩個(gè)進(jìn)程在進(jìn)行遠(yuǎn)程通信時(shí),彼此可以發(fā)送各種類型的數(shù)據(jù)。無論是何種類型的數(shù)據(jù),都會(huì)以二進(jìn)制序列的形式在網(wǎng)絡(luò)上傳送。發(fā)送方需要把這個(gè)Java對(duì)象轉(zhuǎn)換為字節(jié)序列,才能在網(wǎng)絡(luò)上傳送;接收方則需要把字節(jié)序列再恢復(fù)為Java對(duì)象。

在Java/Storm中,可以理解為toString()函數(shù)的自定義實(shí)現(xiàn)。注意使用transient 修飾的對(duì)象無法序列化。

9. declareOutputFields() 函數(shù)的具體作用

該Spout代碼里面最核心的部分有兩個(gè):

  • collector.emit()方法發(fā)射tuple。我們不用自己實(shí)現(xiàn)tuple,我們只需要定義tuplevalue,Storm會(huì)幫我們生成tuple。Values對(duì)象接受變長(zhǎng)參數(shù)。Tuple中以List存放Values,ListIndex按照new Values(obj1, obj2,…)的參數(shù)的index,例如我們emit(new Values("v1", "v2")), 那么Tuple的屬性即為:{ [ "v1" ], [ "V2" ] }
  • declarer.declare方法用來給我們發(fā)射的value在整個(gè)Stream中定義一個(gè)別名??梢岳斫鉃?code>key。該值必須在整個(gè)topology定義中唯一

3.5 Windows 平臺(tái)部署本地測(cè)試環(huán)境的注意事項(xiàng)

3.5.1 所需安裝包

  • Java SE Development Kit 8/9,安裝到非C:\Program Files\目錄下,否則storm將無法啟動(dòng)。
  • Apache-maven,解壓到本地目錄,推薦非系統(tǒng)盤
  • Zookeeper,解壓到本地目錄,推薦非系統(tǒng)盤
  • Storm,解壓到本地目錄,推薦非系統(tǒng)盤

3.5.2 環(huán)境變量配置

  • JAVA_HOME: path\to\your\jdk-file
  • Path: path\to\your-storm\bin;path\to\your-zookeeper\bin;path\to\your-maven\bin;%JAVA_HOME%\bin

3.5.3 配置文件設(shè)定

  • 配置zoo.cfg,參照3.1
  • 配置storm.yaml,參照3.1,注意storm.local.dirs中的目錄使用\\來表示\。

3.5.6 啟動(dòng)集群

zkServer
storm nimbus
storm supervisor
storm ui
# 打開瀏覽器 http://127.0.0.1:8080/index.html
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 目錄 場(chǎng)景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機(jī)制 Storm UI...
    mtide閱讀 17,285評(píng)論 30 60
  • 這是一個(gè)JStorm使用教程,不包含環(huán)境搭建教程,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建,后續(xù)研...
    Coselding閱讀 6,737評(píng)論 1 9
  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識(shí),包括 storm ...
    zhaif閱讀 3,404評(píng)論 0 17
  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,292評(píng)論 0 0
  • 我在這家飲食公司做了那么久,一起工作的同事發(fā)生好多是是非非,我善良又沖動(dòng)的性格不太會(huì)交際拉關(guān)系討好人落得如此下場(chǎng),...
    Nataliechan我很快樂閱讀 179評(píng)論 0 0

友情鏈接更多精彩內(nèi)容