package com.csylh;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* Description:使用Storm實(shí)現(xiàn)累加求和操作
*
* @author: 留歌36
* Date:2018/9/3 16:50
*/
public class LocalSumStormTopology {
/**
* Spout需要繼承BaseRichSpout
* 數(shù)據(jù)源需要產(chǎn)生數(shù)據(jù)并發(fā)射到Bolt
*/
public static class DataSourceSpout extends BaseRichSpout{
//定義一個(gè)發(fā)射器
private SpoutOutputCollector collector;
/**
* 初始化方法 只是會(huì)被調(diào)用一次
* @param conf 配置參數(shù)
* @param context 上下文:相當(dāng)于一個(gè)框 可以從里面獲取許多東西
* @param collector 數(shù)據(jù)發(fā)射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//將傳入的collector發(fā)射器 對(duì)私有變量 進(jìn)行賦初值
this.collector = collector;
}
int number = 0;
/**
* 用于產(chǎn)生數(shù)據(jù)
* 生產(chǎn)中肯定是從消息隊(duì)列中獲取數(shù)據(jù)
* 這個(gè)方法是一個(gè)死循環(huán)
*/
@Override
public void nextTuple() {
//發(fā)送方式,調(diào)用上面定義的數(shù)據(jù)發(fā)射器
this.collector.emit(new Values(number++));
System.out.println("Spout==》發(fā)送的數(shù)據(jù):" + number);
//每隔1s中發(fā)射一次,防止數(shù)據(jù)產(chǎn)生太快
Utils.sleep(1000);
}
/**
* 聲明輸出字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
/**
* Bolt需要繼承BaseRichBolt
* 用于接收數(shù)據(jù)并對(duì)數(shù)據(jù)進(jìn)行處理
*/
public static class SumBolt extends BaseRichBolt{
/**
* 初始化方法 ,會(huì)被執(zhí)行一次
* @param stormConf
* @param context
* @param collector 這里的數(shù)據(jù)發(fā)射器,由于業(yè)務(wù)邏輯中沒(méi)有沒(méi)有必要進(jìn)行放下發(fā)的操作,所以就是沒(méi)有必要進(jìn)行new一個(gè)
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
int sum = 0;
/**
* 也是一個(gè)死循環(huán) ,職責(zé): 獲取Spout發(fā)射過(guò)來(lái)的數(shù)據(jù)
* @param input
*/
@Override
public void execute(Tuple input) {
//Bolt中獲取值可以通過(guò)index獲取
// 也可以根據(jù)上一個(gè)環(huán)節(jié)中定義的filed的名稱獲取(***推薦)
Integer value = input.getIntegerByField("num");
sum += value;
System.out.println("Bolt :Sum = ["+ sum + "]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
//創(chuàng)建一個(gè)本地的Storm集群 ,本地模式運(yùn)行,不需要搭建Storm集群
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
}
}
使用Storm實(shí)現(xiàn)累加求和操作
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 對(duì)列表里某個(gè)類中的某個(gè)屬性進(jìn)行求和操作非常常見(jiàn)。在Java 8誕生前,常用for循環(huán)手動(dòng)累加,往往要寫(xiě)上三四行代碼...
- 數(shù)據(jù)文件如下: 求和cat data|awk '{sum+=$1} END {print "Sum = ", su...
- 一個(gè)簡(jiǎn)單的實(shí)現(xiàn)tableView來(lái)求金額總和的demo, 圖例如下: Github上代碼:[https://git...
- 在https://github.com/go-sql-driver/mysql 可以看到關(guān)于go的連接池和超時(shí)相關(guān)...