這是一個JStorm使用教程,不包含環(huán)境搭建教程,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建,后續(xù)研究完會考慮額外寫一篇博客。
你如果想了解JStorm是什么,有多牛逼什么什么的,請看最后的參考博客鏈接,里面有各種版本的介紹,我就不在這里總結(jié)這種東西了,我相信這些東西你第一次接觸的時候會看,等學(xué)了JStorm之后也不會再去看這些東西了。。。
簡介
-
JStorm和MapReduce的一些對比
compare-table.png
一些關(guān)鍵概念
- nimbus:主控節(jié)點運行Nimbus守護進程,類似于Hadoop中的ResourceManager,負(fù)責(zé)在集群中分發(fā)代碼,對節(jié)點分配任務(wù),并監(jiān)視主機故障。
- supervisor:每個工作節(jié)點運行Supervisor守護進程,負(fù)責(zé)監(jiān)聽工作節(jié)點上已經(jīng)分配的主機作業(yè),啟動和停止Nimbus已經(jīng)分配的工作進程,類似于Hadoop中的NodeManager。
supervisor會定時從zookeeper獲取拓補信息topologies、任務(wù)分配信息assignments及各類心跳信息,以此為依據(jù)進行任務(wù)分配。
在supervisor同步時,會根據(jù)新的任務(wù)分配情況來啟動新的worker或者關(guān)閉舊的worker并進行負(fù)載均衡。 - worker:Worker是具體處理Spout/Bolt邏輯的進程,根據(jù)提交的拓?fù)渲衏onf.setNumWorkers(3);定義分配每個拓?fù)鋵?yīng)的worker數(shù)量,Storm會在每個Worker上均勻分配任務(wù),一個Worker只能執(zhí)行一個topology,但是可以執(zhí)行其中的多個任務(wù)線程。
- task:任務(wù)是指Worker中每個Spout/Bolt線程,每個Spout和Bolt在集群中會執(zhí)行許多任務(wù),每個任務(wù)對應(yīng)一個線程執(zhí)行,可以通過TopologyBuilder類的setSpout()和setBolt()方法來設(shè)置每個Spout或者Bolt的并行度。
- Executor:Task接收到任務(wù)就是在Executor中執(zhí)行的,可以理解為執(zhí)行Task專門的一個線程。
- topology:Storm中Topology的概念類似于Hadoop中的MapReduce Job,是一個用來編排、容納一組計算邏輯組件(Spout、Bolt)的對象(Hadoop MapReduce中一個Job包含一組Map Task、Reduce Task),這一組計算組件可以按照DAG圖的方式編排起來(通過選擇Stream Groupings來控制數(shù)據(jù)流分發(fā)流向),從而組合成一個計算邏輯更加負(fù)責(zé)的對象,那就是Topology。一個Topology運行以后就不能停止,它會無限地運行下去,除非手動干預(yù)(顯式執(zhí)行bin/storm kill )或意外故障(如停機、整個Storm集群掛掉)讓它終止。
- spout:Storm中Spout是一個Topology的消息生產(chǎn)的源頭,Spout應(yīng)該是一個持續(xù)不斷生產(chǎn)消息的組件,例如,它可以是一個Socket Server在監(jiān)聽外部Client連接并發(fā)送消息,可以是一個消息隊列(MQ)的消費者、可以是用來接收Flume Agent的Sink所發(fā)送消息的服務(wù),等等。Spout生產(chǎn)的消息在Storm中被抽象為Tuple,在整個Topology的多個計算組件之間都是根據(jù)需要抽象構(gòu)建的Tuple消息來進行連接,從而形成流。
- bolt:Storm中消息的處理邏輯被封裝到Bolt組件中,任何處理邏輯都可以在Bolt里面執(zhí)行,處理過程和普通計算應(yīng)用程序沒什么區(qū)別,只是需要根據(jù)Storm的計算語義來合理設(shè)置一下組件之間消息流的聲明、分發(fā)、連接即可。Bolt可以接收來自一個或多個Spout的Tuple消息,也可以來自多個其它Bolt的Tuple消息,也可能是Spout和其它Bolt組合發(fā)送的Tuple消息。
- tuple:JStorm中信息傳輸?shù)膯挝?,Storm程序是無限執(zhí)行下去的,數(shù)據(jù)流是無止境的,但是每次驅(qū)動程序執(zhí)行的只是一個數(shù)據(jù)流單位,就是Tuple,Spout的一次nextTuple以及Bolt的一次execute的執(zhí)行操作的都是一個Tuple。Tuple只要是任意可序列化對象即可。
生命周期
Topology生命周期
- 上傳代碼并做校驗(/nimbus/inbox);
- 建立本地目錄(/stormdist/topology-id/);
- 建立zookeeper上的心跳目錄;
- 計算topology的工作量(parallelism hint),分配task-id并寫入zookeeper;
- 把task分配給supervisor執(zhí)行;
- 在supervisor中定時檢查是否有新的task,下載新代碼、刪除老代碼,剩下的工作交個小弟worker;
- 在worker中把task拿到,看里面有哪些spout/Bolt,然后計算需要給哪些task發(fā)消息并建立連接;
-
在nimbus將topology終止的時候會將zookeeper上的相關(guān)信息刪除;
topology-lifecycle.png
Spout生命周期
提交時
- 構(gòu)造方法:初始化構(gòu)造參數(shù),其中包含的必須都是可序列化的
- getComponentConfiguration:獲取該類特殊的配置參數(shù),只和該組件相關(guān)的配置,通常
return null - declareOutputFields:獲取該組件會輸出的流、字段列表,其后續(xù)的其他組件訂閱相應(yīng)的流或者字段需要和這里對應(yīng),否則會出錯
- 將內(nèi)存中的該實例序列化為字節(jié)碼文件。
在Worker節(jié)點中執(zhí)行
- 將傳輸過來的字節(jié)碼文件反序列化為類實例
- open:初始化這個組件類實例,可以加載消息隊列消費端、JDBC鏈接池等非可序列化對象
- activate:該實例設(shè)置為活躍狀態(tài)(有數(shù)據(jù)流驅(qū)動時)調(diào)用,過段時間暫時沒有數(shù)據(jù)流驅(qū)動就會睡眠
- nextTuple:循環(huán)調(diào)用,可在這里從數(shù)據(jù)源獲取數(shù)據(jù)emit到下一個節(jié)點,JStorm就會自動循環(huán)調(diào)用執(zhí)行下去
- ack:往后emit的一個Tuple在acker節(jié)點察覺成功了,回調(diào)通知Spout
- fail:往后emit的一個Tuple在acker節(jié)點察覺失敗或者超時了,回調(diào)通知Spout
- deactivate:沒數(shù)據(jù)流驅(qū)動達到一段時間,進入睡眠前調(diào)用
- close:程序停止時調(diào)用,釋放資源
Bolt生命周期
提交時
- 構(gòu)造方法:初始化構(gòu)造參數(shù),其中包含的必須都是可序列化的
- getComponentConfiguration:獲取該類特殊的配置參數(shù),只和該組件相關(guān)的配置,通常
return null - declareOutputFields:獲取該組件會輸出的流、字段列表,其后續(xù)的其他組件訂閱相應(yīng)的流或者字段需要和這里對應(yīng),否則會出錯
- 將內(nèi)存中的該實例序列化為字節(jié)碼文件。
在Worker節(jié)點中執(zhí)行
- 將傳輸過來的字節(jié)碼文件反序列化為類實例
- prepare:初始化這個組件類實例,可以加載配置,數(shù)據(jù)處理類初始化,數(shù)據(jù)輸出對象初始化
- execute:循環(huán)調(diào)用,可在這里從上個節(jié)點獲取Tuple,進行相應(yīng)處理之后emit到下一個節(jié)點,JStorm就會自動循環(huán)調(diào)用執(zhí)行下去
- cleanup:程序停止時調(diào)用,釋放資源
數(shù)據(jù)流向控制

