jstorm 核心

1.api介紹

生成Topology

Map conf = new HashMp();
//topology所有自定義的配置均放入這個(gè)Map

TopologyBuilder builder = new TopologyBuilder();
//創(chuàng)建topology的生成器

int spoutParal = get("spout.parallel", 1);
//獲取spout的并發(fā)設(shè)置

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
                new SequenceSpout(), spoutParal);
//創(chuàng)建Spout, 其中new SequenceSpout() 為真正spout對(duì)象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 為spout的名字,注意名字中不要含有空格

int boltParal = get("bolt.parallel", 1);
//獲取bolt的并發(fā)設(shè)置

BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
                boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//創(chuàng)建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 為bolt名字,TotalCount 為bolt對(duì)象,boltParal為bolt并發(fā)數(shù),
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 
//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的數(shù)據(jù),并且以shuffle方式,
//即每個(gè)spout隨機(jī)輪詢發(fā)送tuple到下一級(jí)bolt中

int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//設(shè)置表示acker的并發(fā)數(shù)

int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//表示整個(gè)topology將使用幾個(gè)worker

conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//設(shè)置topolog模式為分布式,這樣topology就可以放到JStorm集群上運(yùn)行

StormSubmitter.submitTopology(streamName, conf,
                builder.createTopology());
//提交topology

IRichSpout
IRichSpout 為最簡(jiǎn)單的Spout接口

 IRichSpout{

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

其中注意:
=>spout對(duì)象必須是繼承Serializable, 因此要求spout內(nèi)所有數(shù)據(jù)結(jié)構(gòu)必須是可序列化的
=>spout可以有構(gòu)造函數(shù),但構(gòu)造函數(shù)只執(zhí)行一次,是在提交任務(wù)時(shí),創(chuàng)建spout對(duì)象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內(nèi)容將攜帶到每一個(gè)=>task內(nèi)(因?yàn)樘峤蝗蝿?wù)時(shí)將spout序列化到文件中去,在worker起來時(shí)再將spout從文件中反序列化出來)。
=>open是當(dāng)task起來后執(zhí)行的初始化動(dòng)作
=>close是當(dāng)task被shutdown后執(zhí)行的動(dòng)作
=>activate 是當(dāng)task被激活時(shí),觸發(fā)的動(dòng)作
=>deactivate 是task被deactive時(shí),觸發(fā)的動(dòng)作
=>nextTuple 是spout實(shí)現(xiàn)核心, nextuple完成自己的邏輯,即每一次取消息后,用collector 將消息emit出去。
=>ack, 當(dāng)spout收到一條ack消息時(shí),觸發(fā)的動(dòng)作,詳情可以參考 ack機(jī)制
=>fail, 當(dāng)spout收到一條fail消息時(shí),觸發(fā)的動(dòng)作,詳情可以參考 ack機(jī)制
=>declareOutputFields, 定義spout發(fā)送數(shù)據(jù),每個(gè)字段的含義
=>getComponentConfiguration 獲取本spout的component 配置

Bolt

IRichBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

