storm筆記:Trident狀態(tài)

storm筆記:Trident應(yīng)用中說了下Trident的使用,這里說下Trident幾種狀態(tài)的變化及其對(duì)應(yīng)API的使用。

本文內(nèi)容來自Trident State,部分內(nèi)容根據(jù)實(shí)際情況做出修改。

Trident中有對(duì)狀態(tài)數(shù)據(jù)進(jìn)行讀取和寫入操作的一流抽象工具。狀態(tài)既可以保存在拓?fù)鋬?nèi)部,比如保存在內(nèi)容中并由HDFS存儲(chǔ),也可以通過外部存儲(chǔ)(比如Memcached或Cassandra)存儲(chǔ)在數(shù)據(jù)庫中。而對(duì)于Trident的API而言,這兩種機(jī)制沒有任何區(qū)別。

Trident以容錯(cuò)的方式管理狀態(tài),以便在重試或失敗時(shí)的狀態(tài)更新是冪等的。在大數(shù)據(jù)處理中,數(shù)據(jù)處理的冪等性是非常重要的一個(gè)指標(biāo),這樣能夠保證每個(gè)消息即使處理了多次,結(jié)果也像是只處理了一次一樣。

在進(jìn)行狀態(tài)更新時(shí)可能需要各種級(jí)別的容錯(cuò)能力,在這之前,我們來看一個(gè)例子說明實(shí)現(xiàn)“恰好一次”語義所需的技巧。比如,正在對(duì)流中的數(shù)據(jù)進(jìn)行計(jì)數(shù)聚合操作,每次處理新的元組時(shí),都會(huì)將運(yùn)行的計(jì)數(shù)結(jié)果存儲(chǔ)在數(shù)據(jù)庫中。

如果發(fā)生故障時(shí),元組將重新執(zhí)行計(jì)數(shù)操作。這就會(huì)在執(zhí)行狀態(tài)更新時(shí)出現(xiàn)問題,因?yàn)檫@個(gè)時(shí)候不知道是不是已經(jīng)更新過該元組狀態(tài)。也許還沒有處理該元組數(shù)據(jù),這個(gè)時(shí)候就需要增加計(jì)數(shù)。也許已經(jīng)處理該元組,并成功增加計(jì)數(shù),但是在下一步的時(shí)候出現(xiàn)問題,這種情況下,就不應(yīng)該增加計(jì)數(shù)。也有可能是處理元組正常,更新計(jì)數(shù)是異常,這個(gè)時(shí)候就需要更新計(jì)數(shù)。

所以說,如果只是在數(shù)據(jù)庫中存儲(chǔ)計(jì)數(shù)信息,就不知道元組是否已經(jīng)處理過。因此,就需要更多的信息作為輔助。Trident提供了下面三個(gè)性質(zhì),來實(shí)現(xiàn)“恰好一次”的處理:

  1. 元組都是以小批次處理
  2. 每批元組都會(huì)給出一個(gè)唯一ID,稱為事務(wù)ID(transaction id,txid)。如果批次重復(fù)處理,txid也會(huì)相同。
  3. 狀態(tài)的更新操作是按照元組批次的順序執(zhí)行的。也就是說,在批次2狀態(tài)更新成功之前,不會(huì)進(jìn)行批次3的狀態(tài)更新。

根據(jù)這些特性,就可以通過檢查到該元組的批次是否已被處理,并根據(jù)檢測(cè)結(jié)果采取適當(dāng)?shù)牟僮鞲聽顟B(tài)了。采取的具體操作取決于Spout的類型。Spout有三種類型:“非事務(wù)型(non-transactional)”,“事務(wù)型(transactional)”和“不透明事務(wù)型(opaque transactional)”。對(duì)應(yīng)的容錯(cuò)能力也是三種:“非事務(wù)”,“事務(wù)”和“不透明事務(wù)”。下面來看看Spout的各個(gè)類型及對(duì)應(yīng)的容錯(cuò)能力。