- ShuffleGrouping:對符合條件的目標(biāo)Worker,其中可能的多個Task,隨機分配Task來接收和處理該Tuple
- FieldsGrouping:會按照指定的field值進行分配,可以保證相同field對應(yīng)值的Tuple分配到相同一個Task中執(zhí)行 —— 可以想象成拿指定的field的值hash取模決定哪個Task(具體算法沒研究)
- 除了這兩個其他暫時沒用到,也感覺剩下的比較用不到,等用了再更
- 具體的流的聚合和分發(fā),參考這篇博客,例子很詳細(xì):JStorm流的匯聚和分發(fā)
數(shù)據(jù)流傳輸過程
- Spout中的數(shù)據(jù)源取出一份數(shù)據(jù)(無限循環(huán)取出),作為一個Tuple,emit到下一個節(jié)點
- 根據(jù)Spout中declareOutputFields定義的字段和流,查閱后繼訂閱該節(jié)點或者流的Bolt,Tuple會被發(fā)送到每個訂閱節(jié)點或者流的后繼節(jié)點當(dāng)中
- 后繼訂閱的Bolt節(jié)點接收到該Tuple,使用
tuple.getValueByField通過上一節(jié)點declareOutputFields的字段名獲取相應(yīng)的字段值,也可以根據(jù)fields的聲明順序使用tuple.getValue通過下標(biāo)獲取相應(yīng)的值 - 拿到相應(yīng)的數(shù)據(jù)之后,進行相關(guān)邏輯處理,之后emit到下一個節(jié)點當(dāng)中,以此類推,直到最終節(jié)點將數(shù)據(jù)輸出到mysql、ES、HDFS等存儲系統(tǒng)當(dāng)中
- emit時可以指定相應(yīng)的streamId來指定當(dāng)前的數(shù)據(jù)要傳輸?shù)降哪膫€streamId當(dāng)中(該組件的declareOutputFields需要聲明所需的所有streamId),在Topology構(gòu)建時后繼節(jié)點指定該streamId來訂閱相應(yīng)的數(shù)據(jù)。
編程例子講解
- 這個例子是一個單詞計數(shù)程序,通過一組字符串?dāng)?shù)組中隨機獲取一個檔次作為數(shù)據(jù)源往后輸出,在后續(xù)節(jié)點統(tǒng)計各個單詞被獲取的總次數(shù)。
-
包括RandomSentenceSpout、SplitBolt、CountBolt三個節(jié)點,各個節(jié)點并行度都為1,是一個最簡單的單條鏈?zhǔn)降耐負(fù)?,如?/p>
jstorm-example.jpg
RandomSentenceSpout
- 表示數(shù)據(jù)源,這里用從數(shù)組隨機獲取一個元素作為模擬數(shù)據(jù)源獲取,日常開發(fā)通常是從MQ中獲取相應(yīng)數(shù)據(jù)進行數(shù)據(jù)流驅(qū)動。
/**
* RandomSentenceSpout實現(xiàn)了IRichSpout接口
* Spout需要實現(xiàn)的接口可以是:
* 1,IRichSpout:最基本的Spout,繼承自ISpout, IComponent,沒有任何特殊方法(一般用這個)
* 2,IControlSpout:繼承自IComponent,包括open,close,activate,deactivate,nextTuple,ack(Object msgId),fail等方法
*/
public class RandomSentenceSpout implements IRichSpout {
private static final long serialVersionUID = 4058847280819269954L;
private static final Logger logger = Logger.getLogger(RandomSentenceSpout.class);
//可以理解為JStorm的數(shù)據(jù)傳輸管道,通過這個對象將這個組件的數(shù)據(jù)傳輸?shù)较乱粋€組件當(dāng)中
private SpoutOutputCollector _collector;
//隨機生成對象
private Random _rand;
private String component;
/**
* Spout初始化的時候調(diào)用
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
component = context.getThisComponentId();
}
/**
* 系統(tǒng)框架會不斷調(diào)用
*/
public void nextTuple() {
//模擬數(shù)據(jù)源
String[] sentences = new String[]{"Hello world! This is my first programme of JStorm",
"Hello JStorm,Nice to meet you!", "Hi JStorm, do you have a really good proformance",
"Goodbye JStorm,see you tomorrow"};
//隨機取出字符串
String sentence = sentences[_rand.nextInt(sentences.length)];
//將得到的字符串輸出到下一個組件
//!?。∵@里Values中值填充順序要和下面declareOutputFields中字段聲明順序一致
_collector.emit(new Values(sentence), Time.currentTimeSecs());
Utils.sleep(1000);
}
@Override
public void ack(Object arg0) {
logger.debug("ACK!");
}
public void activate() {
logger.debug("ACTIVE!");
}
public void close() {
}
public void deactivate() {
}
public void fail(Object arg0) {
logger.debug("FAILED!");
}
/**
* 聲明框架有哪些輸出的字段
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//下一個組件通過word這個關(guān)鍵字拿到這個組件往后輸出的單詞sentence
declarer.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
SplitBolt
- 將獲取的字符串通過空白符分割,并轉(zhuǎn)為小寫之后輸出到下一個節(jié)點。
/**
* IBasicBolt:繼承自IComponent,包括prepare,execut,cleanup等方法
*/
public class SplitBolt extends BaseBasicBolt {
private static final long serialVersionUID = 7104767103420386784L;
private static final Logger logger = Logger.getLogger(SplitBolt.class);
private String component;
/**
* cleanup方法在bolt被關(guān)閉的時候調(diào)用, 它應(yīng)該清理所有被打開的資源。(基本只能在local mode使用)
* 但是集群不保證這個方法一定會被執(zhí)行。比如執(zhí)行task的機器down掉了,那么根本就沒有辦法來調(diào)用那個方法。
* cleanup設(shè)計的時候是被用來在local mode的時候才被調(diào)用(也就是說在一個進程里面模擬整個storm集群),
* 并且你想在關(guān)閉一些topology的時候避免資源泄漏。
* (非 Javadoc)
* @see backtype.storm.topology.base.BaseBasicBolt#cleanup()
*/
@Override
public void cleanup() {
}
//接收消息之后被調(diào)用的方法
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
//以下兩個方式獲取前驅(qū)節(jié)點發(fā)送過來的sentence,一個根據(jù)fieldName,一個根據(jù)字段聲明順序
// String sentence = input.getValueByField("word");
String sentence = input.getString(0);
String[] words = sentence.split("[,|\\s+]");
for (String word : words) {
word = word.trim();
//將非空單詞輸出到下一個節(jié)點
if (!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}
/**
* prepare方法在worker初始化task的時候調(diào)用.
*
* prepare方法提供給bolt一個Outputcollector用來發(fā)射tuple。
* Bolt可以在任何時候發(fā)射tuple — 在prepare, execute或者cleanup方法里面, 或者甚至在另一個線程里面異步發(fā)射。
* 這里prepare方法只是簡單地把OutputCollector作為一個類字段保存下來給后面execute方法 使用。
*/
@Override
public void prepare(Map stromConf, TopologyContext context) {
component = context.getThisComponentId();
}
/**
* declearOutputFields方法僅在有新的topology提交到服務(wù)器,
* 用來決定輸出內(nèi)容流的格式(相當(dāng)于定義spout/bolt之間傳輸stream的name:value格式),
* 在topology執(zhí)行的過程中并不會被調(diào)用.
* (非 Javadoc)
* @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
CountBolt
- 這個組件接收到的每個值都是單個單詞,通過一個內(nèi)存Map統(tǒng)計各個單詞總數(shù)
- 后臺設(shè)置一個異步線程10s一次輸出當(dāng)前Map中的各個單詞總數(shù)
- 日常開發(fā)通常在這個終端節(jié)點將實時計算得到的結(jié)果輸出到HDFS、mysql、HBase、ElasticSearch等存儲系統(tǒng)當(dāng)中
public class CountBolt extends BaseBasicBolt {
private Integer id;
private String name;
private Map<String, Integer> counters;
private String component;
private static final Logger LOG = Logger.getLogger(CountBolt.class);
//異步輸出結(jié)果集的子線程
private AsyncLoopThread statThread;
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
//異步循環(huán)輸出結(jié)果集
this.statThread = new AsyncLoopThread(new statRunnable());
LOG.info(stormConf.get("abc") + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
component = context.getThisComponentId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
// declarer.declareStream("coord-"+"word-counter", new Fields("epoch","ebagNum"));
// LOG.info("set stream coord-"+component);
}
//接收消息之后被調(diào)用的方法
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// String str = input.getString(0);
String str = input.getStringByField("word");
if (!counters.containsKey(str)) {
//單詞計數(shù)
counters.put(str, 1);
} else {
//單詞計數(shù)
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
}
/**
* 異步輸出結(jié)果集的死循環(huán)子線程
*/
class statRunnable extends RunnableCallback {
@Override
public void run() {
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
LOG.info("\n-- Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
LOG.info(entry.getKey() + ": " + entry.getValue());
}
LOG.info("");
}
}
}
}
WordCountTopology主入口,拓?fù)錁?gòu)建
- 這里通過setSpout和setBolt將上面的三個節(jié)點連接成線 —— 即最開始說明的鏈?zhǔn)酵負(fù)鋱D
- JStorm提交執(zhí)行相關(guān)執(zhí)行參數(shù)統(tǒng)一寫入一個Properties或Yaml配置文件中,命令行執(zhí)行第一個參數(shù)是該配置文件的路徑
public class WordCountTopology {
private static Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);
//裝載配置文件配置參數(shù)
private static Map conf = new HashMap<Object, Object>();
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Please input configuration file");
System.exit(-1);
}
//加載配置文件配置到內(nèi)存
LoadConf(args[0]);
//構(gòu)建JStorm拓?fù)? TopologyBuilder builder = setupBuilder();
System.out.println("Topology準(zhǔn)備提交");
//提交任務(wù)到集群
submitTopology(builder);
System.out.println("Topology提交完成");
}
//!!!!這里通過setSpout和setBolt設(shè)置各個節(jié)點之間的連接關(guān)系,
// 是這里把所有各自獨立的節(jié)點用線連接起來,構(gòu)建成一張具體的任務(wù)執(zhí)行拓?fù)鋱D
private static TopologyBuilder setupBuilder() throws Exception {
TopologyBuilder builder = new TopologyBuilder();
/*
* 設(shè)置spout和bolt,完整參數(shù)為
* 1,spout的id(即name)
* 2,spout對象
* 3,executor數(shù)量即并發(fā)數(shù),也就是設(shè)置多少個executor來執(zhí)行spout/bolt(此項沒有默認(rèn)null)
*/
//setSpout,聲明Spout名稱Id為sentence-spout,并行度1
builder.setSpout("sentence-spout", new RandomSentenceSpout(), 1);
//setBolt:SplitBolt的grouping策略是上層隨機分發(fā),CountBolt的grouping策略是按照上層字段分發(fā)
//如果想要從多個Bolt獲取數(shù)據(jù),可以繼續(xù)設(shè)置grouping
//聲明Bolt名稱Id為split-bolt,并行度1
builder.setBolt("split-bolt", new SplitBolt(), 1)
//設(shè)置該Bolt的數(shù)據(jù)源為sentence-spout的輸出
.shuffleGrouping("sentence-spout");
//聲明Bolt名稱Id為count-bolt,并行度1
builder.setBolt("count-bolt", new CountBolt(), 1)
//設(shè)置該Bolt的數(shù)據(jù)源為sentence-spout和split-bolt的輸出
//fieldsGrouping保證相同word對應(yīng)的值發(fā)送到同一個Task節(jié)點,這是單詞計數(shù)業(yè)務(wù)需要
.fieldsGrouping("split-bolt", new Fields("word"))
.fieldsGrouping("sentence-spout", new Fields("word"));
return builder;
}
//提交任務(wù)到JStorm集群
private static void submitTopology(TopologyBuilder builder) {
try {
if (local_mode(conf)) {//本地模式,需要有本地JStorm環(huán)境支持
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(
String.valueOf(conf.get("topology.name")), conf,
builder.createTopology());
Thread.sleep(200000);
cluster.shutdown();
} else {
StormSubmitter.submitTopology(
String.valueOf(conf.get("topology.name")), conf,
builder.createTopology());
}
} catch (Exception e) {
LOG.error(e.getMessage(), e.getCause());
}
}
//加載Properties配置文件
private static void LoadProperty(String prop) {
Properties properties = new Properties();
try {
InputStream stream = new FileInputStream(prop);
properties.load(stream);
} catch (FileNotFoundException e) {
System.out.println("No such file " + prop);
} catch (Exception e1) {
e1.printStackTrace();
return;
}
conf.putAll(properties);
}
//加載Yaml配置文件
private static void LoadYaml(String confPath) {
Yaml yaml = new Yaml();
try {
InputStream stream = new FileInputStream(confPath);
conf = (Map) yaml.load(stream);
if (conf == null || conf.isEmpty() == true) {
throw new RuntimeException("Failed to read config file");
}
} catch (FileNotFoundException e) {
System.out.println("No such file " + confPath);
throw new RuntimeException("No config file");
} catch (Exception e1) {
e1.printStackTrace();
throw new RuntimeException("Failed to read config file");
}
}
//根據(jù)后綴名選擇加載配置文件方案
private static void LoadConf(String arg) {
if (arg.endsWith("yaml")) {
LoadYaml(arg);
} else {
LoadProperty(arg);
}
}
public static boolean local_mode(Map conf) {
String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
if (mode != null) {
if (mode.equals("local")) {
return true;
}
}
return false;
}
}
批量用法
基本的用法是每次處理一個tuple,但是這種效率比較低,很多情況下是可以批量獲取消息然后一起處理,批量用法對這種方式提供了支持。打開代碼可以很明顯地發(fā)現(xiàn)jstorm和storm的有著不小的區(qū)別:
// storm 中的定義
public interface IBatchSpout extends Serializable {
void open(Map conf, TopologyContext context);
void emitBatch(long batchId, TridentCollector collector);// 批次發(fā)射tuple
void ack(long batchId); // 成功處理批次
void close();
Map getComponentConfiguration();
Fields getOutputFields();
}
// jstorm中的定義
public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable {
}
另外如果用批次的話就需要改用BatchTopologyBuilder來構(gòu)建拓?fù)浣Y(jié)構(gòu),在IBatchSpout中主要實現(xiàn)的接口如下:
- execute:雖然和IBolt中名字、參數(shù)一致,但是增加了一些默認(rèn)邏輯
- 入?yún)⒌膇nput.getValue(0)表示批次(BatchId)。
- 發(fā)送消息時collector.emit(new Values(batchId, value)),發(fā)送的列表第一個字段表示批次(BatchId)。
- commit:批次成功時調(diào)用,常見的是修改offset。
- revert:批次失敗時調(diào)用,可以在這里根據(jù)offset取出批次數(shù)據(jù)進行重試。
Ack機制
為保證無數(shù)據(jù)丟失,Storm/JStorm使用了非常漂亮的可靠性處理機制,如圖當(dāng)定義Topology時指定Acker,JStorm除了Topology本身任務(wù)外,還會啟動一組稱為Acker的特殊任務(wù),負(fù)責(zé)跟蹤Topolgogy DAG中的每個消息。每當(dāng)發(fā)現(xiàn)一個DAG被成功處理完成,Acker就向創(chuàng)建根消息的Spout任務(wù)發(fā)送一個Ack信號。Topology中Acker任務(wù)的并行度默認(rèn)parallelism hint=1,當(dāng)系統(tǒng)中有大量的消息時,應(yīng)該適當(dāng)提高Acker任務(wù)的并行度。
Acker按照Tuple Tree的方式跟蹤消息。當(dāng)Spout發(fā)送一個消息的時候,它就通知對應(yīng)的Acker一個新的根消息產(chǎn)生了,這時Acker就會創(chuàng)建一個新的Tuple Tree。當(dāng)Acker發(fā)現(xiàn)這棵樹被完全處理之后,他就會通知對應(yīng)的Spout任務(wù)。
Acker任務(wù)保存了數(shù)據(jù)結(jié)構(gòu)
Map<MessageID,Map< TaskID, Value>>,其中MessageID是Spout根消息ID,TaskID是Spout任務(wù)ID,Value表示一個64bit的長整型數(shù)字,是樹中所有消息的隨機ID的異或結(jié)果。通過TaskID,Acker知道當(dāng)消息樹處理完成后通知哪個Spout任務(wù),通過MessageID,Acker知道屬于Spout任務(wù)的哪個消息被成功處理完成。Value表示了整棵樹的的狀態(tài),無論這棵樹多大,只需要這個固定大小的數(shù)字就可以跟蹤整棵樹。當(dāng)消息被創(chuàng)建和被應(yīng)答的時候都會有相同的MessageID發(fā)送過來做異或。當(dāng)Acker發(fā)現(xiàn)一棵樹的Value值為0的時候,表明這棵樹已經(jīng)被成功處理完成。-
舉例說明具體流程,以下為拓?fù)洌?/p>
ack-example.jpg Acker數(shù)據(jù)的變化過程:(算法)
Step1:A發(fā)送T0給B后:
產(chǎn)生一個隨機數(shù)r0,樹種存R0:R0=r0
<id0,<taskA,R0>>
# ---------
Step2.B接收到T0并成功處理后向C發(fā)送T1,向D發(fā)送T2:
接收到上級傳過來的R0,自己傳給兩個下家,產(chǎn)生兩個隨機數(shù)代表下家存入樹中:R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
# ---------
Step3.C接收到T1并成功處理后:
接收到上家傳過來的r1,沒有下家:R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
# ---------
Step4.D接收到T2并成功處理后:
接收到上家傳過來的r2,沒有下家:R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
當(dāng)結(jié)果為0時Acker可以通知taskA根消息id0的消息樹已被成功處理完成,調(diào)用Spout的ack方法通知,若超時發(fā)現(xiàn)消息樹中值不為0,調(diào)用Spout中的fail。
-
整體節(jié)點間通信:
jstorm-ack.png
- 需要指出的是,Acker并不是必須的,當(dāng)實際業(yè)務(wù)可以容忍數(shù)據(jù)丟失情況下可以不用Acker,對數(shù)據(jù)丟失零容忍的業(yè)務(wù)必須打開Acker,另外當(dāng)系統(tǒng)的消息規(guī)模較大是可適當(dāng)增加Acker的并行度。
JStorm事務(wù)
事務(wù)拓?fù)洳⒉皇切碌臇|西,只是在原始的ISpout、IBolt上做了一層封裝。在事務(wù)拓?fù)渲幸圆⑿校╬rocessing)和順序(commiting)混合的方式來完成任務(wù),使用Transactional Topology可以保證每個消息只會成功處理一次。不過需要注意的是,在Spout需要保證能夠根據(jù)BatchId進行多次重試,在這里有一個基本的例子,這里有一個不錯的講解。
Trident
這次一種更高級的抽象(甚至不需要知道底層是怎么map-reduce的),所面向的不再是spout和bolt,而是stream。主要涉及到下面幾種接口:
- 在本地完成的操作
- Function:自定義操作。
- Filters:自定義過濾。
- partitionAggregate:對同批次的數(shù)據(jù)進行l(wèi)ocal combiner操作。
- project:只保留stream中指定的field。
- stateQuery、partitionPersist:查詢和持久化。
- 決定Tuple如何分發(fā)到下一個處理環(huán)節(jié)
- shuffle:隨機。
- broadcast:廣播。
- partitionBy:以某一個特定的field進行hash,分到某一個分區(qū),這樣該field位置相同的都會放到同一個分區(qū)。
- global:所有tuple發(fā)到指定的分區(qū)。
- batchGlobal:同一批的tuple被放到相同的分區(qū)(不同批次不同分區(qū))。
- partition:用戶自定義的分區(qū)策略。
- 不同partition處理結(jié)果的匯聚操作
- aggregate:只針對同一批次的數(shù)據(jù)。
- persistentAggregate:針對所有批次進行匯聚,并將中間狀態(tài)持久化。
- 對stream中的tuple進行重新分組,后續(xù)的操作將會對每一個分組獨立進行(類似sql中的group by)
- groupBy
- 將多個Stream融合成一個
- merge:多個流進行簡單的合并。
- join:多個流按照某個KEY進行UNION操作(只能針對同一個批次的數(shù)據(jù))。
在這里有一個jstorm中使用Trident的簡單例子。
故障恢復(fù)
-
節(jié)點故障
- Nimbus故障。Nimbus本身無狀態(tài),所以Nimbus故障不會影響正在正常運行任務(wù),另外Nimbus HA保證Nimbus故障后可以及時被備份Nimbus接管。
- Supervisors節(jié)點故障。Supervisor故障后,Nimbus會將故障節(jié)點上的任務(wù)遷移到其他可用節(jié)點上繼續(xù)運行,但是Supervisor故障需要外部監(jiān)控并及時手動重啟。
- Worker故障。Worker健康狀況監(jiān)控由Supervisor負(fù)責(zé),當(dāng)Woker出現(xiàn)故障時,Supervisor會及時在本機重試重啟。
- Zookeeper節(jié)點故障。Zookeeper本身具有很好的故障恢復(fù)機制,能保證至少半數(shù)以上節(jié)點在線就可正常運行,及時修復(fù)故障節(jié)點即可。
-
任務(wù)失敗
- Spout失敗。消息不能被及時被Pull到系統(tǒng)中,造成外部大量消息不能被及時處理,而外部大量計算資源空閑。
- Bolt失敗。消息不能被處理,Acker持有的所有與該Bolt相關(guān)的消息反饋值都不能回歸到0,最后因為超時最終Spout的fail將被調(diào)用。
- Acker失敗。Acker持有的所有反饋信息不管成功與否都不能及時反饋到Spout,最后同樣因為超時Spout的fail將被調(diào)用。
- 任務(wù)失敗后,需要Nimbus及時監(jiān)控到并重新分配失敗任務(wù)。
JStorm使用感受
- JStorm各個節(jié)點之間是松耦合的,各個節(jié)點之間的通信只和Tuple數(shù)據(jù)流結(jié)構(gòu)相關(guān),其他處理邏輯各自獨立
- JStorm不處理數(shù)據(jù)的存儲服務(wù),計算結(jié)果自行存儲到HDFS、HBase、Mysql、ElasticSearch等存儲系統(tǒng)當(dāng)中
- JStorm的拓?fù)涔?jié)點設(shè)計中,應(yīng)該把延時操作分發(fā)到多個節(jié)點當(dāng)中執(zhí)行,每個節(jié)點只處理各自單一的功能邏輯,如上面的例子,我把單詞分割和單詞計數(shù)分成兩個Bolt來實現(xiàn),這才是流式計算的特點,讓數(shù)據(jù)流動起來,而不是在一個節(jié)點完成所有工作,也保證了程序可用性更強
- JStorm各個節(jié)點內(nèi)部的處理邏輯非常開放,想怎么處理都行,只要最終往后輸出相應(yīng)的Tuple即可,編程時非常自由,不像MapReduce,很多操作都在MR模型中得到限制




