在storm筆記:Trident應(yīng)用中說了下Trident的使用,這里說下Trident幾種狀態(tài)的變化及其對(duì)應(yīng)API的使用。
本文內(nèi)容來自Trident State,部分內(nèi)容根據(jù)實(shí)際情況做出修改。
Trident中有對(duì)狀態(tài)數(shù)據(jù)進(jìn)行讀取和寫入操作的一流抽象工具。狀態(tài)既可以保存在拓?fù)鋬?nèi)部,比如保存在內(nèi)容中并由HDFS存儲(chǔ),也可以通過外部存儲(chǔ)(比如Memcached或Cassandra)存儲(chǔ)在數(shù)據(jù)庫中。而對(duì)于Trident的API而言,這兩種機(jī)制沒有任何區(qū)別。
Trident以容錯(cuò)的方式管理狀態(tài),以便在重試或失敗時(shí)的狀態(tài)更新是冪等的。在大數(shù)據(jù)處理中,數(shù)據(jù)處理的冪等性是非常重要的一個(gè)指標(biāo),這樣能夠保證每個(gè)消息即使處理了多次,結(jié)果也像是只處理了一次一樣。
在進(jìn)行狀態(tài)更新時(shí)可能需要各種級(jí)別的容錯(cuò)能力,在這之前,我們來看一個(gè)例子說明實(shí)現(xiàn)“恰好一次”語義所需的技巧。比如,正在對(duì)流中的數(shù)據(jù)進(jìn)行計(jì)數(shù)聚合操作,每次處理新的元組時(shí),都會(huì)將運(yùn)行的計(jì)數(shù)結(jié)果存儲(chǔ)在數(shù)據(jù)庫中。
如果發(fā)生故障時(shí),元組將重新執(zhí)行計(jì)數(shù)操作。這就會(huì)在執(zhí)行狀態(tài)更新時(shí)出現(xiàn)問題,因?yàn)檫@個(gè)時(shí)候不知道是不是已經(jīng)更新過該元組狀態(tài)。也許還沒有處理該元組數(shù)據(jù),這個(gè)時(shí)候就需要增加計(jì)數(shù)。也許已經(jīng)處理該元組,并成功增加計(jì)數(shù),但是在下一步的時(shí)候出現(xiàn)問題,這種情況下,就不應(yīng)該增加計(jì)數(shù)。也有可能是處理元組正常,更新計(jì)數(shù)是異常,這個(gè)時(shí)候就需要更新計(jì)數(shù)。
所以說,如果只是在數(shù)據(jù)庫中存儲(chǔ)計(jì)數(shù)信息,就不知道元組是否已經(jīng)處理過。因此,就需要更多的信息作為輔助。Trident提供了下面三個(gè)性質(zhì),來實(shí)現(xiàn)“恰好一次”的處理:
- 元組都是以小批次處理
- 每批元組都會(huì)給出一個(gè)唯一ID,稱為事務(wù)ID(transaction id,txid)。如果批次重復(fù)處理,txid也會(huì)相同。
- 狀態(tài)的更新操作是按照元組批次的順序執(zhí)行的。也就是說,在批次2狀態(tài)更新成功之前,不會(huì)進(jìn)行批次3的狀態(tài)更新。
根據(jù)這些特性,就可以通過檢查到該元組的批次是否已被處理,并根據(jù)檢測(cè)結(jié)果采取適當(dāng)?shù)牟僮鞲聽顟B(tài)了。采取的具體操作取決于Spout的類型。Spout有三種類型:“非事務(wù)型(non-transactional)”,“事務(wù)型(transactional)”和“不透明事務(wù)型(opaque transactional)”。對(duì)應(yīng)的容錯(cuò)能力也是三種:“非事務(wù)”,“事務(wù)”和“不透明事務(wù)”。下面來看看Spout的各個(gè)類型及對(duì)應(yīng)的容錯(cuò)能力。
事務(wù)型Spout
Trident是按照批次發(fā)送元組進(jìn)行處理的,每個(gè)批次的元組被賦予唯一的事務(wù)ID。Spout的特性根據(jù)他們所提供容錯(cuò)性保證機(jī)制來決定的,而且這種機(jī)制也會(huì)對(duì)每個(gè)批次發(fā)生作用。事務(wù)型Spout有如下特性:
- 每個(gè)批次的txid不變,對(duì)于一個(gè)特定的txid,重復(fù)執(zhí)行時(shí),它所包含的元組數(shù)據(jù)與第一次完全相同。
- 元組只會(huì)在一個(gè)批次出現(xiàn),不會(huì)重復(fù)(某個(gè)元組只會(huì)出現(xiàn)在一個(gè)批次中,不會(huì)出現(xiàn)在多個(gè)批次中)。
- 每個(gè)元組都會(huì)出現(xiàn)一次(不會(huì)遺漏任何的元組數(shù)據(jù))
這是最簡(jiǎn)單最容易理解的一種Spout類型,數(shù)據(jù)流被分割成固定的批次。storm中有與Kafka集成的事務(wù)型Spout的擴(kuò)展,代碼在這里。
既然事務(wù)型Spout這么簡(jiǎn)單易懂,為什么不在Trident中完全使用事務(wù)型Spout呢?其實(shí)就在于它的容錯(cuò)能力。比如,TransactionalTridentKafkaSpout的工作方式是,同一個(gè)txid的批次中將包含kafka所有分區(qū)的元組。一旦某個(gè)批次發(fā)出后,出現(xiàn)異常,需要重新發(fā)出,就需要完全相同的元組集合才能滿足事務(wù)型Spout要求的語義。但是這個(gè)時(shí)候,kafka某個(gè)節(jié)點(diǎn)異常(節(jié)點(diǎn)關(guān)閉或分區(qū)不可用),就無法獲取完全相同的的一批元組,那整個(gè)拓?fù)渚蜁?huì)應(yīng)為第3條語義(批次按順序執(zhí)行)停止。
這就是要有“不透明事務(wù)型”Spout的原因了,它能夠容忍數(shù)據(jù)源節(jié)點(diǎn)丟失,而且又能保證數(shù)據(jù)恰好被操作一次。
注:對(duì)kafka比較熟悉的應(yīng)該會(huì)想到,如果某一個(gè)topic支持復(fù)制,那即使一個(gè)節(jié)點(diǎn)不可用,還會(huì)有其他復(fù)制節(jié)點(diǎn)頂上,那TransactionalTridentKafkaSpout也能夠避免上面的問題。
下面繼續(xù)看看如何設(shè)計(jì)一個(gè)支持恰好一次特性的“事務(wù)型”Spout語義(簡(jiǎn)單的說就是同一個(gè)txid對(duì)應(yīng)的批次元組數(shù)據(jù)完全一致)的狀態(tài)實(shí)現(xiàn),這種狀態(tài)稱為“事務(wù)型狀態(tài)”。
比如,現(xiàn)在有一個(gè)單詞計(jì)數(shù)的拓?fù)洌枰獙卧~計(jì)數(shù)存儲(chǔ)在key/value數(shù)據(jù)庫中。key是單詞,value中包含單詞數(shù)量。另外,為了確定同一批次元組是否已經(jīng)被執(zhí)行,需要將txid也一同存儲(chǔ)在value中。這樣,當(dāng)需要更新單詞數(shù)量的時(shí)候,先比較txid是否相同,如果相同,就跳過更新。如果不同,就更新計(jì)數(shù)。
考慮這個(gè)為什么它工作的例子。 假設(shè)您正在處理由以下批次元組組成的txid 3:
比如,要處理一個(gè)txid是3的一批元組:
["man"]
["man"]
["dog"]
目前數(shù)據(jù)庫中存儲(chǔ)的數(shù)據(jù)為:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
在這個(gè)時(shí)候,發(fā)現(xiàn)“man”對(duì)應(yīng)的txid是1,當(dāng)前的txid是3,就可以更新了。然后“dog”對(duì)應(yīng)的txid是3,說明同一批次的元組數(shù)據(jù)已經(jīng)發(fā)送過了,就不需要更新。從這點(diǎn)可以看出,txid是3的批次元組是重復(fù)發(fā)送的,在更新“dog”數(shù)量后,在更新“man”數(shù)量前,出現(xiàn)了錯(cuò)誤。最后的結(jié)果就是:
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
不透明事務(wù)型Spout
前面已經(jīng)提過,不透明事務(wù)型Spout不能保證相同txid對(duì)應(yīng)的批次中的元組數(shù)據(jù)完全一致。其特點(diǎn)如下:
- 每個(gè)元組都會(huì)在有且僅有一個(gè)批次中處理成功。
[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有這種特性,同時(shí)對(duì)kafka節(jié)點(diǎn)異常有很好的容錯(cuò)性。OpaqueTridentKafkaSpout在發(fā)送一個(gè)批次元組的時(shí)候,會(huì)從上次成功之后的位置開始發(fā)送,這樣就能夠保證元組不會(huì)漏發(fā)或重發(fā)。
基于上面的特點(diǎn),不透明事務(wù)型Spout就不同通過txid來直接判斷是否可以跳過狀態(tài)更新,因?yàn)榫哂邢嗤瑃xid的批次中元組可能發(fā)生了變化。
這就需要存儲(chǔ)更多的狀態(tài)信息了,而不僅僅是一個(gè)結(jié)果和一個(gè)txid了,還需要存儲(chǔ)前一個(gè)結(jié)果值。
比如,當(dāng)前批次的計(jì)數(shù)是2,需要進(jìn)行一次狀態(tài)更新,數(shù)據(jù)庫中的數(shù)據(jù)如下:
{
"value": 4,
"prevValue": 1,
"txid": 2
}
如果當(dāng)前的txid是3,與數(shù)據(jù)庫中的不同。在這種情況下,需要將prevValue的值該為value的值,value的值增加2,更新txid為3,最后的結(jié)果就是:
{
"value": 6,
"prevValue": 4,
"txid": 3
}
如果當(dāng)前的txid是2,等于數(shù)據(jù)庫中的txid。因?yàn)閠xid相同,說明上一次txid為2的批次處理失敗,但是本次的元組可能與上一次不同了。這個(gè)時(shí)候,就需要使用本次數(shù)據(jù)覆蓋上次處理結(jié)果。也就是說,prevValue值不變,value的值改為prevValue加2,txid不變,最后的結(jié)果如下:
{
"value": 3,
"prevValue": 1,
"txid": 2
}
這種方式的可行性依賴于Trident的強(qiáng)順序性。也就是說,一旦開始處理一個(gè)新的批次,就不會(huì)重復(fù)執(zhí)行上一個(gè)批次。不透明事務(wù)型Spout保證了不同批次之間沒有重復(fù)的情況,也就是每個(gè)元組只會(huì)在一個(gè)批次中處理成功,所以就可以放心的使用前一個(gè)值與當(dāng)前值覆蓋已存數(shù)據(jù)了。
非事務(wù)型Spout
非事務(wù)型Spout不能為批次提供任何保證。所以可能出現(xiàn)"至多一次"的處理,即在某個(gè)批次處理過程中失敗了,但是不會(huì)在重新處理;也可能提供“至少一次”的處理,即可能會(huì)有多個(gè)批次分別處理某個(gè)元組。也就是沒有辦法實(shí)現(xiàn)“恰好一次”的語義。
不同類型spout和狀態(tài)總結(jié)
下面是不同的spout/狀態(tài)組合是否支持“恰好一次”處理語義:

