Storm Trident之一spout和bolt

Trident Spout


Trident Spout特點(diǎn)

  • Trident中,定義Spout的接口為ITridentSpout。
  • Trident Spout必須以批量形式發(fā)送tuple。
  • Trident Spout不真正執(zhí)行數(shù)據(jù)的發(fā)送,而是由ITridentSpout.Emitter負(fù)責(zé)發(fā)送數(shù)據(jù)。同時(shí)引入了協(xié)調(diào)器的概念,協(xié)調(diào)器負(fù)責(zé)管理數(shù)據(jù)發(fā)送的批次和元數(shù)據(jù),當(dāng)事務(wù)失敗時(shí),調(diào)度Emitter根據(jù)元數(shù)據(jù)重新發(fā)送數(shù)據(jù)。協(xié)調(diào)器接口為ITridentSpout.BatchCoordinator。

maven依賴

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>

Trident Spout簡單實(shí)現(xiàn)

    public class WordSpout implements ITridentSpout<String> {
        
        /**
         * 
         */
        private static final long serialVersionUID = -954626449213280061L;
    
        /**
         * 協(xié)調(diào)器
         * 負(fù)責(zé)保存重放batch元數(shù)據(jù),當(dāng)重放一個(gè)batch時(shí),通過協(xié)調(diào)器中保存的元數(shù)據(jù)創(chuàng)建batch
         */
        @Override
        public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
            return new WordCoordinator();
        }
    
        @Override
        public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return new WordEmitter();
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        /**
         * 定義發(fā)送的所有字段
         */
        @Override
        public Fields getOutputFields() {
            return new Fields("field1","field2");
        }
        
        private class WordCoordinator implements BatchCoordinator<String> {
    
            @Override
            public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
                return null;
            }
    
            @Override
            public void success(long txid) {
                logger.info("success: " + txid);
            }
    
            @Override
            public boolean isReady(long txid) {
                return Boolean.TRUE;
            }
    
            @Override
            public void close() {
                
            }
            
        }
        
        /**
         * 發(fā)射器
         * 發(fā)送數(shù)據(jù)流
         *
         */
        private class WordEmitter implements Emitter<String> {
    
            @Override
            public void success(TransactionAttempt tx) {
                logger.info("emitter success " + tx.getId());
            }
    
            @Override
            public void close() {
            }
            
            /**
             * 每次調(diào)用本方法所發(fā)送的數(shù)據(jù)集合被稱為batch
             * batch是Trident中發(fā)送數(shù)據(jù)流的最小單元
             */
            @Override
            public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
                for(int i=0;i<10;i++){
                    List list = Lists.newArrayList();
                    list.add("event1");
                    list.add("event2");
                    collector.emit(list);
                }
            }
            
        }
    
        private Logger logger = LoggerFactory.getLogger("Trident Spout");
    }
重點(diǎn)說明

Emitter定義的emitBatch方法。該方法實(shí)現(xiàn)了發(fā)送哪里數(shù)據(jù)。該方法每執(zhí)行一次,發(fā)送的所有數(shù)據(jù)被稱為batch。batch中的每條數(shù)據(jù)被稱為tuple。

ITridentSpout.getOutputFields定義了每條tuple有哪些字段,本例中定義了2個(gè)字段,字段名為"field1"、"field2"。在Emitter.emitBatch中每條tuple均符合該定義。本例中每次調(diào)用emitBatch方法發(fā)送的數(shù)據(jù)內(nèi)容及格式可以假象如下:

[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]

每次調(diào)用均發(fā)送了10條數(shù)據(jù)(10個(gè)tuple),這10個(gè)tuple構(gòu)成1個(gè)batch,tuple均符合ITridentSpout.getOutputFields中定義的字段

Trident bolt


Trident bolt特點(diǎn)

  • Trident中沒有bolt接口,而是分為了Filter和Function兩類

Trident Function簡單實(shí)現(xiàn)

public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收數(shù)據(jù)流
     * 每次接收batch中一條數(shù)據(jù)
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        logger.info(tuple.getValueByField("field1").toString());
    }

    private Logger logger = LoggerFactory.getLogger("Trident Function");
}
重點(diǎn)說明

BaseFunction已經(jīng)實(shí)現(xiàn)了Function接口。

execute方法用于具體實(shí)現(xiàn)接收到tuple后如何處理。每次接收1個(gè)tuple。在本例中emitter每次發(fā)送1個(gè)batch,每個(gè)batch有10條數(shù)據(jù),則每次發(fā)送數(shù)據(jù),execute方法均會(huì)被調(diào)用10次。

在tuple中可以獲取數(shù)據(jù)流中的數(shù)據(jù),能夠獲取的字段受TridentTopology對(duì)象的控制。

在execute方法中處理完成后可繼續(xù)使用TridentCollector對(duì)象繼續(xù)發(fā)送數(shù)據(jù)到下一節(jié)點(diǎn),F(xiàn)unction發(fā)送數(shù)據(jù)時(shí)只能添加新的字段,不能修改或刪除已有的字段

啟動(dòng)TridentTopology


public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();

        topology.newStream("filter", spout)
                /**
                 * 將spout發(fā)送的數(shù)據(jù)流中哪些字段傳入bolt中
                 */
                .each(new Fields("field1"), function, new Fields());

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());

        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }

}
重點(diǎn)說明

topology定義數(shù)據(jù)流時(shí)指定function可以讀取哪些字段。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一. wordCount Topology開發(fā): 1.spout數(shù)據(jù)收集器(SentenceSpout類): 有...
    奉先閱讀 1,291評(píng)論 0 0
  • 這是一個(gè)JStorm使用教程,不包含環(huán)境搭建教程,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建,后續(xù)研...
    Coselding閱讀 6,734評(píng)論 1 9
  • 聲明 本文首發(fā)于個(gè)人技術(shù)博客,轉(zhuǎn)載請(qǐng)注明出處,本文鏈接:http://qifuguang.me/2015/11/2...
    winwill2012閱讀 2,293評(píng)論 1 15
  • 簡介: Trident 是 Storm 的一種高度抽象的實(shí)時(shí)計(jì)算模型,它可以將高吞吐量(每秒百萬級(jí))數(shù)據(jù)輸入、有狀...
    hello_coke閱讀 3,347評(píng)論 0 1
  • 現(xiàn)在的社會(huì)太匆忙,人情關(guān)系太復(fù)雜,有些事情只能靠自己解決,無論是工作還是生活。有自己的想法,就應(yīng)該想各種辦法,最終...
    平靜的海洋閱讀 1,610評(píng)論 0 0

友情鏈接更多精彩內(nèi)容