事務(wù)型Spout

Trident是按照批次發(fā)送元組進(jìn)行處理的,每個(gè)批次的元組被賦予唯一的事務(wù)ID。Spout的特性根據(jù)他們所提供容錯(cuò)性保證機(jī)制來決定的,而且這種機(jī)制也會(huì)對(duì)每個(gè)批次發(fā)生作用。事務(wù)型Spout有如下特性:

  1. 每個(gè)批次的txid不變,對(duì)于一個(gè)特定的txid,重復(fù)執(zhí)行時(shí),它所包含的元組數(shù)據(jù)與第一次完全相同。
  2. 元組只會(huì)在一個(gè)批次出現(xiàn),不會(huì)重復(fù)(某個(gè)元組只會(huì)出現(xiàn)在一個(gè)批次中,不會(huì)出現(xiàn)在多個(gè)批次中)。
  3. 每個(gè)元組都會(huì)出現(xiàn)一次(不會(huì)遺漏任何的元組數(shù)據(jù))

這是最簡(jiǎn)單最容易理解的一種Spout類型,數(shù)據(jù)流被分割成固定的批次。storm中有與Kafka集成的事務(wù)型Spout的擴(kuò)展,代碼在這里。

既然事務(wù)型Spout這么簡(jiǎn)單易懂,為什么不在Trident中完全使用事務(wù)型Spout呢?其實(shí)就在于它的容錯(cuò)能力。比如,TransactionalTridentKafkaSpout的工作方式是,同一個(gè)txid的批次中將包含kafka所有分區(qū)的元組。一旦某個(gè)批次發(fā)出后,出現(xiàn)異常,需要重新發(fā)出,就需要完全相同的元組集合才能滿足事務(wù)型Spout要求的語義。但是這個(gè)時(shí)候,kafka某個(gè)節(jié)點(diǎn)異常(節(jié)點(diǎn)關(guān)閉或分區(qū)不可用),就無法獲取完全相同的的一批元組,那整個(gè)拓?fù)渚蜁?huì)應(yīng)為第3條語義(批次按順序執(zhí)行)停止。

這就是要有“不透明事務(wù)型”Spout的原因了,它能夠容忍數(shù)據(jù)源節(jié)點(diǎn)丟失,而且又能保證數(shù)據(jù)恰好被操作一次。

注:對(duì)kafka比較熟悉的應(yīng)該會(huì)想到,如果某一個(gè)topic支持復(fù)制,那即使一個(gè)節(jié)點(diǎn)不可用,還會(huì)有其他復(fù)制節(jié)點(diǎn)頂上,那TransactionalTridentKafkaSpout也能夠避免上面的問題。

下面繼續(xù)看看如何設(shè)計(jì)一個(gè)支持恰好一次特性的“事務(wù)型”Spout語義(簡(jiǎn)單的說就是同一個(gè)txid對(duì)應(yīng)的批次元組數(shù)據(jù)完全一致)的狀態(tài)實(shí)現(xiàn),這種狀態(tài)稱為“事務(wù)型狀態(tài)”。

比如,現(xiàn)在有一個(gè)單詞計(jì)數(shù)的拓?fù)洌枰獙卧~計(jì)數(shù)存儲(chǔ)在key/value數(shù)據(jù)庫中。key是單詞,value中包含單詞數(shù)量。另外,為了確定同一批次元組是否已經(jīng)被執(zhí)行,需要將txid也一同存儲(chǔ)在value中。這樣,當(dāng)需要更新單詞數(shù)量的時(shí)候,先比較txid是否相同,如果相同,就跳過更新。如果不同,就更新計(jì)數(shù)。

考慮這個(gè)為什么它工作的例子。 假設(shè)您正在處理由以下批次元組組成的txid 3:

比如,要處理一個(gè)txid是3的一批元組:

