storm中的任务
-
storm中的任务的结构是Topology(拓扑图),这个拓扑图是一个有向无环图(DAG),DAG能够清楚地表达链式的任务,每一个节点都是一个任务,边的方向代表着数据流的方向。如下图
Paste_Image.png - storm任务中数据流的数据结构是一个个tuple,tuple元组是任意数据结构类型的键值对组合。例如:(k1:v1, k2:v2, k3:v3, ····)
- Spout是数据采集器,从数据源采集数据,转成tuple发射到后面的bolt处理
- Bolt是数据处理器,可执行数据过滤、分析等操作。
开发流程
-
设计Topology图
Paste_Image.png - 按照Topology图,创建maven项目后,依次写各个任务节点。首先写SentenceSpout节点。
package strom.strom;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Spout that emits one randomly chosen sample sentence per {@link #nextTuple()} call.
 *
 * <p>Every emitted tuple has a single field named {@code "sentence"}.
 */
public class SentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /** Pause after each emitted sentence, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 10000L;

    /** Fixed pool of sentences the spout picks from. */
    private static final String[] SENTENCES = { "hadoop yarn mapreduce spark", "flume hadoop hive spark",
            "oozie yarn spark storm", "storm yarn mapreduce error", "error flume storm spark" };

    // Tuple emitter, received in open().
    private SpoutOutputCollector collector;

    // Created once in open() and reused, instead of allocating a new generator per emitted tuple.
    private Random random;

    /**
     * Supplies configuration that applies only to this component.
     *
     * @return {@code null}; this spout declares no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Initialization hook of the spout; called once when this spout instance is set up.
     *
     * @param arg0 configuration map (unused)
     * @param arg1 topology context (unused)
     * @param arg2 collector used to emit tuples
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        // Keep the emitter in an instance field so nextTuple() can use it.
        this.collector = arg2;
        this.random = new Random();
    }

    /**
     * Declares the key (field name) of the tuples sent to downstream components.
     *
     * @param arg0 declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("sentence"));
    }

    /**
     * Picks a random sentence, wraps it in a tuple, emits it downstream, and then pauses for
     * {@link #EMIT_INTERVAL_MS}. This method is invoked repeatedly in a loop.
     */
    @Override
    public void nextTuple() {
        // Pick a random entry from the sample array.
        String sentence = SENTENCES[random.nextInt(SENTENCES.length)];
        // Wrap the value in a tuple and emit it to the downstream components.
        this.collector.emit(new Values(sentence));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption
            // instead of having it silently consumed here.
            Thread.currentThread().interrupt();
        }
    }
}
- 写SplitBolt组件
package strom.strom;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that splits each incoming {@code "sentence"} into words and emits one
 * {@code "word"} tuple per non-empty word.
 */
public class SplitBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    // Emitter of this bolt, received in prepare().
    private OutputCollector collector;

    /** No resources to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Sets the key (field name) of the emitted tuples.
     *
     * @param arg0 declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word"));
    }

    /**
     * Called once for every tuple received from the upstream component. Splits the sentence on
     * whitespace and emits each word as its own tuple.
     *
     * @param input tuple carrying a {@code "sentence"} string field
     */
    @Override
    public void execute(Tuple input) {
        // Extract the value stored under the "sentence" key.
        String sentence = input.getStringByField("sentence");
        if (sentence != null) {
            // Split on runs of whitespace; a leading separator still yields one empty token,
            // so empty tokens are skipped rather than emitted as words.
            for (String word : sentence.split("\\s+")) {
                if (!word.isEmpty()) {
                    // Anchor the word to the input tuple so the two stay linked for tracking.
                    this.collector.emit(input, new Values(word));
                }
            }
        }
        // This bolt manages acknowledgement itself: mark the input as fully processed,
        // including the case where it produced no words.
        this.collector.ack(input);
    }

    /**
     * Initialization hook of the bolt; called only once.
     *
     * @param arg0 configuration map (unused)
     * @param arg1 topology context (unused)
     * @param arg2 collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    /**
     * @return {@code null}; this bolt declares no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
4. CountBolt组件实现计数逻辑
package strom.strom;
//
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that keeps a running count per word and emits the updated
 * {@code ("word", "count")} pair for every word it receives.
 */
