【編者按】本文作者為 Bill Bejeck,主要介紹如何有效利用新的 Apache Kafka 客戶端來滿足數(shù)據(jù)處理需求。文章系國內(nèi) ITOM 管理平臺 OneAPM 編譯呈現(xiàn),以下為正文。
如果你使用的系統(tǒng)需要傳輸大量數(shù)據(jù),就算你沒用過 Kafka,很有可能已經(jīng)聽說過它了。從較高層面來說,Kafka 是一個對錯誤零容忍、分布式的發(fā)布訂閱信息系統(tǒng),設(shè)計目的是提供高速服務(wù)及處理成千上萬信息的能力。Kafka 提供多種應(yīng)用,其中一種是實(shí)時處理。實(shí)時處理通常包括讀取主題(源)的數(shù)據(jù)進(jìn)行分析或轉(zhuǎn)換工作,然后將結(jié)果寫入另一個主題(sink)。目前要完成這些工作,你有以下兩種備選:
- 通過 KafkaConsumer 使用自定義代碼來讀取數(shù)據(jù),然后通過 KafkaProducer 寫出數(shù)據(jù)。
- 使用發(fā)展成熟的流處理框架,例如 Spark Steaming、Flink 或者 Storm。
雖然兩種方法都很好,在某些情況下,如果能有一個處于兩種之間的方法就更好了。為此,《Kafka 改進(jìn)方案》流程提出了一個處理器接口。處理器接口的目的是引入一個客戶端,以便處理 Kafka 消耗的數(shù)據(jù),并將結(jié)果寫入 Kafka。該處理器客戶端包括兩個組成部分:
- 一個“低層級”的處理器,能夠提供數(shù)據(jù)處理、組合處理和本地狀態(tài)存儲的接口
- 一個“高層級”的流 DSL,能夠滿足大部分處理執(zhí)行需求。
接下來將會有一系列文章介紹新的 Kafka 處理器客戶端,本文主要介紹“低層級”的處理器功能。在后續(xù)文章中,將會介紹“高層級”的 DSL 和涉及到其它技術(shù)的高級用例。如想了解處理器客戶端的動機(jī)和目標(biāo)的完整描述,請閱讀《方案》原文。免責(zé)聲明:本人與 Confluent 并無任何關(guān)系,僅僅是 Kafka 的一名熱心用戶。
處理器接口的潛在用例
在筆者看來,處理器接口是個有用工具的原因有以下幾點(diǎn):
- 在處理奇異值時需要發(fā)出通知或警報。換句話說,業(yè)務(wù)需求就是:你不需要建立模型或在其它被處理的數(shù)據(jù)語境中檢查這個值。舉個例子,當(dāng)有人使用虛假信用卡時,你希望能立即收到通知。
- 在進(jìn)行分析時篩選數(shù)據(jù)。理想狀態(tài)下,篩選出中到高比例的數(shù)據(jù)應(yīng)該重新分區(qū),來避免數(shù)據(jù)傾斜問題。分區(qū)操作成本很高,因此通過篩選哪些數(shù)據(jù)要發(fā)送到你的分析群集,就可以省去篩選和重新分區(qū)步驟。
- 你只想對源數(shù)據(jù)的某一部分進(jìn)行分析,同時把所有數(shù)據(jù)傳輸?shù)搅硪粋€存儲空間。
第一個處理器范例
在第一個處理器范例中,筆者要轉(zhuǎn)化虛構(gòu)的客戶購買數(shù)據(jù),并進(jìn)行以下操作:
- 掩藏信用卡號的處理器。
- 用于收集客戶名字和消費(fèi)金額的處理器,這些信息將會用于一個獎勵項(xiàng)目。
- 用于收集郵編和購買商品的處理器,這些信息可以幫助判斷消費(fèi)者的購物模式。
以下是處理器對象的簡要介紹。三個處理器都繼承了 AbstractProcessor 類,該類提供了punctuate 和 close 方法的空操作重寫。在本范例中,只需要實(shí)現(xiàn) process 方法,該行為就會執(zhí)行到每條信息。任務(wù)完成后,將會調(diào)用 context().forward方法,它會將修改后的或者新的鍵值對轉(zhuǎn)發(fā)給下游的消費(fèi)者。(context() 方法會檢索 init 方法在父類中預(yù)置的 context 實(shí)例變量)。然后, context().commit 方法被調(diào)用,提交包括信息偏移在內(nèi)的流當(dāng)前狀態(tài)。
打造處理器圖形
現(xiàn)在需要定義有向無環(huán)圖(DAG)來決定信息的流向。這是關(guān)系到處理器接口是否會“出現(xiàn)偏差”的地方。要打造處理器節(jié)點(diǎn)圖,需要用到拓?fù)錁?gòu)造器(ToplogyBuilder)。雖然筆者的信息屬于 JSON,但還是需要定義 序列化 和 反序列化實(shí)例,因?yàn)樘幚砥靼凑疹愋蛠硖幚?。下面是來?PurchaseProcessorDriver的一部分代碼,它們構(gòu)成了圖形拓?fù)?、序列化程序和反序列化程序?/p>
//Serializers for types used in the processors
JsonDeserializer<Purchase> purchaseJsonDeserializer = new JsonDeserializer<>(Purchase.class);
JsonSerializer<Purchase> purchaseJsonSerializer = new JsonSerializer<>();
JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
JsonSerializer<PurchasePattern> purchasePatternJsonSerializer = new JsonSerializer<>();
StringDeserializer stringDeserializer = new StringDeserializer();
StringSerializer stringSerializer = new StringSerializer();
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, purchaseJsonDeserializer, "src-topic")
.addProcessor("PROCESS", CreditCardAnonymizer::new, "SOURCE")
.addProcessor("PROCESS2", PurchasePatterns::new, "PROCESS")
.addProcessor("PROCESS3", CustomerRewards::new, "PROCESS")
.addSink("SINK", "patterns", stringSerializer, purchasePatternJsonSerializer, "PROCESS2")
.addSink("SINK2", "rewards",stringSerializer, rewardAccumulatorJsonSerializer, "PROCESS3")
.addSink("SINK3", "purchases", stringSerializer, purchaseJsonSerializer, "PROCESS");
//Use the topologyBuilder and streamingConfig to start the kafka streams process
KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
streaming.start();
There’s several steps here, so let’s do a quick walkthrough
上面的代碼涉及到幾個步驟,以下是其簡介:
- 在第11行有個源節(jié)點(diǎn)叫做“SOURCE”,一個用于鍵的 StringDeserializer 以及生成的 JsonSerializer 來處理 Purchase 對象,和供給源代碼的1到N個主題。本范例中使用的是1個主題“src-topic”的輸入信息。
- 接下來開始添加處理器節(jié)點(diǎn)。
addProcessor方法以一個 Strings 命名,一個 ProcessorSupplier,以及1到N個父節(jié)點(diǎn)。在本范例中,第一個處理器是“SOURCE”節(jié)點(diǎn)的孩子,同時又是后兩個處理器的父親。在這里需要注意 ProcessorSupplier 的句法。該代碼在利用方法處理(method handles),后者可以在 Java8中用作供應(yīng)實(shí)例的 lambda 表達(dá)式。代碼繼續(xù)用同樣的方式定義接下來的兩個處理器。 - 最后添加 sink(輸出主題)來完成信息通道。
addSink方法用到一個 String 名字、主題名字、鍵值序列化程序、值序列化程序和1到 N 個父節(jié)點(diǎn)。在3個addSink方法中,可以看到之前在代碼中創(chuàng)建的 JSONDeserializer 對象。
下面是拓?fù)錁?gòu)造器(TopologyBuilder)的最終結(jié)果圖示:

狀態(tài)處理器
處理器接口并不僅限于處理當(dāng)前收到的值,還能維護(hù)集合、總和過程中使用的狀態(tài),或者連接將要收到的信息。為了充分利用狀態(tài)處理功能,在創(chuàng)建處理拓?fù)鋾r,使用 TopologyBuilder.addStateStore 方法創(chuàng)建一個 KeyValueStore。可以創(chuàng)建兩種存儲區(qū):(1)內(nèi)存式的;(2)利用堆外存儲的 RocksDB 存儲。選擇哪個取決于值的有效時長。對于數(shù)量較多的動態(tài)值,RocksDB 比較適合,對于時效較短的詞條,內(nèi)存式更適合。在指定 String、Integer 或較長的鍵值和值時,Stores 類別提供序列化、反序列化實(shí)例。但是如果使用自定義類型的鍵值或值時,需要提供自定義的序列化程序和反序列化程序。
狀態(tài)處理器范例
在本范例中,將會看到 process 方法和另外兩個重寫方法:init 和 punctuate。process 方法抽取股票代號,更新或創(chuàng)建交易信息,然后把匯總結(jié)果放入存儲區(qū)。
在 init 方法中的操作有:
- 設(shè)置 ProcessorContext 引用。
- 用
ProcessorContext.schedule方法,以便控制punctuate方法的執(zhí)行頻率。在本范例中頻率為10秒1次。 - 給構(gòu)建
TopologyBuilder(下文將會談到)時創(chuàng)建的狀態(tài)存儲區(qū)設(shè)置一個引用。
punctuate 方法會迭代存儲區(qū)內(nèi)的所有值,并且一旦它們在過去11秒內(nèi)更新,StockTransactionSummary 對象就會被發(fā)送給消費(fèi)者。
利用狀態(tài)存儲區(qū)打造一個拓?fù)錁?gòu)造器
跟前面的例子一樣,看處理器代碼只完成了一半的工作。以下是部分源代碼,創(chuàng)建了 TopologyBuilder,其中包括一個 KeyValueStore:
TopologyBuilder builder = new TopologyBuilder();
JsonSerializer<StockTransactionSummary> stockTxnSummarySerializer = new JsonSerializer<>();
JsonDeserializer<StockTransactionSummary> stockTxnSummaryDeserializer = new JsonDeserializer<>(StockTransactionSummary.class);
JsonDeserializer<StockTransaction> stockTxnDeserializer = new JsonDeserializer<>(StockTransaction.class);
JsonSerializer<StockTransaction> stockTxnJsonSerializer = new JsonSerializer<>();
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
builder.addSource("stocks-source", stringDeserializer, stockTxnDeserializer, "stocks")
.addProcessor("summary", StockSummary::new, "stocks-source")
.addStateStore(Stores.create("stock-transactions").withStringKeys()
.withValues(stockTxnSummarySerializer,stockTxnSummaryDeserializer).inMemory().maxEntries(100).build(),"summary")
.addSink("sink", "stocks-out", stringSerializer,stockTxnJsonSerializer,"stocks-source")
.addSink("sink-2", "transaction-summary", stringSerializer, stockTxnSummarySerializer, "summary");
System.out.println("Starting KafkaStreaming");
KafkaStreams streaming = new KafkaStreams(builder, streamingConfig);
streaming.start();
System.out.println("Now started");
這段代碼在創(chuàng)建序列化、非序列化和拓?fù)錁?gòu)造器方面并無不同。但是有一點(diǎn)不同。第13、14行創(chuàng)建了一個內(nèi)存存儲區(qū)(命名為“summary”),供處理器使用。傳到 Stores.create 方法的名字跟前面在處理器 init 方法中用來重寫存儲區(qū)的名字一樣。在指定鍵值時,可以使用便捷的 Stores.withStringKeys() 方法,因?yàn)?Strings 本來就是它支持的類型,不需要提供參數(shù)。但是因?yàn)槭褂昧祟愋突闹担允褂昧?withValues 方法,并提供了序列化和非序列化實(shí)例。
用范例代碼運(yùn)行處理器
本文所示范例可與實(shí)時 Kafka 群集進(jìn)行對比。指導(dǎo)說明參見本文的 github 資源庫。
結(jié)論
目前為止,筆者已經(jīng)介紹了 Kafka 處理器接口的“低層級”部分。希望能讓大家領(lǐng)略到這個新接口能夠給 Kafka 的新老用戶帶來的實(shí)用性和多樣性。在下一篇文章中,筆者將會介紹“高層級”的 DSL 接口,以及連接、時窗功能等問題。最后要強(qiáng)調(diào)的一點(diǎn),就是處理器接口和 Kafka 流還在繼續(xù)開發(fā)中,很快將會有更新。
本文系 OneAPM 工程師整理呈現(xiàn)。OneAPM 能為您提供端到端的應(yīng)用性能解決方案,我們支持所有常見的框架及應(yīng)用服務(wù)器,助您快速發(fā)現(xiàn)系統(tǒng)瓶頸,定位異常根本原因。分鐘級部署,即刻體驗(yàn),性能監(jiān)控從來沒有如此簡單。想閱讀更多技術(shù)文章,請訪問 OneAPM 官方技術(shù)博客。
本文轉(zhuǎn)自 OneAPM 官方博客
原文地址:https://dzone.com/articles/introducing-the-kafka-processor-client