["man"]
["man"]
["dog"]

目前數(shù)據(jù)庫中存儲(chǔ)的數(shù)據(jù)為:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

在這個(gè)時(shí)候,發(fā)現(xiàn)“man”對(duì)應(yīng)的txid是1,當(dāng)前的txid是3,就可以更新了。然后“dog”對(duì)應(yīng)的txid是3,說明同一批次的元組數(shù)據(jù)已經(jīng)發(fā)送過了,就不需要更新。從這點(diǎn)可以看出,txid是3的批次元組是重復(fù)發(fā)送的,在更新“dog”數(shù)量后,在更新“man”數(shù)量前,出現(xiàn)了錯(cuò)誤。最后的結(jié)果就是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

不透明事務(wù)型Spout

前面已經(jīng)提過,不透明事務(wù)型Spout不能保證相同txid對(duì)應(yīng)的批次中的元組數(shù)據(jù)完全一致。其特點(diǎn)如下:

  1. 每個(gè)元組都會(huì)在有且僅有一個(gè)批次中處理成功。

[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有這種特性,同時(shí)對(duì)kafka節(jié)點(diǎn)異常有很好的容錯(cuò)性。OpaqueTridentKafkaSpout在發(fā)送一個(gè)批次元組的時(shí)候,會(huì)從上次成功之后的位置開始發(fā)送,這樣就能夠保證元組不會(huì)漏發(fā)或重發(fā)。

基于上面的特點(diǎn),不透明事務(wù)型Spout就不同通過txid來直接判斷是否可以跳過狀態(tài)更新,因?yàn)榫哂邢嗤瑃xid的批次中元組可能發(fā)生了變化。

這就需要存儲(chǔ)更多的狀態(tài)信息了,而不僅僅是一個(gè)結(jié)果和一個(gè)txid了,還需要存儲(chǔ)前一個(gè)結(jié)果值。

比如,當(dāng)前批次的計(jì)數(shù)是2,需要進(jìn)行一次狀態(tài)更新,數(shù)據(jù)庫中的數(shù)據(jù)如下:

{
  "value": 4,
  "prevValue": 1,
  "txid": 2
}

如果當(dāng)前的txid是3,與數(shù)據(jù)庫中的不同。在這種情況下,需要將prevValue的值該為value的值,value的值增加2,更新txid為3,最后的結(jié)果就是:

{
  "value": 6,
  "prevValue": 4,
  "txid": 3
}

如果當(dāng)前的txid是2,等于數(shù)據(jù)庫中的txid。因?yàn)閠xid相同,說明上一次txid為2的批次處理失敗,但是本次的元組可能與上一次不同了。這個(gè)時(shí)候,就需要使用本次數(shù)據(jù)覆蓋上次處理結(jié)果。也就是說,prevValue值不變,value的值改為prevValue加2,txid不變,最后的結(jié)果如下:

{
  "value": 3,
  "prevValue": 1,
  "txid": 2
}

這種方式的可行性依賴于Trident的強(qiáng)順序性。也就是說,一旦開始處理一個(gè)新的批次,就不會(huì)重復(fù)執(zhí)行上一個(gè)批次。不透明事務(wù)型Spout保證了不同批次之間沒有重復(fù)的情況,也就是每個(gè)元組只會(huì)在一個(gè)批次中處理成功,所以就可以放心的使用前一個(gè)值與當(dāng)前值覆蓋已存數(shù)據(jù)了。

非事務(wù)型Spout

非事務(wù)型Spout不能為批次提供任何保證。所以可能出現(xiàn)"至多一次"的處理,即在某個(gè)批次處理過程中失敗了,但是不會(huì)在重新處理;也可能提供“至少一次”的處理,即可能會(huì)有多個(gè)批次分別處理某個(gè)元組。也就是沒有辦法實(shí)現(xiàn)“恰好一次”的語義。

