
本文展示一個(gè)Storm的topology,該topology對(duì)給定的詞源進(jìn)行詞頻統(tǒng)計(jì),然后存入HBase,該實(shí)例不借助storm-hbase包,而是直接使用hbase client來(lái)完成對(duì)HBase的操作。
引言
由Twitter開源的、分布式實(shí)時(shí)計(jì)算系統(tǒng)Apache Storm,如今已被多家知名企業(yè)應(yīng)用于實(shí)時(shí)分析、流式計(jì)算、在線機(jī)器學(xué)習(xí)、分布式RPC調(diào)用、ETL等領(lǐng)域,甚至有看到“Storm之于實(shí)時(shí)計(jì)算,就像Hadoop之于數(shù)據(jù)批處理”這樣的評(píng)價(jià),是否言過(guò)其實(shí),這里暫且不論,但至少已經(jīng)看到業(yè)界對(duì)Storm在實(shí)時(shí)計(jì)算領(lǐng)域的肯定,加之其開源特性,必然會(huì)得到更廣泛的應(yīng)用。
在Storm的實(shí)際應(yīng)用中,在topology中將經(jīng)過(guò)處理的數(shù)據(jù)通過(guò)HBase進(jìn)行持久化,是一個(gè)常見(jiàn)的需求。Storm官方提供了storm-hbase,包含一些比較通用的API及其簡(jiǎn)單實(shí)現(xiàn),可以查看對(duì)應(yīng)的官方文檔來(lái)了解基本使用方法:storm-hbase。但如果你需要進(jìn)行一些更復(fù)雜的處理,或者希望對(duì)自己的代碼有更多的掌控,那么脫離storm-hbase,直接使用HBase的Java API來(lái)完成操作,將是一個(gè)不錯(cuò)的選擇。本文將展示的,就是一個(gè)在Storm的topology中直接使用HBase Java API操作HBase的簡(jiǎn)單示例。
零.示例簡(jiǎn)述
本項(xiàng)目數(shù)據(jù)源部分直接借用Storm詞頻統(tǒng)計(jì)的官方示例,在WordSpout.java中從靜態(tài)字符串?dāng)?shù)組中讀取單詞,在WordCounterBolt.java中統(tǒng)計(jì)單詞出現(xiàn)的次數(shù),最后在MyHBaseBolt.java中將單詞及其出現(xiàn)的次數(shù)寫入到HBase。
一.環(huán)境信息
示例的測(cè)試環(huán)境:
- Java 8
- Storm 1.0.1
- HBase 1.2.2
- Hadoop 2.6.4
- Maven 3.3.3
二.創(chuàng)建項(xiàng)目
示例直接使用hbase client操作HBase,因此關(guān)鍵的依賴只有storm和hbase client,項(xiàng)目pom.xml:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.0.1</storm.version>
<!-- 開發(fā)調(diào)試時(shí)配置為compile,topology打包時(shí)配置為provided -->
<storm.scope>compile</storm.scope>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>${storm.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies>
項(xiàng)目結(jié)構(gòu):
--src
--main
--java
--bolt
--MyHBaseBolt.java
--WordCounterBolt.java
--spout
--WordSpout.java
--HBaseTopology.java
--resources
--hbase-site.xml
其中hbase-site.xml直接使用HBase服務(wù)器上面的hbase-site.xml即可。本示例的HBase集群使用獨(dú)立的zookeeper集群,zk的端口使用了默認(rèn)端口,因此不需要在hbase-site.xml中顯式配置,詳細(xì)內(nèi)容見(jiàn)附錄。
三.詞頻統(tǒng)計(jì)
這部分直接借用一個(gè)Storm官方示例:WordSpout.java從靜態(tài)數(shù)組中隨機(jī)讀取單詞并向外發(fā)射,WordCounterBolt接收來(lái)自WordSpout的包含一個(gè)個(gè)單詞的tuple,對(duì)每個(gè)單詞出現(xiàn)的次數(shù)進(jìn)行統(tǒng)計(jì),然后將每個(gè)單詞及其對(duì)應(yīng)的計(jì)數(shù)向外發(fā)射。為快速進(jìn)入主題,這部分代碼放在附錄中。
四.HBase操作
在java中通過(guò)hbase client對(duì)hbase進(jìn)行讀寫大體有如下步驟:
- 創(chuàng)建HBaseConfiguration對(duì)象,該對(duì)象可以讀取CLASSPATH下的hbase-site.xml文件的內(nèi)容。
Configuration config = HBaseConfiguration.create(); - 用前面的config對(duì)象為入?yún)?chuàng)建Connection對(duì)象來(lái)連接至目標(biāo)HBase集群。connection對(duì)象對(duì)資源消耗較大,應(yīng)該避免創(chuàng)建過(guò)多的實(shí)例。使用完畢后,調(diào)用connection的close()方法關(guān)閉連接,建議使用try/finally來(lái)確保連接的關(guān)閉。
Connection connection = ConnectionFactory.createConnection(config); - 以指定的table名稱(應(yīng)該是已存在的)為入?yún)?chuàng)建Table對(duì)象來(lái)連接指定的表。使用完畢后,需要調(diào)用table的close()方法進(jìn)行關(guān)閉。與connection不同,table對(duì)象是輕量的,對(duì)table對(duì)象的創(chuàng)建,不需要像connection那樣小心,當(dāng)然,這并不是鼓勵(lì)你創(chuàng)建得越多越好。
Table table = connection.getTable(TableName.valueOf("WordCount")); - 以指定的row key(可以是在HBase中還不存在的)為入?yún)?chuàng)建Put對(duì)象來(lái)持有要寫入的數(shù)據(jù)。
Put p = new Put(Bytes.toBytes("key")); - 調(diào)用Put對(duì)象的addColumn方法,接受列族名稱(column family)、列名(column qualifier)和要寫入的值作為參數(shù)。可以多次調(diào)用該方法讓put對(duì)象持有一定數(shù)量的數(shù)據(jù)后,再一次性提交。
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes("word")); - 以Put對(duì)象為入?yún)?,調(diào)用table的put方法來(lái)提交要寫入hbase的數(shù)據(jù)
- 關(guān)閉table
- 關(guān)閉connection
在Storm的bolt中進(jìn)行實(shí)際應(yīng)用:
public class MyHBaseBolt extends BaseBasicBolt {
private Connection connection;
private Table table;
@Override
public void prepare(Map stormConf, TopologyContext context) {
Configuration config = HBaseConfiguration.create();
try {
connection = ConnectionFactory.createConnection(config);
//示例都是對(duì)同一個(gè)table進(jìn)行操作,因此直接將Table對(duì)象的創(chuàng)建放在了prepare,在bolt執(zhí)行過(guò)程中可以直接重用。
table = connection.getTable(TableName.valueOf("WordCount"));
} catch (IOException e) {
//do something to handle exception
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//從tuple中獲取單詞
String word = tuple.getString(0);
//從tuple中獲取計(jì)數(shù),這里轉(zhuǎn)換為String只是為了示例運(yùn)行后存入hbase的計(jì)數(shù)值能夠直觀顯示。
String count = tuple.getInteger(1).toString();
try {
//以各個(gè)單詞作為row key
Put put = new Put(Bytes.toBytes(word));
//將被計(jì)數(shù)的單詞寫入cf:words列
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("words"), Bytes.toBytes(word));
//將單詞的計(jì)數(shù)寫入cf:counts列
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("counts"), Bytes.toBytes(count));
table.put(put);
} catch (IOException e) {
//do something to handle exception
}
}
@Override
public void cleanup() {
//關(guān)閉table
try {
if(table != null) table.close();
} catch (Exception e){
//do something to handle exception
} finally {
//在finally中關(guān)閉connection
try {
connection.close();
} catch (IOException e) {
//do something to handle exception
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//示例中本bolt不向外發(fā)射數(shù)據(jù),所以沒(méi)有再做聲明
}
}
雖然可能應(yīng)用場(chǎng)景相對(duì)較少,但還是附帶介紹一下從HBase讀取數(shù)據(jù):
- 以指定的row key為入?yún)?chuàng)建Get對(duì)象
Get get = new Get(Bytes.toBytes("key")); - 以Get實(shí)例為入?yún)⒄{(diào)用table的get方法來(lái)獲取結(jié)果集對(duì)象Result
Result r = table.get(get); - 從結(jié)果集中獲取制定列的值
byte[] value = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("words")); - 也可以使用scan來(lái)批量讀取,Scanner實(shí)現(xiàn)了Iterable,因此可以使用foreach來(lái)進(jìn)行遍歷:
Scan scan = new Scan();
//獲取指定列族所有列的數(shù)據(jù)
scan.addFamily(Bytes.toBytes("cf"));
ResultScanner scanner = table.getScanner(scan);
try {
for (Result r : scanner) {...}
}finally{
scanner.close();
}
五.Topology
topology中唯一需要注意的是,在Windows測(cè)試該示例時(shí),需要配置hadoop.home.dir屬性,并確保將winutils.exe客戶端(示例中使用的版本(鏈接若失效請(qǐng)自助))放置在所配置的hadoop.home.dir目錄下(資料解釋:在hadoop 2.x版本的包中不再包含winutils.exe文件)。
HBaseTopology.java:
public class PersistentWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
private static final String HBASE_BOLT = "HBASE_BOLT";
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir","E:/BaiduYunDownload");
Config config = new Config();
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
MyHBaseBolt hbase = new MyHBaseBolt();
// wordSpout ==> countBolt ==> HBaseBolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(HBASE_BOLT, hbase, 10).fieldsGrouping(COUNT_BOLT, new Fields("word"));
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word", config, builder.createTopology());
Thread.sleep(10000);
cluster.killTopology("word");
cluster.shutdown();
System.exit(0);
} else {
config.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
}
}
如果編譯遇到類似:java.io.IOException: No FileSystem for scheme: hdfs這樣關(guān)于hadoop的問(wèn)題,可能需要添加hadoop相關(guān)依賴包,如:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.4</version>
</dependency>
六.總結(jié)
本文通過(guò)一個(gè)詞頻統(tǒng)計(jì)后通過(guò)HBase進(jìn)行結(jié)果持久化的topology示例,展示了如何在Storm的中直接使用HBase的java api來(lái)實(shí)現(xiàn)基本的讀寫操作,希望能為想自己完成Storm的HBase集成而不得其法的朋友提供一個(gè)入門指引。
附錄
- WordSpout.java:
public class WordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static final String[] MSGS = new String[]{
"Storm", "HBase", "Integration", "example", "by ", "aloo", "in", "Aug",
};
private static final Random random = new Random();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
String word = MSGS[random.nextInt(8)];
collector.emit(new Values(word));
}
}
- WordCounterBolt.java:
public class WordCounter extends BaseBasicBolt {
private Map<String, Integer> _counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
int count;
if(_counts.containsKey(word)){
count = _counts.get(word);
} else {
count = 0;
}
count ++;
_counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
- hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://xxx.xx.xx.xx:9000/hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/hadoop/hbase/storm/zookeeper</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>zknode1,zdnode2,zknode3</value>
</configuration>