其中注意:
=>bolt對(duì)象必須是繼承Serializable, 因此要求spout內(nèi)所有數(shù)據(jù)結(jié)構(gòu)必須是可序列化的
=>bolt可以有構(gòu)造函數(shù),但構(gòu)造函數(shù)只執(zhí)行一次,是在提交任務(wù)時(shí),創(chuàng)建bolt對(duì)象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內(nèi)容將攜帶到每一個(gè)task內(nèi)(因?yàn)樘峤蝗蝿?wù)時(shí)將bolt序列化到文件中去,在worker起來時(shí)再將bolt從文件中反序列化出來)。
=>prepare是當(dāng)task起來后執(zhí)行的初始化動(dòng)作
=>cleanup是當(dāng)task被shutdown后執(zhí)行的動(dòng)作
=>execute是bolt實(shí)現(xiàn)核心, 完成自己的邏輯,即接受每一次取消息后,處理完,有可能用collector 將產(chǎn)生的新消息emit出去。 ** 在executor中,當(dāng)程序處理一條消息時(shí),需要執(zhí)行collector.ack, 詳情可以參考 ack機(jī)制 ** 在executor中,當(dāng)程序無法處理一條消息時(shí)或出錯(cuò)時(shí),需要執(zhí)行collector.fail ,詳情可以參考 ack機(jī)制
=>declareOutputFields, 定義bolt發(fā)送數(shù)據(jù),每個(gè)字段的含義
=>getComponentConfiguration 獲取本bolt的component 配置

     <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-client</artifactId>
            <version>0.9.3.1</version>
            <scope>provided</scope>
        </dependency> 


         <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-client-extension</artifactId>
            <version>0.9.3.1</version>
            <scope>provided</scope>
        </dependency>

打包

<build>
        <plugins>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>storm.starter.SequenceTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

提交jar
xxxx.jar 為打包后的jar
com.alibaba.xxxx.xx 為入口類,即提交任務(wù)的類
parameter即為提交參數(shù)

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

2.Ack原理

Storm中有個(gè)特殊的task名叫acker,他們負(fù)責(zé)跟蹤spout發(fā)出的每一個(gè)Tuple的Tuple樹(因?yàn)橐粋€(gè)tuple通過spout發(fā)出了,經(jīng)過每一個(gè)bolt處理后,會(huì)生成一個(gè)新的tuple發(fā)送出去)。當(dāng)acker(框架自啟動(dòng)的task)發(fā)現(xiàn)一個(gè)Tuple樹已經(jīng)處理完成了,它會(huì)發(fā)送一個(gè)消息給產(chǎn)生這個(gè)Tuple的那個(gè)task。Acker的跟蹤算法是Storm的主要突破之一,對(duì)任意大的一個(gè)Tuple樹,它只需要恒定的20字節(jié)就可以進(jìn)行跟蹤。

Acker跟蹤算法的原理:acker對(duì)于每個(gè)spout-tuple保存一個(gè)ack-val的校驗(yàn)值,它的初始值是0,然后每發(fā)射一個(gè)Tuple或Ack一個(gè)Tuple時(shí),這個(gè)Tuple的id就要跟這個(gè)校驗(yàn)值異或一下,并且把得到的值更新為ack-val的新值。那么假設(shè)每個(gè)發(fā)射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根據(jù)ack-val是否為0來判斷是否完全處理,如果為0則認(rèn)為已完全處理。
要實(shí)現(xiàn)ack機(jī)制:

1,spout發(fā)射tuple的時(shí)候指定messageId
2,spout要重寫B(tài)aseRichSpout的fail和ack方法
3,spout對(duì)發(fā)射的tuple進(jìn)行緩存(否則spout的fail方法收到acker發(fā)來的messsageId,spout也無法獲取到發(fā)送失敗的數(shù)據(jù)進(jìn)行重發(fā)),看看系統(tǒng)提供的接口,只有msgId這個(gè)參數(shù),這里的設(shè)計(jì)不合理,其實(shí)在系統(tǒng)里是有cache整個(gè)msg的,只給用戶一個(gè)messageid,用戶如何取得原來的msg貌似需要自己cache,然后用這個(gè)msgId去查詢,太坑爹了3,spout根據(jù)messageId對(duì)于ack的tuple則從緩存隊(duì)列中刪除,對(duì)于fail的tuple可以選擇重發(fā)。
4,設(shè)置acker數(shù)至少大于0;Config.setNumAckers(conf, ackerParal);

阿里自己的Jstorm會(huì)提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
這樣更合理一些, 可以直接取得系統(tǒng)cache的msg values

ack機(jī)制即,spout發(fā)送的每一條消息,在規(guī)定的時(shí)間內(nèi),spout收到Acker的ack響應(yīng),即認(rèn)為該tuple 被后續(xù)bolt成功處理

