JStorm實時計算框架學(xué)習(xí)

這是一個JStorm使用教程,不包含環(huán)境搭建教程,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建,后續(xù)研究完會考慮額外寫一篇博客。
你如果想了解JStorm是什么,有多牛逼什么什么的,請看最后的參考博客鏈接,里面有各種版本的介紹,我就不在這里總結(jié)這種東西了,我相信這些東西你第一次接觸的時候會看,等學(xué)了JStorm之后也不會再去看這些東西了。。。

簡介

  • JStorm和MapReduce的一些對比


    compare-table.png

一些關(guān)鍵概念

  1. nimbus:主控節(jié)點運行Nimbus守護進程,類似于Hadoop中的ResourceManager,負(fù)責(zé)在集群中分發(fā)代碼,對節(jié)點分配任務(wù),并監(jiān)視主機故障。
  2. supervisor:每個工作節(jié)點運行Supervisor守護進程,負(fù)責(zé)監(jiān)聽工作節(jié)點上已經(jīng)分配的主機作業(yè),啟動和停止Nimbus已經(jīng)分配的工作進程,類似于Hadoop中的NodeManager。
    supervisor會定時從zookeeper獲取拓補信息topologies、任務(wù)分配信息assignments及各類心跳信息,以此為依據(jù)進行任務(wù)分配。
    在supervisor同步時,會根據(jù)新的任務(wù)分配情況來啟動新的worker或者關(guān)閉舊的worker并進行負(fù)載均衡。
  3. worker:Worker是具體處理Spout/Bolt邏輯的進程,根據(jù)提交的拓?fù)渲衏onf.setNumWorkers(3);定義分配每個拓?fù)鋵?yīng)的worker數(shù)量,Storm會在每個Worker上均勻分配任務(wù),一個Worker只能執(zhí)行一個topology,但是可以執(zhí)行其中的多個任務(wù)線程。
  4. task:任務(wù)是指Worker中每個Spout/Bolt線程,每個Spout和Bolt在集群中會執(zhí)行許多任務(wù),每個任務(wù)對應(yīng)一個線程執(zhí)行,可以通過TopologyBuilder類的setSpout()和setBolt()方法來設(shè)置每個Spout或者Bolt的并行度。
  5. Executor:Task接收到任務(wù)就是在Executor中執(zhí)行的,可以理解為執(zhí)行Task專門的一個線程。
  6. topology:Storm中Topology的概念類似于Hadoop中的MapReduce Job,是一個用來編排、容納一組計算邏輯組件(Spout、Bolt)的對象(Hadoop MapReduce中一個Job包含一組Map Task、Reduce Task),這一組計算組件可以按照DAG圖的方式編排起來(通過選擇Stream Groupings來控制數(shù)據(jù)流分發(fā)流向),從而組合成一個計算邏輯更加負(fù)責(zé)的對象,那就是Topology。一個Topology運行以后就不能停止,它會無限地運行下去,除非手動干預(yù)(顯式執(zhí)行bin/storm kill )或意外故障(如停機、整個Storm集群掛掉)讓它終止。
  7. spout:Storm中Spout是一個Topology的消息生產(chǎn)的源頭,Spout應(yīng)該是一個持續(xù)不斷生產(chǎn)消息的組件,例如,它可以是一個Socket Server在監(jiān)聽外部Client連接并發(fā)送消息,可以是一個消息隊列(MQ)的消費者、可以是用來接收Flume Agent的Sink所發(fā)送消息的服務(wù),等等。Spout生產(chǎn)的消息在Storm中被抽象為Tuple,在整個Topology的多個計算組件之間都是根據(jù)需要抽象構(gòu)建的Tuple消息來進行連接,從而形成流。
  8. bolt:Storm中消息的處理邏輯被封裝到Bolt組件中,任何處理邏輯都可以在Bolt里面執(zhí)行,處理過程和普通計算應(yīng)用程序沒什么區(qū)別,只是需要根據(jù)Storm的計算語義來合理設(shè)置一下組件之間消息流的聲明、分發(fā)、連接即可。Bolt可以接收來自一個或多個Spout的Tuple消息,也可以來自多個其它Bolt的Tuple消息,也可能是Spout和其它Bolt組合發(fā)送的Tuple消息。
  9. tuple:JStorm中信息傳輸?shù)膯挝?,Storm程序是無限執(zhí)行下去的,數(shù)據(jù)流是無止境的,但是每次驅(qū)動程序執(zhí)行的只是一個數(shù)據(jù)流單位,就是Tuple,Spout的一次nextTuple以及Bolt的一次execute的執(zhí)行操作的都是一個Tuple。Tuple只要是任意可序列化對象即可。

