storm自定義實(shí)現(xiàn)wordcount

storm中的任務(wù)

  1. storm中的任務(wù)的結(jié)構(gòu)是Topology(拓?fù)鋱D),這個(gè)拓?fù)鋱D是一個(gè)有向無(wú)環(huán)圖(DAG),DAG能夠清楚的表達(dá)鏈?zhǔn)降娜蝿?wù),每一個(gè)節(jié)點(diǎn)都是一個(gè)任務(wù),邊的方向代表著數(shù)據(jù)流的方向。如下圖


    Paste_Image.png
  2. storm任務(wù)中數(shù)據(jù)流的數(shù)據(jù)結(jié)構(gòu)是一個(gè)個(gè)tuple,tuple元組是任意數(shù)據(jù)結(jié)構(gòu)類型的鍵值對(duì)組合。例如:(k1:v1, k2:v2, k3:v3, ····)
  3. Spout是數(shù)據(jù)采集器,從數(shù)據(jù)源采集數(shù)據(jù),轉(zhuǎn)成tuple發(fā)射到后面的bolt處理
  4. Bolt是數(shù)據(jù)處理器,可執(zhí)行數(shù)據(jù)過(guò)濾,分析等操作。

開發(fā)流程

  1. 設(shè)計(jì)Topology圖


    Paste_Image.png
  2. 按照Topology圖,創(chuàng)建maven項(xiàng)目后,依次寫各個(gè)任務(wù)節(jié)點(diǎn)。首先寫SentenceSpout節(jié)點(diǎn)。
package strom.strom;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class SentenceSpout extends BaseRichSpout {

    // tuple發(fā)射器
    private SpoutOutputCollector collector;

    private static final String[] SENTENCES = { "hadoop yarn mapreduce spark", "flume hadoop hive spark",
            "oozie yarn spark storm", "storm yarn mapreduce error", "error flume storm spark" };

    /*
     * 用于指定只針對(duì)本組件的一些特殊配置
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /*
     * spout組件的初始化方法 創(chuàng)建這個(gè)sentenceSpout組件實(shí)例時(shí)調(diào)用一次
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        // 用實(shí)例變量接收發(fā)射器
        this.collector = arg2;
    }

    /*
     * 聲明向后面的組件發(fā)送tuple的key是什么
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("sentence"));
    }

    /*
     * 1)指定tuple的value值,封裝tuple后,并將其發(fā)射給后面的組件, 2) 會(huì)迭代式的循環(huán)調(diào)用這個(gè)方法
     */
    @Override
    public void nextTuple() {
        // 從數(shù)組中隨意獲取一個(gè)值
        String sentence = SENTENCES[new Random().nextInt(SENTENCES.length)];
        // 指定value值并封裝為tuple后,把tuple發(fā)射給后面的組件
        this.collector.emit(new Values(sentence));
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 寫splitbolt組件
package strom.strom;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class SplitBolt implements IRichBolt {

    // bolt組件中的發(fā)射器
    private OutputCollector collector;

    @Override
    public void cleanup() {

    }

    /*
     * 設(shè)置key名稱
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word"));
    }

    /*
     * 每次接受到前面組件發(fā)送的tuple調(diào)用一次 ,封裝好tuple后發(fā)射
     */
    @Override
    public void execute(Tuple input) {
        // 獲取key value對(duì)后,取出value值
        String values = input.getStringByField("sentence");
        if (values != null && !"".equals(values)) {
            // 按空格分割value
            String[] valuelist = values.split(" ");
            for (String value : valuelist) {
                // 向后面的組件發(fā)射封裝好的tuple
                this.collector.emit(new Values(value));
            }
        }
    }

    /*
     * bolt組件初始化方法,只會(huì)調(diào)用一次
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

4.CountBolt組件實(shí)現(xiàn)計(jì)數(shù)邏輯

package strom.strom;
//

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CountBolt extends BaseRichBolt {

    // 發(fā)射器
    private OutputCollector collector;
    // 為了計(jì)數(shù)
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
        this.counts = new HashMap<String, Integer>();
    }

    /*
     * 聲明key名稱,可以同時(shí)聲明多個(gè)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word", "count"));
    }

    /*
     * 統(tǒng)計(jì)單詞
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");

        int count = 1;
        // 如果這個(gè)單詞已經(jīng)存在,則取出count再加一
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }
}

5 . PrintBolt組件

package strom.strom;
//

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

    }

    /*
     * 打印到控制臺(tái)
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count = input.getIntegerByField("count");
        System.out.println(word + "---->" + count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {

    }

}

6 . WordCountTopology類用來(lái)連接這些組件

package strom.strom;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {

    private static final String SPOUT_ID = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        // 構(gòu)造Topology
        TopologyBuilder builder = new TopologyBuilder();
        // 指定spout
        builder.setSpout(SPOUT_ID, new SentenceSpout());
        // 指定bolt,并指定當(dāng)有有多個(gè)bolt時(shí),數(shù)據(jù)流發(fā)射的分組策略
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SPOUT_ID);
        // 因?yàn)橐WC正確的單詞計(jì)數(shù),同一個(gè)單詞一定要?jiǎng)澐值酵粋€(gè)CountBolt上,所以按照字段值分組
        builder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        // 全局分組,所有tuple發(fā)射到一個(gè)printbolt,一般是id最小的那一個(gè)
        builder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);

        Config conf = new Config();

        if (args == null || args.length == 0) {
            // 本地執(zhí)行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", conf, builder.createTopology());
        } else {
            // 提交到集群上執(zhí)行
            // 指定使用多少個(gè)進(jìn)程來(lái)執(zhí)行該Topology
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 本地執(zhí)行測(cè)試


    Paste_Image.png
  2. 打成jar包后上傳到storm集群測(cè)試


    Paste_Image.png

    下面的jar包包含著依賴的包,上面的jar包中沒(méi)有包括,所以我們選擇使用下面這個(gè)jar包。
    上傳到集群上然后執(zhí)行

$ bin/storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar strom.strom.WordCountTopology wordcount

在UI中查看運(yùn)行情況


Paste_Image.png

查看運(yùn)行日志


Paste_Image.png

Paste_Image.png

Paste_Image.png

查看拓?fù)鋱D


Paste_Image.png

Paste_Image.png

Paste_Image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,329評(píng)論 0 4
  • 目錄 場(chǎng)景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機(jī)制 Storm UI...
    mtide閱讀 17,283評(píng)論 30 60
  • 本文主要介紹storm中的基本概念,從基礎(chǔ)上了解strom的體系結(jié)構(gòu),便于后續(xù)編程過(guò)程中作為基礎(chǔ)指導(dǎo)。主要的概念包...
    看山遠(yuǎn)兮閱讀 1,672評(píng)論 0 9
  • 流式計(jì)算中,各個(gè)中間件產(chǎn)品對(duì)計(jì)算過(guò)程中的角色的抽象都不盡相同,實(shí)現(xiàn)方式也是千差萬(wàn)別。本文針對(duì)storm中間件在進(jìn)行...
    一品悟技術(shù)_張馳閱讀 2,708評(píng)論 0 1
  • 他不經(jīng)意走過(guò)她的身旁 微笑的眼底跳動(dòng)著的卻是憂傷 他看起來(lái)似乎并不迷惘 他說(shuō),只是當(dāng)回憶想起時(shí) 難免有些彷徨 曾經(jīng)...
    沙華_ad21閱讀 175評(píng)論 0 3

友情鏈接更多精彩內(nèi)容