Storm應(yīng)用實(shí)例--集成HBase

本文展示一個(gè)Storm的topology,該topology對(duì)給定的詞源進(jìn)行詞頻統(tǒng)計(jì),然后存入HBase,該實(shí)例不借助storm-hbase包,而是直接使用hbase client來(lái)完成對(duì)HBase的操作。

引言

由Twitter開源的、分布式實(shí)時(shí)計(jì)算系統(tǒng)Apache Storm,如今已被多家知名企業(yè)應(yīng)用于實(shí)時(shí)分析、流式計(jì)算、在線機(jī)器學(xué)習(xí)、分布式RPC調(diào)用、ETL等領(lǐng)域,甚至有看到“Storm之于實(shí)時(shí)計(jì)算,就像Hadoop之于數(shù)據(jù)批處理”這樣的評(píng)價(jià),是否言過(guò)其實(shí),這里暫且不論,但至少已經(jīng)看到業(yè)界對(duì)Storm在實(shí)時(shí)計(jì)算領(lǐng)域的肯定,加之其開源特性,必然會(huì)得到更廣泛的應(yīng)用。
在Storm的實(shí)際應(yīng)用中,在topology中將經(jīng)過(guò)處理的數(shù)據(jù)通過(guò)HBase進(jìn)行持久化,是一個(gè)常見(jiàn)的需求。Storm官方提供了storm-hbase,包含一些比較通用的API及其簡(jiǎn)單實(shí)現(xiàn),可以查看對(duì)應(yīng)的官方文檔來(lái)了解基本使用方法:storm-hbase。但如果你需要進(jìn)行一些更復(fù)雜的處理,或者希望對(duì)自己的代碼有更多的掌控,那么脫離storm-hbase,直接使用HBase的Java API來(lái)完成操作,將是一個(gè)不錯(cuò)的選擇。本文將展示的,就是一個(gè)在Storm的topology中直接使用HBase Java API操作HBase的簡(jiǎn)單示例。

零.示例簡(jiǎn)述

本項(xiàng)目數(shù)據(jù)源部分直接借用Storm詞頻統(tǒng)計(jì)的官方示例,在WordSpout.java中從靜態(tài)字符串?dāng)?shù)組中讀取單詞,在WordCounterBolt.java中統(tǒng)計(jì)單詞出現(xiàn)的次數(shù),最后在MyHBaseBolt.java中將單詞及其出現(xiàn)的次數(shù)寫入到HBase。

一.環(huán)境信息

示例的測(cè)試環(huán)境:

  • Java 8
  • Storm 1.0.1
  • HBase 1.2.2
  • Hadoop 2.6.4
  • Maven 3.3.3

二.創(chuàng)建項(xiàng)目

示例直接使用hbase client操作HBase,因此關(guān)鍵的依賴只有storm和hbase client,項(xiàng)目pom.xml:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <storm.version>1.0.1</storm.version>
  <!-- 開發(fā)調(diào)試時(shí)配置為compile,topology打包時(shí)配置為provided -->
  <storm.scope>compile</storm.scope>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <scope>${storm.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.2</version>
    </dependency>
</dependencies>

項(xiàng)目結(jié)構(gòu):

--src
   --main
       --java
          --bolt
              --MyHBaseBolt.java
              --WordCounterBolt.java
          --spout
              --WordSpout.java
          --HBaseTopology.java
       --resources
           --hbase-site.xml

其中hbase-site.xml直接使用HBase服務(wù)器上面的hbase-site.xml即可。本示例的HBase集群使用獨(dú)立的zookeeper集群,zk的端口使用了默認(rèn)端口,因此不需要在hbase-site.xml中顯式配置,詳細(xì)內(nèi)容見(jiàn)附錄。

三.詞頻統(tǒng)計(jì)

這部分直接借用一個(gè)Storm官方示例:WordSpout.java從靜態(tài)數(shù)組中隨機(jī)讀取單詞并向外發(fā)射,WordCounterBolt接收來(lái)自WordSpout的包含一個(gè)個(gè)單詞的tuple,對(duì)每個(gè)單詞出現(xiàn)的次數(shù)進(jìn)行統(tǒng)計(jì),然后將每個(gè)單詞及其對(duì)應(yīng)的計(jì)數(shù)向外發(fā)射。為快速進(jìn)入主題,這部分代碼放在附錄中。

四.HBase操作

在java中通過(guò)hbase client對(duì)hbase進(jìn)行讀寫大體有如下步驟:

  • 創(chuàng)建HBaseConfiguration對(duì)象,該對(duì)象可以讀取CLASSPATH下的hbase-site.xml文件的內(nèi)容。
    Configuration config = HBaseConfiguration.create();
  • 用前面的config對(duì)象為入?yún)?chuàng)建Connection對(duì)象來(lái)連接至目標(biāo)HBase集群。connection對(duì)象對(duì)資源消耗較大,應(yīng)該避免創(chuàng)建過(guò)多的實(shí)例。使用完畢后,調(diào)用connection的close()方法關(guān)閉連接,建議使用try/finally來(lái)確保連接的關(guān)閉。
    Connection connection = ConnectionFactory.createConnection(config);
  • 以指定的table名稱(應(yīng)該是已存在的)為入?yún)?chuàng)建Table對(duì)象來(lái)連接指定的表。使用完畢后,需要調(diào)用table的close()方法進(jìn)行關(guān)閉。與connection不同,table對(duì)象是輕量的,對(duì)table對(duì)象的創(chuàng)建,不需要像connection那樣小心,當(dāng)然,這并不是鼓勵(lì)你創(chuàng)建得越多越好。
    Table table = connection.getTable(TableName.valueOf("WordCount"));
  • 以指定的row key(可以是在HBase中還不存在的)為入?yún)?chuàng)建Put對(duì)象來(lái)持有要寫入的數(shù)據(jù)。
    Put p = new Put(Bytes.toBytes("key"));
  • 調(diào)用Put對(duì)象的addColumn方法,接受列族名稱(column family)、列名(column qualifier)和要寫入的值作為參數(shù)。可以多次調(diào)用該方法讓put對(duì)象持有一定數(shù)量的數(shù)據(jù)后,再一次性提交。
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes("word"));
  • 以Put對(duì)象為入?yún)?,調(diào)用table的put方法來(lái)提交要寫入hbase的數(shù)據(jù)
  • 關(guān)閉table
  • 關(guān)閉connection

