Storm: using the tick timer mechanism

<h4>Background:</h4>
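In Java there are at least three ways to implement a scheduled task: 1. a plain thread (a while loop with sleep); 2. Timer and TimerTask; 3. ScheduledExecutorService. There is also the more full-featured Quartz framework, or Quartz integrated into Spring. As the title suggests, this article is not about any of those. It covers the timer feature built into Storm, which suits use cases such as computing cumulative order totals every minute. In production this pairs best with Kafka delivering the order messages; here we only walk through a local `main` demo.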
<h4>I. Tick explained</h4>
<b>1. What tick does</b>
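Apache Storm has a built-in timer mechanism called tick. At a fixed interval (second granularity, configurable by the user), every task of a bolt receives a tick tuple on the tick stream (`__tick`) of the system component (`__system`). When the bolt receives such a tuple, it can do whatever processing the business logic requires. Tick has been supported since Apache Storm 0.8.0; this article was tested on Apache Storm 0.9.5.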
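A bolt tells a tick tuple apart from ordinary data only by where the tuple came from. The plain-Java sketch below makes that check concrete without a Storm cluster. `SimpleTuple` and `TickCheck` are hypothetical stand-ins for Storm's `Tuple` and a bolt. The two string constants mirror the values of Storm's `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`.

```java
// Plain-Java sketch: SimpleTuple is a hypothetical stand-in for Storm's Tuple,
// carrying only the two fields needed to recognise a tick tuple.
final class TickCheck {
    // Same values as Storm's Constants.SYSTEM_COMPONENT_ID / SYSTEM_TICK_STREAM_ID
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    static final class SimpleTuple {
        final String sourceComponent;
        final String sourceStreamId;

        SimpleTuple(String sourceComponent, String sourceStreamId) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
        }
    }

    // A tuple counts as a tick tuple only if BOTH the source component and the stream match
    static boolean isTickTuple(SimpleTuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(t.sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println(isTickTuple(new SimpleTuple("__system", "__tick"))); // true
        System.out.println(isTickTuple(new SimpleTuple("spout", "default")));   // false
    }
}
```

The constants are placed first in the `equals` calls, so a tuple with a missing field simply fails the check instead of throwing a `NullPointerException`.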
<b>2. Setting a tick for a bolt</b>
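To make a bolt do something periodically, have it extend BaseBasicBolt or BaseRichBolt and override getComponentConfiguration(). In that method, set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS; the unit is seconds.
getComponentConfiguration() is declared in the backtype.storm.topology.IComponent interface. Its implementation can return topology-level settings (keys starting with `topology.`) that apply to this bolt only.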
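<pre>
@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    // Deliver a tick tuple to every task of this bolt every 10 seconds
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    return conf;
}
</pre>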
With this setting in place, every task of the bolt receives a tick tuple from the system component's tick stream at the configured interval. The execute() method can therefore be implemented as follows:
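<pre>
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        // Tick tuple: run the periodic work here (e.g. flush aggregated results)
    } else {
        // Ordinary tuple: normal business processing
    }
}
</pre>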
<b>3. Global tick</b>
To make every bolt in the topology do something periodically, define a topology-wide tick. This also works by setting Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS:
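<pre>
Config conf = new Config();
// Topology-wide tick: bolts receive a tick tuple every 7 seconds
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
</pre>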
<b>What happens when a tick set on the whole topology conflicts with one set on an individual bolt? The setting with the narrower scope wins: the bolt-level tick takes precedence.</b>
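The precedence rule can be pictured as a map overlay in which the bolt's own configuration is laid over the topology-wide one. Under that model, whatever the bolt sets wins. The sketch below models the rule in plain Java. It is an illustration under that assumption, not Storm's source; Storm performs the real merge internally when it builds each executor's configuration. `TickPrecedence` and `effectiveConf` are hypothetical names. The key string is the value of `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS`. The values 7 and 10 match the demo later in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of tick-frequency precedence (an assumption, not Storm's code):
// component-level settings are laid over topology-level ones, so the bolt's value wins.
final class TickPrecedence {
    // String value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS
    static final String TICK_FREQ = "topology.tick.tuple.freq.secs";

    static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                             Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf); // copy: start from global settings
        if (componentConf != null) {          // bolts that don't override return null
            merged.putAll(componentConf);     // bolt-specific keys override global ones
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> topology = new HashMap<>();
        topology.put(TICK_FREQ, 7);   // global tick, as in the demo's main()
        Map<String, Object> bolt = new HashMap<>();
        bolt.put(TICK_FREQ, 10);      // bolt tick, as in getComponentConfiguration()

        System.out.println(effectiveConf(topology, bolt).get(TICK_FREQ)); // 10
        System.out.println(effectiveConf(topology, null).get(TICK_FREQ)); // 7
    }
}
```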
<b>4. Timing precision</b>
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS has second granularity. For example, if a bolt sets it to 10 s, each task of that bolt should in theory receive a tick tuple every 10 s. In our tests the interval was very accurate: ticks typically arrived about 1-2 ms late, not early.
<h4>II. Code</h4>
1. Spout code
<pre>
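import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TickWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {"a", "b", "c"};
    private int index = 0;

    @Override
    public void nextTuple() {
        // Emit "a", "b", "c" in a round-robin loop
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            // Throttle emission to roughly one tuple per millisecond
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}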
</pre>
2. Bolt code
<pre>
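import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class TickWordCountBolt extends BaseBasicBolt {
    Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            System.err.println("TickWordCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
            // Simulated aggregation: print the counts collected in this window
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                System.err.println("key: " + e.getKey() + " count: " + e.getValue());
            }
            // Simulate clearing the state once this 10-second window has been processed
            counts.clear();
        } else {
            // Ordinary data tuple: count the word
            String result = tuple.getStringByField("word");
            Integer current = counts.get(result);
            counts.put(result, current == null ? 1 : current + 1);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing downstream
    }

    // Ask Storm to send this bolt a tick tuple every 10 seconds
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}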
</pre>
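Without the Storm API, the bolt above is a "count, then flush on tick" aggregator. The hypothetical `TickWindowCounter` below reproduces that logic in plain Java so it can be exercised without a cluster. `onWord` plays the role of the `else` branch and `onTick` plays the role of the tick branch. Two details are choices made for this sketch only. It uses `Map.merge` instead of the get/put pair. It also returns a snapshot instead of printing. Storm runs a given executor's `execute()` calls on a single thread, so a plain `HashMap` is likely sufficient here; the original's `ConcurrentHashMap` is harmless.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of TickWordCountBolt's logic:
// data tuples increment counts; a tick tuple flushes and clears them.
final class TickWindowCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Equivalent of the bolt's "else" branch for an ordinary word tuple
    void onWord(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Equivalent of the tick branch: hand back this window's totals, then reset
    Map<String, Integer> onTick() {
        Map<String, Integer> snapshot = new TreeMap<>(counts); // sorted copy for stable output
        counts.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        TickWindowCounter counter = new TickWindowCounter();
        for (String w : new String[] {"a", "b", "a", "c", "a"}) {
            counter.onWord(w);
        }
        System.out.println(counter.onTick()); // {a=3, b=1, c=1}
        System.out.println(counter.onTick()); // {} because the window was cleared
    }
}
```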
3. main method for local debugging
<pre>
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TickTest {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TickWordSpout());
        // Run the counting bolt with 3 executors (threads), grouped by the value of "word"
        builder.setBolt("count", new TickWordCountBolt(), 3).fieldsGrouping("spout", new Fields("word"));

        Config conf = new Config();
        // Topology-wide tick of 7 s, set on purpose to test precedence against the bolt's 10 s
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Submit to a real cluster, using args[0] as the topology name
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Otherwise run in-process for local debugging
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
        }
    }
}
--------------------------------------- Output ------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a count: 3021

TickWordCount bolt: 2016-09-17 12:41:33:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a count: 3295

TickWordCount bolt: 2016-09-17 12:41:43:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a count: 3293

TickWordCount bolt: 2016-09-17 12:41:53:031
key: b count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a count: 3298

TickWordCount bolt: 2016-09-17 12:42:03:031
key: b count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a count: 3293
</pre>
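In the output above, each of the three `count` tasks reports exactly one key. This is the point of `fieldsGrouping("spout", new Fields("word"))`: tuples with equal field values always reach the same task. The sketch below shows why `a`, `b` and `c` can end up on three different tasks. It assumes a simple hash-modulo-task-count scheme: hash the list of grouping values, then take the result modulo the number of tasks. Storm's actual grouper may compute the index differently, so treat the task numbers as illustrative. `FieldsGroupingSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a hash-based fields grouping (assumed scheme, for illustration only):
// the selected field values are hashed and reduced modulo the task count.
final class FieldsGroupingSketch {
    static int taskIndexFor(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even if hashCode() is negative
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        for (String word : new String[] {"a", "b", "c"}) {
            List<Object> values = Arrays.<Object>asList(word);
            System.out.println(word + " -> task index " + taskIndexFor(values, 3));
        }
        // a -> task index 2, b -> task index 0, c -> task index 1
    }
}
```

For a one-element list, `List.hashCode()` is `31 + element.hashCode()`, which gives 128, 129 and 130 for `a`, `b` and `c`. These values are consecutive, so modulo 3 they fall into three distinct buckets.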
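In this test run, each task's ticks arrive exactly 10 s apart with no measurable delay. Delays of 1-2 ms were also seen during testing, so the timer is quite precise. The interval is also 10 s rather than the global 7 s, which is consistent with the bolt-level setting taking precedence.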
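The precision claim can be checked directly against the timestamps printed above. The hypothetical helper below parses the `HH:mm:ss:SSS` part of the log lines and computes the gap between consecutive ticks. The input is the tick times of the task that reported key `b`. The helper assumes all timestamps fall on the same day.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: spacing in milliseconds between consecutive tick timestamps.
final class TickIntervals {
    // Matches the time part of the bolt's "yyyy-MM-dd HH:mm:ss:SSS" pattern
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

    // Assumes every timestamp is on the same day (no midnight wrap-around)
    static List<Long> intervalsMillis(String... times) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < times.length; i++) {
            LocalTime prev = LocalTime.parse(times[i - 1], FMT);
            LocalTime curr = LocalTime.parse(times[i], FMT);
            gaps.add(Duration.between(prev, curr).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Tick times printed by the task that owns key "b" in the sample output
        System.out.println(intervalsMillis(
                "12:41:23:031", "12:41:33:031", "12:41:43:031",
                "12:41:53:031", "12:42:03:031")); // [10000, 10000, 10000, 10000]
    }
}
```

Any tick that arrived late would show up as a gap slightly above 10000 ms, followed by a shorter gap if the timer did not drift.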
<h4>III. A brief look at the tick implementation</h4>
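TopologyBuilder.setBolt: the IBasicBolt overload wraps the bolt in a BasicBoltExecutor and delegates to the IRichBolt overload, which in turn calls initCommon():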
<pre>
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
    return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
</pre>
<pre>
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
    validateUnusedId(id);
    initCommon(id, bolt, parallelism_hint);
    _bolts.put(id, bolt);
    return new BoltGetter(id);
}
</pre>
<pre>
<b>// Map conf = component.getComponentConfiguration(); is where the bolt's tick setting is read</b>
private void initCommon(String id, IComponent component, Number parallelism) {
    ComponentCommon common = new ComponentCommon();
    common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
    if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
    Map conf = component.getComponentConfiguration();
    // The per-component conf is serialized to JSON and stored with the component
    if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
    _commons.put(id, common);
}
</pre>
That's all for using the tick feature.

© Copyright belongs to the author. For reposting or content cooperation, please contact the author.
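Platform statement: the article content (including any images or video) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.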