不同類型spout和狀態(tài)總結(jié)

下面是不同的spout/狀態(tài)組合是否支持“恰好一次”處理語義:

Spouts vs States

不透明事務(wù)狀態(tài)有最強(qiáng)的容錯(cuò)性,但是因?yàn)榇鎯?chǔ)txid和兩個(gè)結(jié)果帶來更大的開銷。事務(wù)型狀態(tài)只需要存儲(chǔ)一個(gè)狀態(tài)結(jié)果,但是只對(duì)事務(wù)型Spout有效。非事務(wù)型狀態(tài)要求存儲(chǔ)的數(shù)據(jù)更少,但是不能實(shí)現(xiàn)“恰好一次”的處理語義。

所以在選擇容錯(cuò)與存儲(chǔ)空間中,需要根據(jù)具體的需要選擇合適的組合。

狀態(tài)API

根據(jù)前面來看,“恰好一次”語義的原理有些復(fù)雜,但是作為用戶,并不需要了解這些txid對(duì)比、多值存儲(chǔ),因?yàn)門rident已經(jīng)在State中封裝了所有容錯(cuò)處理邏輯,只需要想下面著用攜帶碼就行:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
                .parallelismHint(6);

所有的不透明事務(wù)狀態(tài)邏輯已經(jīng)封裝在MemcachedState.opaque中,另外,狀態(tài)更新會(huì)自動(dòng)調(diào)整為批次操作,這樣可以減少與數(shù)據(jù)庫之間反復(fù)交互帶來的資源浪費(fèi)。

基本的State接口只有兩個(gè)方法:

public interface State {
    void beginCommit(Long txid); // 對(duì)于像DRPC流發(fā)生的partitionPersist這樣的事情,可以是null
    void commit(Long txid);
}

前面已經(jīng)說過,狀態(tài)更新開始和結(jié)束時(shí)都會(huì)獲取txid。Trident并不關(guān)心狀態(tài)如何操作,使用哪種方式更新,使用哪種方式讀取。

假如有一個(gè)包含用戶地址信息的定制數(shù)據(jù)庫,需要使用Trident與數(shù)據(jù)庫交互,State擴(kuò)展類中包含對(duì)于用戶信息的getter和setter方法:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocation(long userId, String location) {
        // 向數(shù)據(jù)庫設(shè)置地址信息
    }

    public String getLocation(long userId) {
        // 從數(shù)據(jù)庫中獲取地址信息
    }
}

然后就需要一個(gè)StateFactory來創(chuàng)建Trident所需的State對(duì)象,LocationDB所需的StateFactory大體結(jié)構(gòu)如下:

public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
        return new LocationDB();
    }
}

Trident提供了用于查詢狀態(tài)源的QueryFunction接口,以及更新狀態(tài)源的StateUpdater接口。比如,查詢LocationDB中用戶信息的QueryLocation

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));

QueryLocation的代碼如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for (TridentTuple input : inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

QueryFunction操作分為兩步:首先,Trident會(huì)將收集到的數(shù)據(jù)放在一個(gè)批次中,發(fā)送給batchRetrieve方法。在這個(gè)例子中,batchRetrieve方法收到的是一些用戶id。batchRetrieve會(huì)返回一組與輸入元組長度相同的結(jié)果。輸入元組與輸出結(jié)果中各個(gè)元素是彼此對(duì)應(yīng)的。

從這點(diǎn)來看,上面的LocationDB類并沒有發(fā)揮Trident批處理優(yōu)勢(shì),所以需要盡心改造:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
        // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
        // get locations in bulk
    }
}

對(duì)應(yīng)的QueryLocation類如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for (TridentTuple input : inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

這段代碼大幅減少了數(shù)據(jù)庫操作。

對(duì)于更新狀態(tài),可以使用StateUpdater接口。比如下面的更新操作:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for (TridentTuple t : tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

對(duì)應(yīng)的更新操作拓?fù)渲芯涂梢允沁@樣:

TridentTopology topology = new TridentTopology();
TridentState locations =
        topology.newStream("locations", locationsSpout)
                .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());