生命周期

Topology生命周期

  1. 上傳代碼并做校驗(/nimbus/inbox);
  2. 建立本地目錄(/stormdist/topology-id/);
  3. 建立zookeeper上的心跳目錄;
  4. 計算topology的工作量(parallelism hint),分配task-id并寫入zookeeper;
  5. 把task分配給supervisor執(zhí)行;
  6. 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工作交個小弟worker;
  7. 在worker中把task拿到,看里面有哪些spout/Bolt,然后計算需要給哪些task發(fā)消息并建立連接;
  8. 在nimbus將topology終止的時候會將zookeeper上的相關(guān)信息刪除;


    topology-lifecycle.png

Spout生命周期

提交時

  1. 構(gòu)造方法:初始化構(gòu)造參數(shù),其中包含的必須都是可序列化的
  2. getComponentConfiguration:獲取該類特殊的配置參數(shù),只和該組件相關(guān)的配置,通常return null
  3. declareOutputFields:獲取該組件會輸出的流、字段列表,其后續(xù)的其他組件訂閱相應(yīng)的流或者字段需要和這里對應(yīng),否則會出錯
  4. 將內(nèi)存中的該實例序列化為字節(jié)碼文件。

在Worker節(jié)點中執(zhí)行

  1. 將傳輸過來的字節(jié)碼文件反序列化為類實例
  2. open:初始化這個組件類實例,可以加載消息隊列消費端、JDBC鏈接池等非可序列化對象
  3. activate:該實例設(shè)置為活躍狀態(tài)(有數(shù)據(jù)流驅(qū)動時)調(diào)用,過段時間暫時沒有數(shù)據(jù)流驅(qū)動就會睡眠
  4. nextTuple:循環(huán)調(diào)用,可在這里從數(shù)據(jù)源獲取數(shù)據(jù)emit到下一個節(jié)點,JStorm就會自動循環(huán)調(diào)用執(zhí)行下去
  5. ack:往后emit的一個Tuple在acker節(jié)點察覺成功了,回調(diào)通知Spout
  6. fail:往后emit的一個Tuple在acker節(jié)點察覺失敗或者超時了,回調(diào)通知Spout
  7. deactivate:沒數(shù)據(jù)流驅(qū)動達到一段時間,進入睡眠前調(diào)用
  8. close:程序停止時調(diào)用,釋放資源

Bolt生命周期

提交時

  1. 構(gòu)造方法:初始化構(gòu)造參數(shù),其中包含的必須都是可序列化的
  2. getComponentConfiguration:獲取該類特殊的配置參數(shù),只和該組件相關(guān)的配置,通常return null
  3. declareOutputFields:獲取該組件會輸出的流、字段列表,其后續(xù)的其他組件訂閱相應(yīng)的流或者字段需要和這里對應(yīng),否則會出錯
  4. 將內(nèi)存中的該實例序列化為字節(jié)碼文件。

在Worker節(jié)點中執(zhí)行

  1. 將傳輸過來的字節(jié)碼文件反序列化為類實例
  2. prepare:初始化這個組件類實例,可以加載配置,數(shù)據(jù)處理類初始化,數(shù)據(jù)輸出對象初始化
  3. execute:循環(huán)調(diào)用,可在這里從上個節(jié)點獲取Tuple,進行相應(yīng)處理之后emit到下一個節(jié)點,JStorm就會自動循環(huán)調(diào)用執(zhí)行下去
  4. cleanup:程序停止時調(diào)用,釋放資源

