使用Storm實(shí)現(xiàn)累加求和操作

package com.csylh;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * Description:使用Storm實(shí)現(xiàn)累加求和操作
 *
 * @author: 留歌36
 * Date:2018/9/3 16:50
 */
public class LocalSumStormTopology {
    /**
     * Spout需要繼承BaseRichSpout
     * 數(shù)據(jù)源需要產(chǎn)生數(shù)據(jù)并發(fā)射到Bolt
     */
    public static class DataSourceSpout extends BaseRichSpout{
        //定義一個(gè)發(fā)射器
        private SpoutOutputCollector collector;

        /**
         * 初始化方法 只是會(huì)被調(diào)用一次
         * @param conf     配置參數(shù)
         * @param context 上下文:相當(dāng)于一個(gè)框 可以從里面獲取許多東西
         * @param collector 數(shù)據(jù)發(fā)射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            //將傳入的collector發(fā)射器 對(duì)私有變量 進(jìn)行賦初值
            this.collector = collector;
        }
        int number = 0;
        /**
         *  用于產(chǎn)生數(shù)據(jù)
         *  生產(chǎn)中肯定是從消息隊(duì)列中獲取數(shù)據(jù)
         *  這個(gè)方法是一個(gè)死循環(huán)
         */
        @Override
        public void nextTuple() {
            //發(fā)送方式,調(diào)用上面定義的數(shù)據(jù)發(fā)射器
            this.collector.emit(new Values(number++));

            System.out.println("Spout==》發(fā)送的數(shù)據(jù):" + number);
            //每隔1s中發(fā)射一次,防止數(shù)據(jù)產(chǎn)生太快
            Utils.sleep(1000);

        }
        /**
         * 聲明輸出字段
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
    }

    /**
     * Bolt需要繼承BaseRichBolt
     * 用于接收數(shù)據(jù)并對(duì)數(shù)據(jù)進(jìn)行處理
     */
    public static class SumBolt extends BaseRichBolt{
        /**
         *  初始化方法 ,會(huì)被執(zhí)行一次
         * @param stormConf
         * @param context
         * @param collector  這里的數(shù)據(jù)發(fā)射器,由于業(yè)務(wù)邏輯中沒(méi)有沒(méi)有必要進(jìn)行放下發(fā)的操作,所以就是沒(méi)有必要進(jìn)行new一個(gè)
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }
        int sum = 0;
        /**
         * 也是一個(gè)死循環(huán) ,職責(zé): 獲取Spout發(fā)射過(guò)來(lái)的數(shù)據(jù)
         * @param input
         */
        @Override
        public void execute(Tuple input) {

           //Bolt中獲取值可以通過(guò)index獲取
            // 也可以根據(jù)上一個(gè)環(huán)節(jié)中定義的filed的名稱獲取(***推薦)
          Integer value = input.getIntegerByField("num");
          sum += value;

          System.out.println("Bolt :Sum = ["+ sum + "]");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");

        //創(chuàng)建一個(gè)本地的Storm集群 ,本地模式運(yùn)行,不需要搭建Storm集群
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());

    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容