Storm Trident State 三種事務

原文鏈接 譯者:魏勇

Trident 中含有對狀態(tài)化(stateful)的數(shù)據(jù)源進行讀取和寫入操作的一級抽象封裝工具。這個所謂的狀態(tài)(state)既可以保存在拓撲內(nèi)部(保存在內(nèi)存中并通過 HDFS 來實現(xiàn)備份),也可以存入像 Memcached 或者 Cassandra 這樣的外部數(shù)據(jù)庫中。而對于 Trident API 而言,這兩種機制并沒有任何區(qū)別。

Trident 使用一種容錯性的方式實現(xiàn)對 state 的管理,這樣,即使在發(fā)生操作失敗或者重試的情況下狀態(tài)的更新操作仍然是冪等的?;谶@個機制,每條消息都可以看作被恰好處理了一次,然后你就可以很容易地推斷出 Trident 拓撲的狀態(tài)。
State 的更新過程支持多級容錯性保證機制。在討論這一點之前,我們先來看一個例子,這個例子展示了如何實現(xiàn)恰好一次的語義的技術。假如你正在對數(shù)據(jù)流進行一個計數(shù)聚合操作,并打算將計數(shù)結果存入數(shù)據(jù)庫中。在這個例子里,你存入數(shù)據(jù)庫的就是一個對應計數(shù)結果的值,每次處理新 tuple 的時候就會增加這個值。
考慮到可能存在的處理失敗情況,tuple 有可能需要重新處理。這樣就給 state 的更新操作帶來了一個問題(或者其他的副作用)—— 你無法知道當前的這個 tuple 的更新操作是否已經(jīng)處理過了。也許你之前沒有處理過這個 tuple,那么你現(xiàn)在就需要增加計數(shù)結果;也許你之前已經(jīng)處理過 tuple 了并且成功地增加了計數(shù)結果,但是在后續(xù)操作過程中 tuple 的處理失敗了,并由此引發(fā)了 tuple 的重新處理操作,這時你就不能再增加計數(shù)結果了;還有可能你之前在使用這個 tuple 更新數(shù)據(jù)庫的時候出錯了,也就是說計數(shù)值的更新操作并未成功,此時在 tuple 的重新處理過程中你仍然需要更新數(shù)據(jù)庫。
所以說,如果只是向數(shù)據(jù)庫中簡單地存入計數(shù)值,你確實無法知道 tuple 是否已經(jīng)被處理過。因此,你需要一些更多的信息來做決定。Trident 提供了一種支持恰好一次處理的語義,如下所述:
通過小數(shù)據(jù)塊(batch)的方式來處理 tuple(可以參考Trident 教程一文)
為每個 batch 提供一個唯一的 id,這個 id 稱為 “事務 id”(transaction id,txid)。如果需要對 batch 重新處理,這個 batch 上仍然會賦上相同的 txid。
State 的更新操作是按照 batch 的順序進行的。也就是說,在 batch 2 完成處理之前,batch 3 的狀態(tài)更新操作不會進行。

基于這幾個基本性質,你的 State 的實現(xiàn)就可以檢測到 tuple 的 batch 是否已經(jīng)被處理過,并根據(jù)檢測結果選擇合適的 state 更新操作。你具體采用的操作取決于你的輸入 spout 提供的語義,這個語義對每個 batch 都是有效的。有三類支持容錯性的 spout:“非事務型”(non-transactional)、“事務型”(transactional)以及“模糊事務型”(opaque transactional)。接下來我們來分析下每種 spout 類型的容錯性語義。
事務型 spout(Transactional spouts)
記住一點,Trident 是通過小數(shù)據(jù)塊(batch)的方式來處理 tuple 的,而且每個 batch 都會有一個唯一的 txid。spout 的特性是由他們所提供的容錯性保證機制決定的,而且這種機制也會對每個 batch 發(fā)生作用。事務型 spout 包含以下特性:
每個 batch 的 txid 永遠不會改變。對于某個特定的 txid,batch 在執(zhí)行重新處理操作時所處理的 tuple 集和它的第一次處理操作完全相同。
不同 batch 中的 tuple 不會出現(xiàn)重復的情況(某個 tuple 只會出現(xiàn)在一個 batch 中,而不會同時出現(xiàn)在多個 batch 中)。
每個 tuple 都會放入一個 batch 中(處理操作不會遺漏任何的 tuple)。