數(shù)據(jù)流向控制

jstorm-grouping.png
  1. ShuffleGrouping:對符合條件的目標(biāo)Worker,其中可能的多個Task,隨機分配Task來接收和處理該Tuple
  2. FieldsGrouping:會按照指定的field值進行分配,可以保證相同field對應(yīng)值的Tuple分配到相同一個Task中執(zhí)行 —— 可以想象成拿指定的field的值hash取模決定哪個Task(具體算法沒研究)
  3. 除了這兩個其他暫時沒用到,也感覺剩下的比較用不到,等用了再更
  4. 具體的流的聚合和分發(fā),參考這篇博客,例子很詳細(xì):JStorm流的匯聚和分發(fā)

數(shù)據(jù)流傳輸過程

  1. Spout中的數(shù)據(jù)源取出一份數(shù)據(jù)(無限循環(huán)取出),作為一個Tuple,emit到下一個節(jié)點
  2. 根據(jù)Spout中declareOutputFields定義的字段和流,查閱后繼訂閱該節(jié)點或者流的Bolt,Tuple會被發(fā)送到每個訂閱節(jié)點或者流的后繼節(jié)點當(dāng)中
  3. 后繼訂閱的Bolt節(jié)點接收到該Tuple,使用tuple.getValueByField通過上一節(jié)點declareOutputFields的字段名獲取相應(yīng)的字段值,也可以根據(jù)fields的聲明順序使用tuple.getValue通過下標(biāo)獲取相應(yīng)的值
  4. 拿到相應(yīng)的數(shù)據(jù)之后,進行相關(guān)邏輯處理,之后emit到下一個節(jié)點當(dāng)中,以此類推,直到最終節(jié)點將數(shù)據(jù)輸出到mysql、ES、HDFS等存儲系統(tǒng)當(dāng)中
  5. emit時可以指定相應(yīng)的streamId來指定當(dāng)前的數(shù)據(jù)要傳輸?shù)降哪膫€streamId當(dāng)中(該組件的declareOutputFields需要聲明所需的所有streamId),在Topology構(gòu)建時后繼節(jié)點指定該streamId來訂閱相應(yīng)的數(shù)據(jù)。

編程例子講解

  • 這個例子是一個單詞計數(shù)程序,通過一組字符串?dāng)?shù)組中隨機獲取一個檔次作為數(shù)據(jù)源往后輸出,在后續(xù)節(jié)點統(tǒng)計各個單詞被獲取的總次數(shù)。
  • 包括RandomSentenceSpout、SplitBolt、CountBolt三個節(jié)點,各個節(jié)點并行度都為1,是一個最簡單的單條鏈?zhǔn)降耐負(fù)?,如?/p>

    jstorm-example.jpg

RandomSentenceSpout

  • 表示數(shù)據(jù)源,這里用從數(shù)組隨機獲取一個元素作為模擬數(shù)據(jù)源獲取,日常開發(fā)通常是從MQ中獲取相應(yīng)數(shù)據(jù)進行數(shù)據(jù)流驅(qū)動。
/**
 * RandomSentenceSpout實現(xiàn)了IRichSpout接口
 * Spout需要實現(xiàn)的接口可以是:
 *    1,IRichSpout:最基本的Spout,繼承自ISpout, IComponent,沒有任何特殊方法(一般用這個)
 *    2,IControlSpout:繼承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法
 */
public class RandomSentenceSpout implements IRichSpout {

    private static final long serialVersionUID = 4058847280819269954L;
    private static final Logger logger = Logger.getLogger(RandomSentenceSpout.class);
    //可以理解為JStorm的數(shù)據(jù)傳輸管道,通過這個對象將這個組件的數(shù)據(jù)傳輸?shù)较乱粋€組件當(dāng)中
    private SpoutOutputCollector _collector;
    //隨機生成對象
    private Random _rand;
    private String component;