partitionPersist方法會(huì)更新狀態(tài),StateUpdater接口接收一批元組和狀態(tài)信息,然后更新狀態(tài)。上面的LocationUpdater類中僅僅是從元組中抓取用戶id和地址信息,然后對(duì)狀態(tài)執(zhí)行批量處理。然后,partitionPersist會(huì)返回一個(gè)表示更新狀態(tài)后的TridentState對(duì)象。隨后就可以在拓?fù)涞钠渌胤绞褂?code>stateQuery方法查詢狀態(tài)。

StateUpdaterupdateState方法中有一個(gè)TridentCollector參數(shù),這個(gè)對(duì)象是可以將發(fā)送進(jìn)來的元組發(fā)送到一個(gè)新的數(shù)據(jù)流中。在這個(gè)例子中沒有用到。如果需要進(jìn)行比如向數(shù)據(jù)庫更新計(jì)數(shù)值的后續(xù)操作,可以通過TridentState#newValuesStream方法獲取新的數(shù)據(jù)流數(shù)據(jù)。

persistentAggregate

Trident使用一個(gè)名為persistentAggregate的方法更新狀態(tài)。前面已經(jīng)出現(xiàn)過,這里再寫一遍:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

partitionPersist是一個(gè)接收Trident聚合器作為參數(shù)并對(duì)狀態(tài)數(shù)據(jù)進(jìn)行更新的方法,persistentAggregate就是構(gòu)建于partitionPersist上層的一個(gè)編程抽象。在這個(gè)例子中,通過groupBy返回一個(gè)分組數(shù)據(jù),Trident需要一個(gè)實(shí)現(xiàn)MapState接口的對(duì)象。分組字段是狀態(tài)的key,聚合結(jié)果是狀態(tài)的value。MapState接口如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

如果你需要在未分組的數(shù)據(jù)流上執(zhí)行聚合操作時(shí),Trident需要一個(gè)實(shí)現(xiàn)Snapshottable接口的對(duì)象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapStateMemcachedState 都實(shí)現(xiàn)了這些接口.

實(shí)現(xiàn)MapState接口

實(shí)現(xiàn)MapState接口非常簡(jiǎn)單,Trident幾乎把所有事都做完了。OpaqueMap、TransactionalMapNonTransactionalMap都分別實(shí)現(xiàn)了各自的容錯(cuò)語義。只需要為這些類提供一個(gè)用于對(duì)不同key/value進(jìn)行批量獲取、批量修改的IBackingMap實(shí)現(xiàn)就行。IBackingMap接口如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

OpaqueMap會(huì)使用OpaqueValue作為vals參數(shù)調(diào)用multiPut方法;TransactionalMap會(huì)使用TransactionalValue作為參數(shù);NonTransactionalMaps會(huì)直接把拓?fù)鋵?duì)象傳入。

Trident還提供了CachedMap類來實(shí)現(xiàn)key/value的自動(dòng)LRU緩存操作。

最后,Trident還提供了SnapshottableMap類,該類通過將全局聚合的結(jié)果存入一個(gè)固定key中的方法將MapState對(duì)象轉(zhuǎn)化為Snapshottable對(duì)象。

可以參考MemcachedState的實(shí)現(xiàn)來了解如何將這些工具結(jié)合在一起來提供一個(gè)高性能的MapState實(shí)現(xiàn)。MemcachedState支持不透明事務(wù)、事務(wù)和非事務(wù)語義。


個(gè)人主頁: http://www.howardliu.cn

個(gè)人博文: storm筆記:Trident狀態(tài)

CSDN主頁: http://blog.csdn.net/liuxinghao

CSDN博文: storm筆記:Trident狀態(tài)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容