2 Storm代碼實(shí)現(xiàn)

Storm的一個拓?fù)渲邪⊿pout和Blots。
代碼主要體現(xiàn)在Spout讀取數(shù)據(jù),然后發(fā)送給Blot去處理。

首先添加maven依賴

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>

Spout讀取數(shù)據(jù)

實(shí)現(xiàn)Spout有兩種方式,一種是繼承BaseRichSpout,一種是實(shí)現(xiàn)IRichSpout
其實(shí)BaseRichSpout也是實(shí)現(xiàn)了IRichSpout。

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout

這里我就用BaseRichSpout去實(shí)現(xiàn)讀取文件

import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class SpoutTest  extends BaseRichSpout implements Serializable {
    SpoutOutputCollector collector;
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    public void nextTuple() {
        //讀取目錄`d:\\test`下的txt格式的文件,你也可以添加其他類型
        Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"),  new String[] { "txt" }, true);
        for (File file : listFiles) {
            // 行格式發(fā)送
            try {
               //按行發(fā)送
                List<String> lines = FileUtils.readLines(file,"utf-8");
                for (String line : lines) {
                    this.collector.emit(new Values(lines));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 文件已經(jīng)處理完成
            try {
                File srcFile = file.getAbsoluteFile();
                File destFile = new File(srcFile + ".done." + System.currentTimeMillis());
                FileUtils.moveFile(srcFile, destFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("lines"));
    }
}

Blot處理數(shù)據(jù)

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class BlotTest extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    //準(zhǔn)備階段,初始化conf,context和collector
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        //接收tuple中的信息
        String line = tuple.getStringByField("line");
        if ("".equals(line) || null == line){
            return;
        }
        System.out.println(line);
          //。。。這塊處理數(shù)據(jù)或者存儲數(shù)據(jù)庫
          //如果有需要發(fā)送到下一個blot,在下一個blot存儲
//        collector.emit(new Values(line));

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //outputFieldsDeclarer.declare(new Fields("phone","time"));
    }
}

上面的Blot接收之前的Spout傳過來的數(shù)據(jù)。如果為空直接返回。如果還需要過濾,則可以調(diào)用上面注釋的代碼繼續(xù)發(fā)送到下一個blot,當(dāng)然需要下面的declareOutputFields()和spout一樣。

最后主方法

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        try {
            //移動
            builder.setSpout("spoutid",new SpoutTest());
            builder.setBolt("blotid", new BlotTest()).shuffleGrouping("spoutid");
            //對應(yīng)Blot里面的注釋,以phone分組,給它開了4個并行度
           // builder.setBolt("blotid", new BlotTest(),4).fieldsGrouping("spoutid",new Fields("phone"));
            Config config = new Config();
            //這里對數(shù)據(jù)準(zhǔn)確性要求不高,就不設(shè)置ack數(shù)量了,按需設(shè)置,不然會有處理堆積的問題
            config.setNumAckers(0);
            
            //>0是集群用的,else里面是本機(jī)運(yùn)行
            if (args.length>0){
                config.setNumWorkers(Integer.parseInt(args[1]));
                config.setMaxSpoutPending(5000);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            }else {
                String topologyName = Main.class.getSimpleName();
                StormTopology stormTopology = builder.createTopology();
                LocalCluster lCluster = new LocalCluster();
                lCluster.submitTopology(topologyName, config, stormTopology);
            }
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }

    }
}

上面有一行注釋的,是按照BlotTest下面的注釋分組。里面的并行度具體我沒研究過,根據(jù)業(yè)務(wù)設(shè)定吧。
到此,簡單的一個拓?fù)渚屯瓿闪恕?br> 那么問題來了,如果storm一直處理,什么時候去存入數(shù)據(jù)庫等。這就涉及到storm的定時器
把上面的代碼稍微改一下

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class Blot1Test extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    Map map = new HashMap();
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("lines");
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            //接收到定時信號的時候,處理這里,其余時間走else
            savemaptodb();
            return;
        }else {
            map.put(line,line);
            return;
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

     //設(shè)置10秒發(fā)送一次tick心跳
    @SuppressWarnings("static-access")
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}

上面這個getComponentConfiguration()就是實(shí)現(xiàn)了這個blot的定時,還有全局的定時器,在Main類的config加上

 config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//設(shè)置定時器,每五秒發(fā)送一次系統(tǒng)級別的

然后在每個blot的execute方法里面判斷是否觸發(fā)

tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)

這樣就實(shí)現(xiàn)了一個簡單是storm例子(說實(shí)話我沒有驗(yàn)證,都是手敲出來的。公司的代碼在內(nèi)網(wǎng),拿出來太麻煩),但是大體上是這樣的。

這個拓?fù)錄]有失敗機(jī)制,也不是從hdfs或者kafka讀取。自己去寫吧。遇到問題才能真正掌握。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容