    /**
     * Spout初始化的時候調(diào)用
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
        component = context.getThisComponentId();
    }

    /**
     * 系統(tǒng)框架會不斷調(diào)用
     */
    public void nextTuple() {
        //模擬數(shù)據(jù)源
        String[] sentences = new String[]{"Hello world! This is my first programme of JStorm",
                "Hello JStorm,Nice to meet you!", "Hi JStorm, do you have a really good proformance",
                "Goodbye JStorm,see you tomorrow"};
        //隨機取出字符串
        String sentence = sentences[_rand.nextInt(sentences.length)];
        //將得到的字符串輸出到下一個組件
        //!?。∵@里Values中值填充順序要和下面declareOutputFields中字段聲明順序一致
        _collector.emit(new Values(sentence), Time.currentTimeSecs());
        Utils.sleep(1000);
    }

    @Override
    public void ack(Object arg0) {
        logger.debug("ACK!");
    }

    public void activate() {
        logger.debug("ACTIVE!");
    }

    public void close() {

    }

    public void deactivate() {

    }

    public void fail(Object arg0) {
        logger.debug("FAILED!");
    }

    /**
     * 聲明框架有哪些輸出的字段
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //下一個組件通過word這個關(guān)鍵字拿到這個組件往后輸出的單詞sentence
        declarer.declare(new Fields("word"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

SplitBolt

  • 將獲取的字符串通過空白符分割,并轉(zhuǎn)為小寫之后輸出到下一個節(jié)點。
/**
 * IBasicBolt:繼承自IComponent,包括prepare,execut,cleanup等方法
 */
public class SplitBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 7104767103420386784L;
    private static final Logger logger = Logger.getLogger(SplitBolt.class);
    private String component;

    /**
     * cleanup方法在bolt被關(guān)閉的時候調(diào)用, 它應(yīng)該清理所有被打開的資源。(基本只能在local mode使用)
     * 但是集群不保證這個方法一定會被執(zhí)行。比如執(zhí)行task的機器down掉了,那么根本就沒有辦法來調(diào)用那個方法。
     * cleanup設(shè)計的時候是被用來在local mode的時候才被調(diào)用(也就是說在一個進程里面模擬整個storm集群),
     * 并且你想在關(guān)閉一些topology的時候避免資源泄漏。
     * (非 Javadoc)
     * @see backtype.storm.topology.base.BaseBasicBolt#cleanup()
     */
    @Override
    public void cleanup() {

    }

    //接收消息之后被調(diào)用的方法
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        //以下兩個方式獲取前驅(qū)節(jié)點發(fā)送過來的sentence,一個根據(jù)fieldName,一個根據(jù)字段聲明順序
//        String sentence = input.getValueByField("word");
        String sentence = input.getString(0);
        String[] words = sentence.split("[,|\\s+]");
        for (String word : words) {
            word = word.trim();
            //將非空單詞輸出到下一個節(jié)點
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
    /**
     * prepare方法在worker初始化task的時候調(diào)用.
     *
     * prepare方法提供給bolt一個Outputcollector用來發(fā)射tuple。
     * Bolt可以在任何時候發(fā)射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一個線程里面異步發(fā)射。
     * 這里prepare方法只是簡單地把OutputCollector作為一個類字段保存下來給后面execute方法 使用。
     */
    @Override
    public void prepare(Map stromConf, TopologyContext context) {
        component = context.getThisComponentId();
    }

    /**
     * declearOutputFields方法僅在有新的topology提交到服務(wù)器,
     * 用來決定輸出內(nèi)容流的格式(相當(dāng)于定義spout/bolt之間傳輸stream的name:value格式),
     * 在topology執(zhí)行的過程中并不會被調(diào)用.
     * (非 Javadoc)
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

CountBolt

  • 這個組件接收到的每個值都是單個單詞,通過一個內(nèi)存Map統(tǒng)計各個單詞總數(shù)
  • 后臺設(shè)置一個異步線程10s一次輸出當(dāng)前Map中的各個單詞總數(shù)
  • 日常開發(fā)通常在這個終端節(jié)點將實時計算得到的結(jié)果輸出到HDFS、mysql、HBase、ElasticSearch等存儲系統(tǒng)當(dāng)中
public class CountBolt extends BaseBasicBolt {
    private Integer id;
    private String name;
    private Map<String, Integer> counters;
    private String component;
    private static final Logger LOG = Logger.getLogger(CountBolt.class);
    //異步輸出結(jié)果集的子線程
    private AsyncLoopThread statThread;

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        //異步循環(huán)輸出結(jié)果集
        this.statThread = new AsyncLoopThread(new statRunnable());

        LOG.info(stormConf.get("abc") + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        component = context.getThisComponentId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
        // declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
        // LOG.info("set stream coord-"+component);
    }

    //接收消息之后被調(diào)用的方法
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
//        String str = input.getString(0);
        String str = input.getStringByField("word");
        if (!counters.containsKey(str)) {
            //單詞計數(shù)
            counters.put(str, 1);
        } else {
            //單詞計數(shù)
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }

    /**
     * 異步輸出結(jié)果集的死循環(huán)子線程
     */
    class statRunnable extends RunnableCallback {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {

                }
                LOG.info("\n-- Word Counter [" + name + "-" + id + "] --");
                for (Map.Entry<String, Integer> entry : counters.entrySet()) {
                    LOG.info(entry.getKey() + ": " + entry.getValue());
                }
                LOG.info("");
            }
        }
    }
}

