A Storm topology consists of Spouts and Bolts.
In code, this mostly means that a Spout reads the data and then sends it to a Bolt for processing.
First, add the Maven dependency:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
</dependency>
Reading data with a Spout
There are two ways to implement a Spout: extend BaseRichSpout, or implement IRichSpout.
In fact, BaseRichSpout itself implements IRichSpout:
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout
Here I use BaseRichSpout to read files:
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class SpoutTest extends BaseRichSpout implements Serializable {
    SpoutOutputCollector collector;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    public void nextTuple() {
        // Read the .txt files under the directory d:\test (recursively); add other extensions if needed
        Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[] { "txt" }, true);
        for (File file : listFiles) {
            try {
                // Emit the file one line at a time
                List<String> lines = FileUtils.readLines(file, "utf-8");
                for (String line : lines) {
                    // Emit the single line, not the whole list
                    this.collector.emit(new Values(line));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // The file has been processed: rename it so the next scan skips it
            try {
                File srcFile = file.getAbsoluteFile();
                File destFile = new File(srcFile + ".done." + System.currentTimeMillis());
                FileUtils.moveFile(srcFile, destFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple carries one value, available downstream under the field name "lines"
        outputFieldsDeclarer.declare(new Fields("lines"));
    }
}
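Because Storm calls nextTuple() over and over, renaming each finished file is what keeps its lines from being emitted a second time. The scan, read, rename cycle can be tried out without a Storm cluster. The following is a minimal sketch using only the JDK; the class TxtFileDrainer and its drain method are hypothetical names made up for this illustration, and it uses java.nio instead of the shaded commons-io FileUtils that the spout uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Storm): drains every *.txt file under a directory once.
final class TxtFileDrainer {

    // Returns all lines of the *.txt files under dir, then renames each file to
    // <name>.done.<timestamp> so that a later scan no longer matches it.
    static List<String> drain(Path dir) {
        List<String> out = new ArrayList<>();
        try {
            List<Path> files;
            // Collect the matching files first so the directory walk is closed before renaming
            try (Stream<Path> walk = Files.walk(dir)) {
                files = walk.filter(Files::isRegularFile)
                            .filter(p -> p.getFileName().toString().endsWith(".txt"))
                            .sorted()
                            .collect(Collectors.toList());
            }
            for (Path file : files) {
                out.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
                Path done = file.resolveSibling(
                        file.getFileName() + ".done." + System.currentTimeMillis());
                Files.move(file, done);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }
}
```

Calling drain on the same directory twice returns the lines the first time and an empty list the second time, which is the behavior the spout relies on.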
Processing data with a Bolt
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class BlotTest extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    // Preparation phase: initialize conf, context and collector
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // Read the value from the tuple; the field name must match the one the spout declares ("lines")
        String line = tuple.getStringByField("lines");
        if ("".equals(line) || null == line) {
            return;
        }
        System.out.println(line);
        // ... process the data or store it in a database here
        // If it needs to go on to a next bolt, emit it and do the storing there
        // collector.emit(new Values(line));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Needs import org.apache.storm.tuple.Fields when enabled
        //outputFieldsDeclarer.declare(new Fields("phone","time"));
    }
}
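The Bolt above receives the data sent by the Spout and returns immediately if the value is empty. If further filtering is needed, enable the commented-out emit call to send the data on to the next bolt; in that case the bolt must also declare its output fields in declareOutputFields(), just as the spout does.
Finally, the main method: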
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        try {
            // Mobile
            builder.setSpout("spoutid", new SpoutTest());
            builder.setBolt("blotid", new BlotTest()).shuffleGrouping("spoutid");
            // Matches the commented-out declaration in the Bolt: group by phone, with a parallelism of 4
            // (needs import org.apache.storm.tuple.Fields when enabled)
            // builder.setBolt("blotid", new BlotTest(),4).fieldsGrouping("spoutid",new Fields("phone"));
            Config config = new Config();
            // Data accuracy is not critical here, so no ackers are configured; set this as needed, otherwise tuples can pile up
            config.setNumAckers(0);
            // With arguments: submit to the cluster. Without arguments: run on the local machine
            if (args.length > 0) {
                // args[0] is the topology name, args[1] (if given) the number of workers
                if (args.length > 1) {
                    config.setNumWorkers(Integer.parseInt(args[1]));
                }
                config.setMaxSpoutPending(5000);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } else {
                String topologyName = Main.class.getSimpleName();
                StormTopology stormTopology = builder.createTopology();
                LocalCluster lCluster = new LocalCluster();
                lCluster.submitTopology(topologyName, config, stormTopology);
            }
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }
    }
}
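The commented-out line above groups tuples by the field from the commented-out declaration in BlotTest. I have not studied the parallelism setting in depth; set it according to your workload.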
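The difference between the two groupings matters once the parallelism is above 1: shuffleGrouping spreads tuples across the bolt's tasks at random, while fieldsGrouping sends all tuples with the same value of the grouping field to the same task. A rough mental model for the latter is "hash the field value, take it modulo the number of tasks". The sketch below illustrates that idea only; FieldsGroupingSketch and taskFor are hypothetical names, and this is not Storm's internal routing code.

```java
import java.util.Objects;

// Conceptual illustration only; this is not how Storm is implemented internally.
final class FieldsGroupingSketch {

    // Picks a task index in [0, numTasks) from the value of the grouping field.
    static int taskFor(String phone, int numTasks) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(Objects.hashCode(phone), numTasks);
    }

    public static void main(String[] args) {
        String[] phones = {"13800000001", "13800000002", "13800000001"};
        for (String phone : phones) {
            System.out.println(phone + " -> task " + taskFor(phone, 4));
        }
    }
}
```

The first and third entries always land on the same task index, which is why per-key state (for example a per-phone counter) can be kept in a bolt's local fields under fieldsGrouping. Note that the spout in this post only declares the field "lines", so grouping on "phone" would require an upstream component that actually declares a "phone" field.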
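That completes a simple topology.
Now the question is: if Storm keeps processing continuously, when should the results be written to the database? This is where Storm's timer (tick tuples) comes in.
Modify the code above slightly: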
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class Blot1Test extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;
    private Map<String, String> map = new HashMap<String, String>();

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            // A tick tuple arrived: save what has been collected; all other tuples go to the else branch
            savemaptodb();
        } else {
            // Read the field only for data tuples: tick tuples have no "lines" field
            String line = tuple.getStringByField("lines");
            map.put(line, line);
        }
    }

    // Placeholder, not shown in the original: persist the entries of map here
    // (and clear the map if each flush should start from an empty buffer)
    private void savemaptodb() {
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    // Send this bolt a tick tuple every 10 seconds
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}
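The getComponentConfiguration() method above sets up the timer for this one bolt. There is also a global timer; to use it, add the following to the config in the Main class:
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); // Global timer: send a system-level tick every five seconds
Then, in the execute method of every bolt, check whether the incoming tuple is a tick:
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)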
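The buffer-then-flush pattern behind the tick check can be exercised without Storm. The sketch below is a plain-Java stand-in, with hypothetical names (TickBufferSketch, onTuple) and a boolean flag in place of the real tick test; it assumes the buffer is cleared after each flush, which the original code does not show.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the bolt's buffering logic; no Storm classes involved.
final class TickBufferSketch {
    private final Map<String, String> buffer = new LinkedHashMap<>();
    private final List<Map<String, String>> flushed = new ArrayList<>();

    // Mirrors execute(): a tick flushes the buffer, any other tuple is buffered.
    void onTuple(boolean isTick, String line) {
        if (isTick) {
            if (!buffer.isEmpty()) {
                // Stand-in for the database write
                flushed.add(new LinkedHashMap<>(buffer));
                // Assumption: start each interval with an empty buffer
                buffer.clear();
            }
        } else {
            buffer.put(line, line);
        }
    }

    List<Map<String, String>> flushedBatches() {
        return flushed;
    }

    int buffered() {
        return buffer.size();
    }
}
```

Feeding it "a", "b", "a" and then a tick produces one batch with two entries, because the map deduplicates by key just as the bolt's map does. A tick that arrives while the buffer is empty writes nothing.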
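That gives you a simple Storm example (to be honest, I have not verified it; it was all typed by hand, because the company code sits on the internal network and is too much trouble to pull out), but this is broadly how it works.
This topology has no failure handling, and it does not read from HDFS or Kafka. Write those parts yourself; you only really master something by running into problems.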
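As a starting point for the failure handling: in Storm, a spout that emits a tuple together with a message id, via collector.emit(values, msgId), later gets ack(msgId) or fail(msgId) called on it, provided ackers are enabled (Main above sets them to 0) and the bolts ack the tuples they receive. The spout then has to remember what is still in flight so it can resend on failure. The sketch below shows only that bookkeeping in plain Java; PendingTracker and its methods are hypothetical names, and wiring it into open(), nextTuple(), ack() and fail() is left as described in the text.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical bookkeeping a reliable spout could keep; no Storm classes involved.
final class PendingTracker {
    private final Queue<String> toSend = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    // Queue a line for sending.
    void offer(String line) {
        toSend.add(line);
    }

    // Called where nextTuple() would emit: returns the message id, or -1 if nothing is queued.
    long next() {
        String line = toSend.poll();
        if (line == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, line);
        return id;
    }

    // The line that belongs to an in-flight message id, or null if unknown.
    String payload(long id) {
        return pending.get(id);
    }

    // Called where ack(msgId) would be: the line is done and can be forgotten.
    void ack(long id) {
        pending.remove(id);
    }

    // Called where fail(msgId) would be: put the line back in the queue for a retry.
    void fail(long id) {
        String line = pending.remove(id);
        if (line != null) {
            toSend.add(line);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queued() {
        return toSend.size();
    }
}
```

This version retries failed lines indefinitely and keeps everything in memory; a retry limit and durable storage are design choices left open here.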