public class CountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    // Emitter, received in prepare().
    private OutputCollector collector;

    // Running totals, keyed by word; created in prepare().
    private Map<String, Integer> counts;

    /**
     * Stores the collector and creates the empty count table.
     *
     * @param arg0 configuration map (unused)
     * @param arg1 topology context (unused)
     * @param arg2 collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
        this.counts = new HashMap<String, Integer>();
    }

    /**
     * Declares the output keys; several can be declared at once.
     *
     * @param arg0 declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word", "count"));
    }

    /**
     * Counts one occurrence of the incoming word and emits its new total.
     *
     * @param input tuple carrying a {@code "word"} string field
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // Single lookup: a missing entry means this is the first occurrence.
        Integer previous = counts.get(word);
        int count = (previous == null) ? 1 : previous + 1;
        counts.put(word, count);
        // Anchor the result to the input tuple, then mark the input as processed;
        // this bolt is responsible for acknowledging what it receives.
        this.collector.emit(input, new Values(word, count));
        this.collector.ack(input);
    }
}
5. PrintBolt组件
package strom.strom;
//
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal bolt that prints every received {@code ("word", "count")} pair to the console.
 * It emits nothing downstream.
 */
public class PrintBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    // Kept only so that received tuples can be acknowledged.
    private OutputCollector collector;

    /**
     * Stores the collector for later acknowledgement of input tuples.
     *
     * @param arg0 configuration map (unused)
     * @param arg1 topology context (unused)
     * @param arg2 collector used to ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    /**
     * Prints the word and its count to the console as {@code word---->count}.
     *
     * @param input tuple carrying a {@code "word"} string and a {@code "count"} integer field
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // Kept boxed: the value is only concatenated into the output line, so there is no
        // need to unbox it (which would throw if the field were null).
        Integer count = input.getIntegerByField("count");
        System.out.println(word + "---->" + count);
        // Mark the tuple as processed; this bolt is responsible for acknowledging its input.
        this.collector.ack(input);
    }

    /** Declares no output fields because this bolt does not emit tuples. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
6. WordCountTopology类用来连接这些组件
package strom.strom;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * Entry point that wires the spout and the three bolts into the word-count topology and
 * submits it, either to an in-process local cluster or to a Storm cluster.
 */
public class WordCountTopology {
    private static final String SPOUT_ID = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    /**
     * Builds and submits the topology.
     *
     * @param args when empty or {@code null} the topology runs in a local cluster under the
     *             name {@code "wordcount"}; otherwise {@code args[0]} is the topology name
     *             used for submission to the cluster
     */
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = wireComponents();
        Config config = new Config();

        boolean submitToCluster = args != null && args.length > 0;
        if (submitToCluster) {
            // Cluster mode: set how many worker processes execute this topology.
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Local mode.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        }
    }

    /** Registers the spout and bolts and connects them with their stream groupings. */
    private static TopologyBuilder wireComponents() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Source of sentences.
        topologyBuilder.setSpout(SPOUT_ID, new SentenceSpout());
        // Sentences go to the split bolt with shuffle grouping.
        topologyBuilder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SPOUT_ID);
        // Counting is only correct if the same word always reaches the same CountBolt,
        // hence grouping by the value of the "word" field.
        topologyBuilder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        // Global grouping: every tuple is sent to a single PrintBolt.
        topologyBuilder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);
        return topologyBuilder;
    }
}
-
本地执行测试
Paste_Image.png -
打成jar包后上传到storm集群测试
Paste_Image.png
下面的jar包包含着依赖的包,上面的jar包中没有包括,所以我们选择使用下面这个jar包。
上传到集群上然后执行
$ bin/storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar strom.strom.WordCountTopology wordcount
在UI中查看运行情况

Paste_Image.png
查看运行日志

Paste_Image.png

Paste_Image.png

Paste_Image.png
查看拓扑图

Paste_Image.png

Paste_Image.png

Paste_Image.png