WordCountTopology主入口,拓?fù)錁?gòu)建

  • 這里通過setSpout和setBolt將上面的三個節(jié)點連接成線 —— 即最開始說明的鏈?zhǔn)酵負(fù)鋱D
  • JStorm提交執(zhí)行相關(guān)執(zhí)行參數(shù)統(tǒng)一寫入一個Properties或Yaml配置文件中,命令行執(zhí)行第一個參數(shù)是該配置文件的路徑
public class WordCountTopology {
    private static Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);

    //裝載配置文件配置參數(shù)
    private static Map conf = new HashMap<Object, Object>();

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println("Please input configuration file");
            System.exit(-1);
        }
        //加載配置文件配置到內(nèi)存
        LoadConf(args[0]);
        //構(gòu)建JStorm拓?fù)?        TopologyBuilder builder = setupBuilder();
        System.out.println("Topology準(zhǔn)備提交");
        //提交任務(wù)到集群
        submitTopology(builder);
        System.out.println("Topology提交完成");
    }

    //!!!!這里通過setSpout和setBolt設(shè)置各個節(jié)點之間的連接關(guān)系,
    // 是這里把所有各自獨立的節(jié)點用線連接起來,構(gòu)建成一張具體的任務(wù)執(zhí)行拓?fù)鋱D
    private static TopologyBuilder setupBuilder() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /*
         * 設(shè)置spout和bolt,完整參數(shù)為
         * 1,spout的id(即name)
         * 2,spout對象
         * 3,executor數(shù)量即并發(fā)數(shù),也就是設(shè)置多少個executor來執(zhí)行spout/bolt(此項沒有默認(rèn)null)
         */
        //setSpout,聲明Spout名稱Id為sentence-spout,并行度1
        builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);
        //setBolt:SplitBolt的grouping策略是上層隨機分發(fā),CountBolt的grouping策略是按照上層字段分發(fā)
        //如果想要從多個Bolt獲取數(shù)據(jù),可以繼續(xù)設(shè)置grouping
        //聲明Bolt名稱Id為split-bolt,并行度1
        builder.setBolt("split-bolt", new SplitBolt(), 1)
                //設(shè)置該Bolt的數(shù)據(jù)源為sentence-spout的輸出
                .shuffleGrouping("sentence-spout");
        //聲明Bolt名稱Id為count-bolt,并行度1
        builder.setBolt("count-bolt", new CountBolt(), 1)
                //設(shè)置該Bolt的數(shù)據(jù)源為sentence-spout和split-bolt的輸出
                //fieldsGrouping保證相同word對應(yīng)的值發(fā)送到同一個Task節(jié)點,這是單詞計數(shù)業(yè)務(wù)需要
                .fieldsGrouping("split-bolt", new Fields("word"))
                .fieldsGrouping("sentence-spout", new Fields("word"));
        return builder;
    }

    //提交任務(wù)到JStorm集群
    private static void submitTopology(TopologyBuilder builder) {
        try {
            if (local_mode(conf)) {//本地模式,需要有本地JStorm環(huán)境支持
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(
                        String.valueOf(conf.get("topology.name")), conf,
                        builder.createTopology());

                Thread.sleep(200000);
                cluster.shutdown();
            } else {
                StormSubmitter.submitTopology(
                        String.valueOf(conf.get("topology.name")), conf,
                        builder.createTopology());
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e.getCause());
        }
    }

    //加載Properties配置文件
    private static void LoadProperty(String prop) {
        Properties properties = new Properties();
        try {
            InputStream stream = new FileInputStream(prop);
            properties.load(stream);
        } catch (FileNotFoundException e) {
            System.out.println("No such file " + prop);
        } catch (Exception e1) {
            e1.printStackTrace();

            return;
        }
        conf.putAll(properties);
    }

    //加載Yaml配置文件
    private static void LoadYaml(String confPath) {
        Yaml yaml = new Yaml();
        try {
            InputStream stream = new FileInputStream(confPath);
            conf = (Map) yaml.load(stream);
            if (conf == null || conf.isEmpty() == true) {
                throw new RuntimeException("Failed to read config file");
            }
        } catch (FileNotFoundException e) {
            System.out.println("No such file " + confPath);
            throw new RuntimeException("No config file");
        } catch (Exception e1) {
            e1.printStackTrace();
            throw new RuntimeException("Failed to read config file");
        }
    }

    //根據(jù)后綴名選擇加載配置文件方案
    private static void LoadConf(String arg) {
        if (arg.endsWith("yaml")) {
            LoadYaml(arg);
        } else {
            LoadProperty(arg);
        }
    }

    public static boolean local_mode(Map conf) {
        String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
        if (mode != null) {
            if (mode.equals("local")) {
                return true;
            }
        }
        return false;
    }
}