在Storm的bolt中進(jìn)行實(shí)際應(yīng)用:

public class MyHBaseBolt extends BaseBasicBolt {
    private Connection connection;
    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration config = HBaseConfiguration.create();
        try {
            connection = ConnectionFactory.createConnection(config);
//示例都是對(duì)同一個(gè)table進(jìn)行操作,因此直接將Table對(duì)象的創(chuàng)建放在了prepare,在bolt執(zhí)行過(guò)程中可以直接重用。
            table = connection.getTable(TableName.valueOf("WordCount"));
        } catch (IOException e) {
            //do something to handle exception
        }
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //從tuple中獲取單詞
        String word = tuple.getString(0);
        //從tuple中獲取計(jì)數(shù),這里轉(zhuǎn)換為String只是為了示例運(yùn)行后存入hbase的計(jì)數(shù)值能夠直觀顯示。
        String count = tuple.getInteger(1).toString();
        try {
            //以各個(gè)單詞作為row key
            Put put = new Put(Bytes.toBytes(word));
            //將被計(jì)數(shù)的單詞寫入cf:words列
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes(word));
            //將單詞的計(jì)數(shù)寫入cf:counts列
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counts"), Bytes.toBytes(count));
            table.put(put);
        } catch (IOException e) {
            //do something to handle exception
        }
    }
    @Override
    public void cleanup() {
        //關(guān)閉table
        try {
            if(table != null) table.close();
        } catch (Exception e){
            //do something to handle exception
        } finally {
            //在finally中關(guān)閉connection
            try {
                connection.close();
            } catch (IOException e) {
                //do something to handle exception
            }
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //示例中本bolt不向外發(fā)射數(shù)據(jù),所以沒(méi)有再做聲明
    }
}

雖然可能應(yīng)用場(chǎng)景相對(duì)較少,但還是附帶介紹一下從HBase讀取數(shù)據(jù):

  • 以指定的row key為入?yún)?chuàng)建Get對(duì)象
    Get get = new Get(Bytes.toBytes("key"));
  • 以Get實(shí)例為入?yún)⒄{(diào)用table的get方法來(lái)獲取結(jié)果集對(duì)象Result
    Result r = table.get(get);
  • 從結(jié)果集中獲取制定列的值
    byte[] value = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("words"));
  • 也可以使用scan來(lái)批量讀取,Scanner實(shí)現(xiàn)了Iterable,因此可以使用foreach來(lái)進(jìn)行遍歷:
Scan scan = new Scan();
//獲取指定列族所有列的數(shù)據(jù)
scan.addFamily(Bytes.toBytes("cf"));
ResultScanner scanner = table.getScanner(scan);
try {
    for (Result r : scanner) {...}
}finally{
    scanner.close();
    }

五.Topology

topology中唯一需要注意的是,在Windows測(cè)試該示例時(shí),需要配置hadoop.home.dir屬性,并確保將winutils.exe客戶端(示例中使用的版本(鏈接若失效請(qǐng)自助))放置在所配置的hadoop.home.dir目錄下(資料解釋:在hadoop 2.x版本的包中不再包含winutils.exe文件)。
HBaseTopology.java:

public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String HBASE_BOLT = "HBASE_BOLT";

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir","E:/BaiduYunDownload");

        Config config = new Config();

        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();
        MyHBaseBolt hbase = new MyHBaseBolt();

        // wordSpout ==> countBolt ==> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
        builder.setBolt(HBASE_BOLT, hbase, 10).fieldsGrouping(COUNT_BOLT, new Fields("word"));

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("word");
            cluster.shutdown();
            System.exit(0);
        } else {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
}

如果編譯遇到類似:java.io.IOException: No FileSystem for scheme: hdfs這樣關(guān)于hadoop的問(wèn)題,可能需要添加hadoop相關(guān)依賴包,如:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.4</version>
</dependency>

六.總結(jié)

本文通過(guò)一個(gè)詞頻統(tǒng)計(jì)后通過(guò)HBase進(jìn)行結(jié)果持久化的topology示例,展示了如何在Storm的中直接使用HBase的java api來(lái)實(shí)現(xiàn)基本的讀寫操作,希望能為想自己完成Storm的HBase集成而不得其法的朋友提供一個(gè)入門指引。

附錄

  1. WordSpout.java:
public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private static final String[] MSGS = new String[]{
            "Storm", "HBase", "Integration", "example", "by ", "aloo", "in", "Aug",
    };

    private static final Random random = new Random();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String word = MSGS[random.nextInt(8)];
        collector.emit(new Values(word));
    }
}
  1. WordCounterBolt.java:
public class WordCounter extends BaseBasicBolt {
    private Map<String, Integer> _counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        int count;
        if(_counts.containsKey(word)){
            count = _counts.get(word);
        } else {
            count = 0;
        }
        count ++;
        _counts.put(word, count);
        collector.emit(new Values(word, count));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  1. hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://xxx.xx.xx.xx:9000/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/hadoop/hbase/storm/zookeeper</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>zknode1,zdnode2,zknode3</value>
</configuration>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容