JStorm: Word Count Development Example

Earlier in this series — JStorm 1: Concepts and Programming Model; JStorm 2: Task Scheduling

Reposted from my personal blog.
What this example does: it counts word occurrences. The spout feeds a continuous stream of sentences as input, a bolt splits each sentence into words, and the final bolts tally how many times each word appears.

Example Overview

As shown in the figure below, the word-count topology consists of one spout and three downstream bolts.


SentenceSpout:向后端發(fā)射一個單值tuple組成的數據流,鍵名“sentence”,tuple如下:
{“sentence”:“my name is zhangsan”}
SplitSentenceBolt:訂閱SentenceSpout發(fā)射的數據流,將“sentence”中的語句分割為一個個單詞,向后端發(fā)射“word”組成的tuple如下:
{“word”:“my”}
{“word”:“name”}
{“word”:“is”}
{“word”:“zhangsan”}
WordCountBolt:訂閱SplitSentenceBolt發(fā)射的數據流,保存每個特定單詞出現的次數,每當bolt收到一個tuple,將對應單詞的計數加一,并想后發(fā)射該單詞當前的計數。
{“word”:“my”,“count”:“5”}
ReportBolt:訂閱WordCountBolt的輸出流,維護一份所有單詞對應的計數表,結束時將所有值打印。
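The end-to-end transformation above can be sketched in plain Java with no Storm dependency (the class and method names here are illustrative, not part of the example project):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of what the topology computes:
// split each sentence into words, then keep a running count per word.
public class WordCountSketch {

    // SplitSentenceBolt logic: one sentence in, an array of words out.
    public static String[] split(String sentence) {
        return sentence.split(" ");
    }

    // WordCountBolt logic: increment and return the running count for a word.
    public static long count(Map<String, Long> counts, String word) {
        long next = counts.getOrDefault(word, 0L) + 1;
        counts.put(word, next);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        String[] sentences = {"my dog has fleas", "the dog ate my homework"};
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                count(counts, word);
            }
        }
        System.out.println(counts.get("dog")); // prints 2
        System.out.println(counts.get("my"));  // prints 2
    }
}
```

The Storm version below computes the same thing, but distributes the split and count steps across parallel tasks.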

Code Implementation

Add the pom.xml dependency

<dependency>
        <groupId>com.alibaba.jstorm</groupId>
        <artifactId>jstorm-core</artifactId>
        <version>2.2.1</version>
        <!-- <scope>provided</scope> -->
</dependency>

SentenceSpout: extends BaseRichSpout. The nextTuple method generates and emits the stream, and declareOutputFields declares the emitted tuple field name: sentence.
SplitSentenceBolt: extends BaseRichBolt. The execute method splits each received tuple into words and emits them downstream, and declareOutputFields declares the tuple field: word.
WordCountBolt: extends BaseRichBolt. The execute method counts word occurrences, storing all counts locally in a HashMap. On each received tuple it updates that word's count and emits a tuple downstream; declareOutputFields declares the fields "word", "count".
ReportBolt: extends BaseRichBolt. The execute method aggregates the counts of all words, storing them locally in a HashMap. When the topology shuts down, the cleanup method prints the final results.
WordCountTopology: builds the topology, wiring the data flow between the spout and bolts and setting the parallelism of each stage (2, 2, 4, 1 from front to back). The relationship between processes (workers), threads (executors), and tasks is shown in the figure below:


The core code follows. Note the groupings: shuffleGrouping distributes tuples randomly downstream; fieldsGrouping routes by field value, guaranteeing that the same word is always counted by the same WordCountBolt task; and globalGrouping ensures a single ReportBolt instance receives every count.
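Why fieldsGrouping keeps each word on one task can be illustrated with a minimal routing sketch (a simplification for illustration only; JStorm's actual implementation hashes the selected field values internally to pick a target task):

```java
// Simplified fields-grouping routing: hash the grouping field value and
// take it modulo the number of downstream tasks. Every tuple carrying the
// same "word" value therefore lands on the same WordCountBolt task, so
// each task's local HashMap sees the complete count for its words.
public class FieldsGroupingSketch {

    public static int chooseTask(String fieldValue, int numTasks) {
        // Math.abs guards against negative hash codes.
        return Math.abs(fieldValue.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4; // matches the WordCountBolt parallelism below
        // The same word always maps to the same task index.
        System.out.println(
            chooseTask("dog", tasks) == chooseTask("dog", tasks)); // prints true
    }
}
```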

WordCountTopology.java

// WordCountTopology
import storm.blueprints.word.v1.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import static storm.blueprints.utils.Utils.*;

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
                .setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(2);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

SentenceSpout.java

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.blueprints.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context, 
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.waitForMillis(1000);
    }
}

SplitSentenceBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCountBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, 
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null){
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

ReportBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReportBolt extends BaseRichBolt {

    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    @Override
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}

Utils.java

public class Utils {

    public static void waitForSeconds(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
        }
    }

    public static void waitForMillis(long milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
        }
    }
}

Please credit the source when reposting.