批量用法

基本的用法是每次處理一個tuple,但是這種效率比較低,很多情況下是可以批量獲取消息然后一起處理,批量用法對這種方式提供了支持。打開代碼可以很明顯地發(fā)現(xiàn)jstorm和storm的有著不小的區(qū)別:

// storm 中的定義
public interface IBatchSpout extends Serializable {
    void open(Map conf, TopologyContext context);
    void emitBatch(long batchId, TridentCollector collector);// 批次發(fā)射tuple
    void ack(long batchId); // 成功處理批次
    void close();
    Map getComponentConfiguration();
    Fields getOutputFields();
}
// jstorm中的定義
public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
}

另外如果用批次的話就需要改用BatchTopologyBuilder來構(gòu)建拓?fù)浣Y(jié)構(gòu),在IBatchSpout中主要實現(xiàn)的接口如下:

  1. execute:雖然和IBolt中名字、參數(shù)一致,但是增加了一些默認(rèn)邏輯
  • 入?yún)⒌膇nput.getValue(0)表示批次(BatchId)。
  • 發(fā)送消息時collector.emit(new Values(batchId, value)),發(fā)送的列表第一個字段表示批次(BatchId)。
  1. commit:批次成功時調(diào)用,常見的是修改offset。
  2. revert:批次失敗時調(diào)用,可以在這里根據(jù)offset取出批次數(shù)據(jù)進行重試。