這是一種很容易理解的 spout,其中的數(shù)據(jù)流會被分解到固定的 batches 中。Storm-contrib 項目中提供了一種基于 Kafka 的事務型 spout 實現(xiàn)。
看到這里,你可能會有這樣的疑問:為什么不在拓撲中完全使用事務型 spout 呢?這個原因很好理解。一方面,有些時候事務型 spout 并不能提供足夠可靠的容錯性保障,所以不需要使用事務型 spout。比如,TransactionalTridentKafkaSpout
的工作方式就是使得帶有某個 txid 的 batch 中包含有來自一個 Kafka topic 的所有 partition 的 tuple。一旦一個 batch 被發(fā)送出去,在將來無論重新發(fā)送這個 batch 多少次,batch 中都會包含有完全相同的 tuple 集,這是由事務型 spout 的語義決定的?,F(xiàn)在假設 TransactionalTridentKafkaSpout
發(fā)送出的某個 batch 處理失敗了,而與此同時,Kafka 的某個節(jié)點因為故障下線了。這時你就無法重新處理之前的 batch 了(因為 Kafka 的節(jié)點故障,Kafka topic 必然有一部分 partition 無法獲取到),這個處理過程也會因此終止。
這就是要有“模糊事務型” spout 的原因了 —— 模糊事務型 spout 支持在數(shù)據(jù)源節(jié)點丟失的情況下仍然可以實現(xiàn)恰好一次的處理語義。我們會在下一節(jié)討論這類 spout。
順便提一點,如果 Kafka 支持數(shù)據(jù)復制,那么就可以放心地使用事務型 spout 提供的容錯性機制了,因為這種情況下某個節(jié)點的故障不會導致數(shù)據(jù)丟失,不過 Kafka 暫時還不支持該特性。(本文的寫作時間應該較早,Kakfa 早就已經(jīng)可以支持復制的機制了 —— 譯者注)。
在討論“模糊事務型” spout 之前,讓我們先來看看如何為事務型 spout 設計一種支持恰好一次語義的 State。這個 State 就稱為 “事務型 state”,它支持對于特定的 txid 永遠只與同一組 tuple 相關聯(lián)的特性。
假如你的拓撲需要計算單詞數(shù),而且你準備將計數(shù)結果存入一個 K-V 型數(shù)據(jù)庫中。這里的 key 就是單詞,value 對應于單詞數(shù)。從上面的討論中你應該已經(jīng)明白了僅僅存儲計數(shù)結果是無法確定某個 batch 中的tuple 是否已經(jīng)被處理過的。所以,現(xiàn)在你應該將 txid 作為一種原子化的值與計數(shù)值一起存入數(shù)據(jù)庫。隨后,在更新計數(shù)值的時候,你就可以將數(shù)據(jù)庫中的 txid 與當前處理的 batch 的 txid 進行比對。如果兩者相同,你就可以跳過更新操作 —— 由于 Trident 的強有序性處理機制,可以確定數(shù)據(jù)庫中的值是對應于當前的 batch 的。如果兩者不同,你就可以放心地增加計數(shù)值。由于一個 batch 的 txid 永遠不會改變,而且 Trident 能夠保證 state 的更新操作完全是按照 batch 的順序進行的,所以,這樣的處理邏輯是完全可行的。
下面來看一個例子。假如你正在處理 txid 3,其中包含有以下幾個 tuple:
["man"]["man"]["dog"]

假如數(shù)據(jù)庫中有以下幾個 key-value 對:
man => [count=3, txid=1]dog => [count=4, txid=3]apple => [count=10, txid=2]