在規(guī)定的時(shí)間內(nèi)(默認(rèn)是30秒),沒有收到Acker的ack響應(yīng)tuple,就觸發(fā)fail動(dòng)作,即認(rèn)為該tuple處理失敗,timeout時(shí)間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設(shè)定。
l或者收到Acker發(fā)送的fail響應(yīng)tuple,也認(rèn)為失敗,觸發(fā)fail動(dòng)作
注意,我開始以為如果繼承BaseBasicBolt那么程序拋出異常,也會(huì)讓spout進(jìn)行重發(fā),但是我錯(cuò)了,程序直接異常停止了
這里我以分布式程序入門案例worldcount為例子吧。

Paste_Image.png

問題:

有沒有想過,如果該tuple的眾多子tuple中,某一個(gè)子tuple處理
failed了,但是另外的子tuple仍然會(huì)繼續(xù)執(zhí)行,如果子tuple都是執(zhí)
行數(shù)據(jù)存儲(chǔ)操作,那么就算整個(gè)消息失敗,那些生成的子tuple還
是會(huì)成功執(zhí)行而不會(huì)回滾的。

(1)關(guān)于Storm如何處理重復(fù)的tuple問題

有人問到Storm 是怎么處理重復(fù)的tuple?
因?yàn)镾torm 要保證tuple 的可靠處理,當(dāng)tuple 處理失敗或者超時(shí)的時(shí)候,spout 會(huì)fail并重新發(fā)送該tuple,那么就會(huì)有tuple 重復(fù)計(jì)算的問題。這個(gè)問題是很難解決的,storm也沒有提供機(jī)制幫助你解決。不過也有一些可行的策略:
(1)不處理,這也算是種策略。因?yàn)閷?shí)時(shí)計(jì)算通常并不要求很高的精確度,后
續(xù)的批處理計(jì)算會(huì)更正實(shí)時(shí)計(jì)算的誤差。
(2)使用第三方集中存儲(chǔ)來過濾,比如利用MySQL、MemCached 或者Redis 根據(jù)邏輯主鍵來去重。
(3)使用bloom filter 做過濾,簡(jiǎn)單高效。

(2)關(guān)于Storm的ack和fail問題

在學(xué)習(xí)storm的過程中,有不少人對(duì)storm的Spout組件中的ack及fail相關(guān)的問題存在困惑,這里做一個(gè)簡(jiǎn)要的概述。

Storm保證每一個(gè)數(shù)據(jù)都得到有效處理,這是如何保證的呢?正是ack及fail機(jī)制確保數(shù)據(jù)都得到處理的保證,但是storm只是提供給我們一個(gè)接口,而具體的方法得由我們自己來實(shí)現(xiàn)。例如在spout下一個(gè)拓?fù)涔?jié)點(diǎn)的bolt上,我們定義某種情況下為數(shù)據(jù)處理失敗,則調(diào)用fail,則我們可以在fail方法中進(jìn)行數(shù)據(jù)重發(fā),這樣就保證了數(shù)據(jù)都得到了處理。其實(shí),通過讀storm的源碼,里面有講到,有些類(BaseBasicBolt?)是會(huì)自動(dòng)調(diào)用ack和fail的,不需要我們程序員去ack和fail,但是其他Bolt就沒有這種功能了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,329評(píng)論 0 4
  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識(shí),包括 storm ...
    zhaif閱讀 3,392評(píng)論 0 17
  • 目錄 場(chǎng)景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機(jī)制 Storm UI...
    mtide閱讀 17,283評(píng)論 30 60
  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,288評(píng)論 0 0
  • 20170902周慧心賞第19天 親愛的老公,最近一段時(shí)間,你見到我父母都很有禮貌的叫一聲爸,叫一聲媽,出...
    hmzhou閱讀 207評(píng)論 0 1

友情鏈接更多精彩內(nèi)容