Ack機制

  • 為保證無數(shù)據(jù)丟失,Storm/JStorm使用了非常漂亮的可靠性處理機制,如圖當(dāng)定義Topology時指定Acker,JStorm除了Topology本身任務(wù)外,還會啟動一組稱為Acker的特殊任務(wù),負(fù)責(zé)跟蹤Topolgogy DAG中的每個消息。每當(dāng)發(fā)現(xiàn)一個DAG被成功處理完成,Acker就向創(chuàng)建根消息的Spout任務(wù)發(fā)送一個Ack信號。Topology中Acker任務(wù)的并行度默認(rèn)parallelism hint=1,當(dāng)系統(tǒng)中有大量的消息時,應(yīng)該適當(dāng)提高Acker任務(wù)的并行度。

  • Acker按照Tuple Tree的方式跟蹤消息。當(dāng)Spout發(fā)送一個消息的時候,它就通知對應(yīng)的Acker一個新的根消息產(chǎn)生了,這時Acker就會創(chuàng)建一個新的Tuple Tree。當(dāng)Acker發(fā)現(xiàn)這棵樹被完全處理之后,他就會通知對應(yīng)的Spout任務(wù)。

  • Acker任務(wù)保存了數(shù)據(jù)結(jié)構(gòu)Map<MessageID,Map< TaskID, Value>>,其中MessageID是Spout根消息ID,TaskID是Spout任務(wù)ID,Value表示一個64bit的長整型數(shù)字,是樹中所有消息的隨機ID的異或結(jié)果。通過TaskID,Acker知道當(dāng)消息樹處理完成后通知哪個Spout任務(wù),通過MessageID,Acker知道屬于Spout任務(wù)的哪個消息被成功處理完成。Value表示了整棵樹的的狀態(tài),無論這棵樹多大,只需要這個固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時候都會有相同的MessageID發(fā)送過來做異或。當(dāng)Acker發(fā)現(xiàn)一棵樹的Value值為0的時候,表明這棵樹已經(jīng)被成功處理完成。

  • 舉例說明具體流程,以下為拓?fù)洌?/p>

    ack-example.jpg
  • Acker數(shù)據(jù)的變化過程:(算法)

Step1:A發(fā)送T0給B后:
產(chǎn)生一個隨機數(shù)r0,樹種存R0:R0=r0
<id0,<taskA,R0>>
# ---------
Step2.B接收到T0并成功處理后向C發(fā)送T1,向D發(fā)送T2:
接收到上級傳過來的R0,自己傳給兩個下家,產(chǎn)生兩個隨機數(shù)代表下家存入樹中:R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
# ---------
Step3.C接收到T1并成功處理后:
接收到上家傳過來的r1,沒有下家:R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
# ---------
Step4.D接收到T2并成功處理后:
接收到上家傳過來的r2,沒有下家:R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>

當(dāng)結(jié)果為0時Acker可以通知taskA根消息id0的消息樹已被成功處理完成,調(diào)用Spout的ack方法通知,若超時發(fā)現(xiàn)消息樹中值不為0,調(diào)用Spout中的fail。

  • 整體節(jié)點間通信:


    jstorm-ack.png
  • 需要指出的是,Acker并不是必須的,當(dāng)實際業(yè)務(wù)可以容忍數(shù)據(jù)丟失情況下可以不用Acker,對數(shù)據(jù)丟失零容忍的業(yè)務(wù)必須打開Acker,另外當(dāng)系統(tǒng)的消息規(guī)模較大是可適當(dāng)增加Acker的并行度。

JStorm事務(wù)

事務(wù)拓?fù)洳⒉皇切碌臇|西,只是在原始的ISpout、IBolt上做了一層封裝。在事務(wù)拓?fù)渲幸圆⑿校╬rocessing)和順序(commiting)混合的方式來完成任務(wù),使用Transactional Topology可以保證每個消息只會成功處理一次。不過需要注意的是,在Spout需要保證能夠根據(jù)BatchId進行多次重試,在這里有一個基本的例子,這里有一個不錯的講解。

Trident