其中與 “man” 相關聯(lián)的 txid 為 1。由于當前處理的 txid 為 3,你就可以確定當前處理的 batch 與數(shù)據(jù)庫中存儲的值無關,這樣你就可以放心地將 “man” 的計數(shù)值加上 2 并更新 txid 為 3。另一方面,由于 “dog” 的 txid 與當前的 txid 相同,所以,“dog” 的計數(shù)是之前已經(jīng)處理過的,現(xiàn)在不能再對數(shù)據(jù)庫中的計數(shù)值進行更新操作。這樣,在結束 txid3 的更新操作之后,數(shù)據(jù)庫中的結果就會變成這樣:
man => [count=5, txid=3]dog => [count=4, txid=3]apple => [count=10, txid=2]

現(xiàn)在我們再來討論一下“模糊事務型” spout。
模糊事務型 spout(Opaque transactional spouts)
前面已經(jīng)提到過,模糊事務型 spout 不能保證一個 txid 對應的 batch 中包含的 tuple 完全一致。模糊事務型 spout 有以下的特性:
每個 tuple 都會通過某個 batch 處理完成。不過,在 tuple 處理失敗的時候,tuple 有可能繼續(xù)在另一個 batch 中完成處理,而不一定是在原先的 batch 中完成處理。

OpaqueTridentKafkaSpout 就具有這樣的特性,同時它對 Kafka 節(jié)點的丟失問題具有很好的容錯性。OpaqueTridentKafkaSpout
在發(fā)送一個 batch 的時候總會總上一個 batch 結束的地方開始發(fā)送新 tuple。這一點可以保證 tuple 不會被遺漏,而且也不會被多個 batch 處理。
不過,模糊事務型 spout 的缺點就在于不能通過 txid 來識別數(shù)據(jù)庫中的 state 是否是已經(jīng)處理過的。這是因為在 state 的更新的過程中,batch 有可能會發(fā)生變化。
在這種情況下,你應該在數(shù)據(jù)庫中存儲更多的 state 信息。除了一個結果值和 txid 之外,你還應該存入前一個結果值。我們再以上面的計數(shù)值的例子來分析以下這個問題。假如你的 batch 的部分計數(shù)值是 “2”,現(xiàn)在你需要應用一個更新操作。假定現(xiàn)在數(shù)據(jù)庫中的值是這樣的:
{ value = 4, prevValue = 1, txid = 2}

情形1:假如當前處理的 txid 為 3,這與數(shù)據(jù)庫中的 txid 不同。這時可以將 “prevValue” 的值設為 “value” 的值,再為 “value” 的值加上部分計數(shù)的結果并更新 txid。執(zhí)行完這一系列操作之后的數(shù)據(jù)庫中的值就會變成這樣:

{ value = 6, prevValue = 4, txid = 3}

情形2:如果當前處理的 txid 為 2,也就是和數(shù)據(jù)庫中存儲的 txid 一致,這種情況下的處理邏輯與上面的 txid 不一致的情況又有所不同。因為此時你會知道數(shù)據(jù)庫中的更新操作是由上一個擁有相同 txid 的batch 做出的。不過那個 batch 有可能與當前的 batch 并不相同,所以你需要忽略它的操作。這個時候,你應該將 “prevValue” 加上 batch 中的部分計數(shù)值來計算新的 “value”。在這個操作之后數(shù)據(jù)庫中的值就會變成這樣:

{ value = 3, prevValue = 1, txid = 2}

這種方法之所以可行是因為 Trident 具有強順序性處理的特性。一旦 Trident 開始處理一個新的 batch 的狀態(tài)更新操作,它永遠不會回到過去的 batch 的處理上。同時,由于模糊事務型 spout 會保證 batch 之間不會存在重復 —— 每個 tuple 只會被某一個 batch 完成處理 —— 所以你可以放心地使用 prevValue 來更新 value。
非事務型 spout(Non-transactional spouts)
非事務型 spout 不能為 batch 提供任何的安全性保證。非事務型 spout 有可能提供一種“至多一次”的處理模型,在這種情況下 batch 處理失敗后 tuple 并不會重新處理;也有可能提供一種“至少一次”的處理模型,在這種情況下可能會有多個 batch 分別處理某個 tuple。總之,此類 spout 不能提供“恰好一次”的語義。
不同類型的 Spout 與 State 的總結
下圖顯示了不同的 spout/state 的組合是否支持恰好一次的消息處理語義:


模糊事務型 state 具有最好的容錯性特征,不過這是以在數(shù)據(jù)庫中存儲更多的內(nèi)容為代價的(一個 txid 和兩個 value)。事務型 state 要求的存儲空間相對較小,但是它的缺點是只對事務型 spout 有效。相對的,非事務型要求的存儲空間最少,但是它也不能提供任何的恰好一次的消息執(zhí)行語義。
你選擇 state 與 spout 的時候必須在容錯性與存儲空間占用之間權衡。可以根據(jù)你的應用的需求來確定哪種組合最適合你。
State API
從上文的描述中你已經(jīng)了解到了恰好一次的消息執(zhí)行語義的原理是多么的復雜。不過作為用戶你并不需要處理這些復雜的 txid 比對、多值存儲等操作,Trident 已經(jīng)在 State 中封裝了所有的容錯性處理邏輯,你只需要像下面這樣寫代碼即可:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6);

所有處理模糊事務型 state 的邏輯已經(jīng)封裝在 MemcachedState.opaque
的調(diào)用中了。另外,狀態(tài)更新都會自動調(diào)整為批處理操作,這樣可以減小與數(shù)據(jù)庫的反復交互的資源損耗。
基本的 State
接口只有兩個方法:
public interface State { void beginCommit(Long txid); // 對于類似于在 DRPC 流上進行 partitionPersist 的操作,此方法可以為空 void commit(Long txid);}

前面已經(jīng)說過,state 更新操作的開始時和結束時都會獲取一個 txid。對于你的 state 怎么工作,你在其中使用什么樣的方法執(zhí)行更新操作,或者使用什么樣的方法從 state 中讀取數(shù)據(jù),Trident 并不關心。
假如你有一個包含有用戶的地址信息的定制數(shù)據(jù)庫,你需要使用 Trident 與該數(shù)據(jù)庫交互。你的 State 的實現(xiàn)就會包含有用于獲取與設置用戶信息的方法,比如下面這樣:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database }}

接著你就可以為 Trident 提供一個 StateFactory 來創(chuàng)建 Trident 任務內(nèi)部的 State 對象的實例。對應于你的數(shù)據(jù)庫(LocationDB)的 StateFactory 大概是這樣的:
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } }

Trident 提供了一個用于查詢 state 數(shù)據(jù)源的 QueryFunction
接口,以及一個用于更新 state 數(shù)據(jù)源的 StateUpdater
接口。例如,我們可以寫一個查詢 LocationDB 中的用戶地址信息的 “QueryLocation”。讓我們從你在拓撲中使用這個操作的方式開始。假如在拓撲中需要讀取輸入流中的 userid 信息:
TridentTopology topology = new TridentTopology();TridentState locations = topology.newStaticState(new LocationDBFactory());topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))

這里的 QueryLocation
的實現(xiàn)可能是這樣的:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }

QueryFunction
的執(zhí)行包含兩個步驟。首先,Trident 會將讀取的一些數(shù)據(jù)中匯總為一個 batch 傳入 batchRetrieve 方法中。在這個例子中,batchRetrieve 方法會收到一些用戶 id。然后 batchRetrieve 會返回一個與輸入 tuple 列表大小相同的隊列。結果隊列的第一個元素與第一個輸入 tuple 對應,第二個元素與第二個輸入 tuple 相對應,以此類推。
你會發(fā)現(xiàn)這段代碼并沒有發(fā)揮出 Trident 批處理的優(yōu)勢,因為這段代碼僅僅一次查詢一下 LocationDB。所以,實現(xiàn) LocationDB 的更好的方式應該是這樣的:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk }}

然后,你可以這樣實現(xiàn) QueryLocation
方法:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }

這段代碼大幅減少了域數(shù)據(jù)庫的IO,具有更高的執(zhí)行效率。
你需要使用 StateUpdater
接口來更新 state。下面是一個更新 LocationDB 的地址信息的 StateUpdater 實現(xiàn):
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); }}

然后你就可以在 Trident 拓撲中這樣使用這個操作:
TridentTopology topology = new TridentTopology();TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())

partitionPersist
操作會更新 state 數(shù)據(jù)源。StateUpdater
接收 State 和一批 tuple 作為輸入,然后更新這個 State。上面的代碼僅僅從輸入 tuple 中抓取 userid 和 location 信息,然后對 State 執(zhí)行一個批處理更新操作。
在 Trident 拓撲更新 LocationDB 之后,partitionPersist
會返回一個表示更新后狀態(tài)的 TridentState
對象。隨后你就可以在拓撲的其他地方使用 stateQuery
方法對這個 state 執(zhí)行查詢操作。
你也許注意到了 StateUpdater 中有一個 TridentCollector 參數(shù)。發(fā)送到這個 collector 的 tuple 會進入一個“新的數(shù)值流”中。在這個例子里向這個新的流發(fā)送 tuple 并沒有意義,不過如果你需要處理類似于更新數(shù)據(jù)庫中的計數(shù)值這樣的操作,你可以考慮將更新后的技術結果發(fā)送到這個流中??梢酝ㄟ^ TridentState.newValuesStream
方法來獲取新的流的數(shù)據(jù)。
persistentAggregate
Trident 使用一個稱為 persistentAggregate
的方法來更新 State。你已經(jīng)在前面的數(shù)據(jù)流單詞統(tǒng)計的例子里見過了這個方法,這里再寫一遍:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

partitionPersist 是一個接收 Trident 聚合器作為參數(shù)并對 state 數(shù)據(jù)源進行更新的方法,persistentAggregate 就是構建于 partitionPersist 上層的一個編程抽象。在這個例子里,由于是一個分組數(shù)據(jù)流(grouped stream),Trident 需要你提供一個實現(xiàn) MapState
接口的 state。被分組的域就是 state 中的 key,而聚合的結果就是 state 中的 value。MapState
接口是這樣的:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals);}

而當你在非分組數(shù)據(jù)流上執(zhí)行聚合操作時(全局聚合操作),Trident 需要你提供一個實現(xiàn)了 Snapshottable
接口的對象:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o);}

MemoryMapStateMemcachedState 都實現(xiàn)了上面兩個接口。
實現(xiàn) Map State 接口
實現(xiàn) MapState
接口非常簡單,Trident 幾乎已經(jīng)為你做好了所有的準備工作。OpaqueMap
、TransactionalMap
、與NonTransactionalMap
類都分別實現(xiàn)了各自的容錯性語義。你只需要為這些類提供一個用于對不同的 key/value 進行 multiGets 與 multiPuts 處理的 IBackingMap 實現(xiàn)類。IBackingMap
接口是這樣的:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }

OpaqueMap 會使用 OpaqueValue 作為 vals 參數(shù)來調(diào)用 multiPut 方法,TransactionalMap 會使用 TransactionalValue 作為參數(shù),而 NonTransactionalMap 則直接將拓撲中的對象傳入。
Trident 也提供了一個 CachedMap 用于實現(xiàn) K-V map 的自動 LRU 緩存功能。
最后,Trident 還提供了一個 SnapshottableMap 類,該類通過將全局聚合結果存入一個固定的 key 中的方法將 MapState 對象轉化為一個 Snapshottable 對象。
可以參考 MemcachedState 的實現(xiàn)來了解如何將這些工具結合到一起來提供一個高性能的 MapState。MemcachedState
支持選擇模糊事務型、事務型或者非事務型語義。
原創(chuàng)文章,轉載請注明: 轉載自并發(fā)編程網(wǎng) – ifeve.com本文鏈接地址: Apache Storm 官方文檔 —— Trident State

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容