不透明事務(wù)狀態(tài)有最強(qiáng)的容錯(cuò)性,但是因?yàn)榇鎯?chǔ)txid和兩個(gè)結(jié)果帶來更大的開銷。事務(wù)型狀態(tài)只需要存儲(chǔ)一個(gè)狀態(tài)結(jié)果,但是只對(duì)事務(wù)型Spout有效。非事務(wù)型狀態(tài)要求存儲(chǔ)的數(shù)據(jù)更少,但是不能實(shí)現(xiàn)“恰好一次”的處理語義。
所以在選擇容錯(cuò)與存儲(chǔ)空間中,需要根據(jù)具體的需要選擇合適的組合。
狀態(tài)API
根據(jù)前面來看,“恰好一次”語義的原理有些復(fù)雜,但是作為用戶,并不需要了解這些txid對(duì)比、多值存儲(chǔ),因?yàn)門rident已經(jīng)在State中封裝了所有容錯(cuò)處理邏輯,只需要想下面著用攜帶碼就行:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
.parallelismHint(6);
所有的不透明事務(wù)狀態(tài)邏輯已經(jīng)封裝在MemcachedState.opaque中,另外,狀態(tài)更新會(huì)自動(dòng)調(diào)整為批次操作,這樣可以減少與數(shù)據(jù)庫之間反復(fù)交互帶來的資源浪費(fèi)。
基本的State接口只有兩個(gè)方法:
public interface State {
void beginCommit(Long txid); // 對(duì)于像DRPC流發(fā)生的partitionPersist這樣的事情,可以是null
void commit(Long txid);
}
前面已經(jīng)說過,狀態(tài)更新開始和結(jié)束時(shí)都會(huì)獲取txid。Trident并不關(guān)心狀態(tài)如何操作,使用哪種方式更新,使用哪種方式讀取。
假如有一個(gè)包含用戶地址信息的定制數(shù)據(jù)庫,需要使用Trident與數(shù)據(jù)庫交互,State擴(kuò)展類中包含對(duì)于用戶信息的getter和setter方法:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// 向數(shù)據(jù)庫設(shè)置地址信息
}
public String getLocation(long userId) {
// 從數(shù)據(jù)庫中獲取地址信息
}
}
然后就需要一個(gè)StateFactory來創(chuàng)建Trident所需的State對(duì)象,LocationDB所需的StateFactory大體結(jié)構(gòu)如下:
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
}
}
Trident提供了用于查詢狀態(tài)源的QueryFunction接口,以及更新狀態(tài)源的StateUpdater接口。比如,查詢LocationDB中用戶信息的QueryLocation:
TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));
QueryLocation的代碼如下:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<String> ret = new ArrayList();
for (TridentTuple input : inputs) {
ret.add(state.getLocation(input.getLong(0)));
}
return ret;
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
QueryFunction操作分為兩步:首先,Trident會(huì)將收集到的數(shù)據(jù)放在一個(gè)批次中,發(fā)送給batchRetrieve方法。在這個(gè)例子中,batchRetrieve方法收到的是一些用戶id。batchRetrieve會(huì)返回一組與輸入元組長度相同的結(jié)果。輸入元組與輸出結(jié)果中各個(gè)元素是彼此對(duì)應(yīng)的。
從這點(diǎn)來看,上面的LocationDB類并沒有發(fā)揮Trident批處理優(yōu)勢(shì),所以需要盡心改造:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk
}
}
對(duì)應(yīng)的QueryLocation類如下:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for (TridentTuple input : inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
這段代碼大幅減少了數(shù)據(jù)庫操作。
對(duì)于更新狀態(tài),可以使用StateUpdater接口。比如下面的更新操作:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for (TridentTuple t : tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}
}
對(duì)應(yīng)的更新操作拓?fù)渲芯涂梢允沁@樣:
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
.partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());
partitionPersist方法會(huì)更新狀態(tài),StateUpdater接口接收一批元組和狀態(tài)信息,然后更新狀態(tài)。上面的LocationUpdater類中僅僅是從元組中抓取用戶id和地址信息,然后對(duì)狀態(tài)執(zhí)行批量處理。然后,partitionPersist會(huì)返回一個(gè)表示更新狀態(tài)后的TridentState對(duì)象。隨后就可以在拓?fù)涞钠渌胤绞褂?code>stateQuery方法查詢狀態(tài)。
在StateUpdater的updateState方法中有一個(gè)TridentCollector參數(shù),這個(gè)對(duì)象是可以將發(fā)送進(jìn)來的元組發(fā)送到一個(gè)新的數(shù)據(jù)流中。在這個(gè)例子中沒有用到。如果需要進(jìn)行比如向數(shù)據(jù)庫更新計(jì)數(shù)值的后續(xù)操作,可以通過TridentState#newValuesStream方法獲取新的數(shù)據(jù)流數(shù)據(jù)。
persistentAggregate
Trident使用一個(gè)名為persistentAggregate的方法更新狀態(tài)。前面已經(jīng)出現(xiàn)過,這里再寫一遍:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
partitionPersist是一個(gè)接收Trident聚合器作為參數(shù)并對(duì)狀態(tài)數(shù)據(jù)進(jìn)行更新的方法,persistentAggregate就是構(gòu)建于partitionPersist上層的一個(gè)編程抽象。在這個(gè)例子中,通過groupBy返回一個(gè)分組數(shù)據(jù),Trident需要一個(gè)實(shí)現(xiàn)MapState接口的對(duì)象。分組字段是狀態(tài)的key,聚合結(jié)果是狀態(tài)的value。MapState接口如下:
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);
}
如果你需要在未分組的數(shù)據(jù)流上執(zhí)行聚合操作時(shí),Trident需要一個(gè)實(shí)現(xiàn)Snapshottable接口的對(duì)象:
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
MemoryMapState 和 MemcachedState 都實(shí)現(xiàn)了這些接口.
實(shí)現(xiàn)MapState接口
實(shí)現(xiàn)MapState接口非常簡(jiǎn)單,Trident幾乎把所有事都做完了。OpaqueMap、TransactionalMap和NonTransactionalMap都分別實(shí)現(xiàn)了各自的容錯(cuò)語義。只需要為這些類提供一個(gè)用于對(duì)不同key/value進(jìn)行批量獲取、批量修改的IBackingMap實(shí)現(xiàn)就行。IBackingMap接口如下:
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap會(huì)使用OpaqueValue作為vals參數(shù)調(diào)用multiPut方法;TransactionalMap會(huì)使用TransactionalValue作為參數(shù);NonTransactionalMaps會(huì)直接把拓?fù)鋵?duì)象傳入。
Trident還提供了CachedMap類來實(shí)現(xiàn)key/value的自動(dòng)LRU緩存操作。
最后,Trident還提供了SnapshottableMap類,該類通過將全局聚合的結(jié)果存入一個(gè)固定key中的方法將MapState對(duì)象轉(zhuǎn)化為Snapshottable對(duì)象。
可以參考MemcachedState的實(shí)現(xiàn)來了解如何將這些工具結(jié)合在一起來提供一個(gè)高性能的MapState實(shí)現(xiàn)。MemcachedState支持不透明事務(wù)、事務(wù)和非事務(wù)語義。
個(gè)人主頁: http://www.howardliu.cn
個(gè)人博文: storm筆記:Trident狀態(tài)
CSDN主頁: http://blog.csdn.net/liuxinghao
CSDN博文: storm筆記:Trident狀態(tài)