這次一種更高級的抽象(甚至不需要知道底層是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面幾種接口:
  1. 在本地完成的操作
  • Function:自定義操作。
  • Filters:自定義過濾。
  • partitionAggregate:對同批次的數(shù)據(jù)進行l(wèi)ocal combiner操作。
  • project:只保留stream中指定的field。
  • stateQuery、partitionPersist:查詢和持久化。
  1. 決定Tuple如何分發(fā)到下一個處理環(huán)節(jié)
  • shuffle:隨機。
  • broadcast:廣播。
  • partitionBy:以某一個特定的field進行hash,分到某一個分區(qū),這樣該field位置相同的都會放到同一個分區(qū)。
  • global:所有tuple發(fā)到指定的分區(qū)。
  • batchGlobal:同一批的tuple被放到相同的分區(qū)(不同批次不同分區(qū))。
  • partition:用戶自定義的分區(qū)策略。
  1. 不同partition處理結(jié)果的匯聚操作
  • aggregate:只針對同一批次的數(shù)據(jù)。
  • persistentAggregate:針對所有批次進行匯聚,并將中間狀態(tài)持久化。
  1. 對stream中的tuple進行重新分組,后續(xù)的操作將會對每一個分組獨立進行(類似sql中的group by)
  • groupBy
  1. 將多個Stream融合成一個
  • merge:多個流進行簡單的合并。
  • join:多個流按照某個KEY進行UNION操作(只能針對同一個批次的數(shù)據(jù))。

這里有一個jstorm中使用Trident的簡單例子。

故障恢復(fù)

  1. 節(jié)點故障

    • Nimbus故障。Nimbus本身無狀態(tài),所以Nimbus故障不會影響正在正常運行任務(wù),另外Nimbus HA保證Nimbus故障后可以及時被備份Nimbus接管。
    • Supervisors節(jié)點故障。Supervisor故障后,Nimbus會將故障節(jié)點上的任務(wù)遷移到其他可用節(jié)點上繼續(xù)運行,但是Supervisor故障需要外部監(jiān)控并及時手動重啟。
    • Worker故障。Worker健康狀況監(jiān)控由Supervisor負(fù)責(zé),當(dāng)Woker出現(xiàn)故障時,Supervisor會及時在本機重試重啟。
    • Zookeeper節(jié)點故障。Zookeeper本身具有很好的故障恢復(fù)機制,能保證至少半數(shù)以上節(jié)點在線就可正常運行,及時修復(fù)故障節(jié)點即可。
  2. 任務(wù)失敗

    • Spout失敗。消息不能被及時被Pull到系統(tǒng)中,造成外部大量消息不能被及時處理,而外部大量計算資源空閑。
    • Bolt失敗。消息不能被處理,Acker持有的所有與該Bolt相關(guān)的消息反饋值都不能回歸到0,最后因為超時最終Spout的fail將被調(diào)用。
    • Acker失敗。Acker持有的所有反饋信息不管成功與否都不能及時反饋到Spout,最后同樣因為超時Spout的fail將被調(diào)用。
    • 任務(wù)失敗后,需要Nimbus及時監(jiān)控到并重新分配失敗任務(wù)。

JStorm使用感受

  1. JStorm各個節(jié)點之間是松耦合的,各個節(jié)點之間的通信只和Tuple數(shù)據(jù)流結(jié)構(gòu)相關(guān),其他處理邏輯各自獨立
  2. JStorm不處理數(shù)據(jù)的存儲服務(wù),計算結(jié)果自行存儲到HDFS、HBase、Mysql、ElasticSearch等存儲系統(tǒng)當(dāng)中
  3. JStorm的拓?fù)涔?jié)點設(shè)計中,應(yīng)該把延時操作分發(fā)到多個節(jié)點當(dāng)中執(zhí)行,每個節(jié)點只處理各自單一的功能邏輯,如上面的例子,我把單詞分割和單詞計數(shù)分成兩個Bolt來實現(xiàn),這才是流式計算的特點,讓數(shù)據(jù)流動起來,而不是在一個節(jié)點完成所有工作,也保證了程序可用性更強
  4. JStorm各個節(jié)點內(nèi)部的處理邏輯非常開放,想怎么處理都行,只要最終往后輸出相應(yīng)的Tuple即可,編程時非常自由,不像MapReduce,很多操作都在MR模型中得到限制

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容