flink維表關(guān)聯(lián)系列之維表服務(wù)與Flink異步IO

維表關(guān)聯(lián)系列目錄:
一、維表服務(wù)與Flink異步IO
二、Mysql維表關(guān)聯(lián):全量加載
三、Hbase維表關(guān)聯(lián):LRU策略
四、Redis維表關(guān)聯(lián):實時查詢
五、kafka維表關(guān)聯(lián):廣播方式
六、自定義異步查詢

一、維表服務(wù)

維度或者是維表概念熟知應(yīng)該從數(shù)據(jù)倉庫維度建模開始了解的,區(qū)別于事實表業(yè)務(wù)真實發(fā)生的數(shù)據(jù),通常用來表示業(yè)務(wù)屬性,比喻訂單業(yè)務(wù)中,商品屬性、商家屬性都可以稱之為維度表。在flink 流處理實時分析中或者實時數(shù)倉中,同樣需要使用維表來完成一些數(shù)據(jù)過濾或者字段補齊操作,但是我們所需要的維度數(shù)據(jù)通常存儲在Mysql/Redis/Hbase/Es這樣的外部數(shù)據(jù)庫中,并且可能是會隨時變動的,根據(jù)業(yè)務(wù)要求數(shù)據(jù)的時效性,需要不同程度的感知維表數(shù)據(jù)的變化,在實際使用中常常會有以下幾種方案可供選擇:

  1. 在維度數(shù)據(jù)量比較小并且業(yè)務(wù)要求的時效性不高,可以定時全量加載維度數(shù)據(jù)到內(nèi)存中,直接從內(nèi)存中查詢維度數(shù)據(jù);

  2. 在維度數(shù)據(jù)量比較大并且業(yè)務(wù)要求的時效性不高,這時候全量加載就會撐爆內(nèi)存,可以使用LRU的緩存策略,當(dāng)緩存的維度數(shù)據(jù)達到一定大小,采用淘汰最近最少使用的數(shù)據(jù),同時還可以設(shè)置數(shù)據(jù)的過期時間;

  3. 業(yè)務(wù)要求數(shù)據(jù)時效性比較高,那么就需要flink實時查詢,這個時候需要注意外部存儲所能承受的QPS;

  4. 最后一種方案直接將維度數(shù)據(jù)發(fā)送到kafka中,flink任務(wù)消費kafka的維度數(shù)據(jù),然后使用廣播方式將維度數(shù)據(jù)廣播到每一個處理task中,這種方式同樣要求數(shù)據(jù)量比較小

二、Flink 異步IO

flink異步IO用于對外部訪問的一種優(yōu)化手段,可參考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛對flink 異步IO的介紹,里面詳細介紹了異步IO相對于同步處理的性能優(yōu)化與有序、無序原理實現(xiàn),在這里分析一些源碼幫助理解。

1.  `@Override`

2.  `public  void processElement(StreamRecord<IN> element)  throws  Exception  {`

3.  `final  StreamRecordQueueEntry<OUT> streamRecordBufferEntry =  new  StreamRecordQueueEntry<>(element);`

4.  `if  (timeout >  0L)  {`

5.  `// register a timeout for this AsyncStreamRecordBufferEntry`

6.  `long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();`

8.  `final  ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(`

9.  `timeoutTimestamp,`

10.  `new  ProcessingTimeCallback()  {`

11.  `@Override`

12.  `public  void onProcessingTime(long timestamp)  throws  Exception  {`

13.  `userFunction.timeout(element.getValue(), streamRecordBufferEntry);`

14.  `}`

15.  `});`

16.  `// Cancel the timer once we've completed the stream record buffer entry. This will remove`

17.  `// the register trigger task`

18.  `streamRecordBufferEntry.onComplete(`

19.  `(StreamElementQueueEntry<Collection<OUT>> value)  ->  {`

20.  `timerFuture.cancel(true);`

21.  `},`

22.  `executor);`

23.  `}`

24.  `addAsyncBufferEntry(streamRecordBufferEntry);`

25.  `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);`

26.  `}`

代碼入口是AsyncWaitOperator算子processElement 方法,表示處理元素方法,每個處理的元素都會被封裝成為StreamRecordQueueEntry對象,該對象會被放入內(nèi)部有序或者無序的隊列中,Emitter則負責(zé)從隊列里面取數(shù)據(jù),那么如何判斷已經(jīng)進入的元素已經(jīng)完成異步IO操作了呢?答案就在StreamRecordQueueEntry里面:

  1. StreamRecordQueueEntry持有CompletableFuture對象,CompletableFuture是java8 提供了一個更強大的異步調(diào)用處理類,提供了異步獲取結(jié)果無需阻塞、多階段關(guān)聯(lián)異步調(diào)用。具體用法可參考https://www.cnblogs.com/cjsblog/p/9267163.html

  2. StreamRecordQueueEntry對象添加到隊列的同時執(zhí)行其onComplete方法,內(nèi)部調(diào)用的是CompletableFuture的onComplete,表示在完成異步IO的回調(diào)方法,回調(diào)方法是一個信號燈釋放操作,會通知Emitter可以從隊列中讀取數(shù)據(jù)了

  3. StreamRecordQueueEntry對象會被作為AsyncFunction函數(shù)的asyncInvoke方法的入?yún)?,在這個方法里面需要使用外部存儲異步客戶端或者使用線程池中執(zhí)行作為異步客戶端去查詢數(shù)據(jù)并且調(diào)用其complete方法,實際上也就是調(diào)用StreamRecordQueueEntry對象中complete方法,那么就會觸發(fā)之前注冊的onComplete回調(diào)方法完成后續(xù)操作

在AsyncFunction函數(shù)中還有一個timeout方法,在異步調(diào)用超時的情況下會被觸發(fā)。接下來看下其實現(xiàn)原理:

  1. 在processElement方法里面timeout>0的邏輯里面,通過flink提供的定時機制注冊了一個ProcessingTimeCallback的回調(diào),那么在超過timout時間就會調(diào)用其onProcessingTime方法,在onProcessingTime方法中會調(diào)用AsyncFunction中timeout方法

  2. AsyncFunction中timeout方法中調(diào)用了ResultFuture對象(實際上就是StreamRecordQueueEntry對象)中CompletableFuture的completeExceptionally方法,那么檢測到該CompletableFuture還是處于uncomplete的狀態(tài)就會拋出異常

  3. 在timeout>0的邏輯里面還有一個調(diào)用StreamRecordQueueEntry對象的onComplete回調(diào)方法,在其CompletableFuture完成時會調(diào)用cancel 取消超時回調(diào)。

在AsyncFunction函數(shù)中默認timeout方法僅僅是會拋出Async function call has timed out.異常,我們也可以重寫該方法,獲取更多的信息。

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • title: 論事件驅(qū)動與異步IOtag: 事件驅(qū)動 異步IOcategories: notes 轉(zhuǎn)載自人云思云 ...
    cmustard閱讀 1,029評論 0 0
  • 架構(gòu) Apache Flink是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進行有狀態(tài)計算。Flink設(shè)計為在...
    盜夢者_56f2閱讀 38,048評論 0 6
  • 前言 前端工程師因為需要操縱Ajax(Ajax的A就是Asynchronous的意思),因此,是最了解異步IO的人...
    白昔月閱讀 4,087評論 1 8
  • 打從記事起,就覺得我對物的揣摩,強于對人性的參透。比如,我很小就聽得出生完蛋的雞與沒生蛋的雞叫聲不同,卻怎么也猜不...
    鶴舞松泉閱讀 516評論 0 4
  • 嬰兒胃容量大小和排泄特點 要想知道寶寶需要吃多少,就得先了解嬰兒胃的大小和排泄特點: 胃的大?。?嬰兒的生理胃容量...
    bcd66852a3e6閱讀 11,423評論 0 0

友情鏈接更多精彩內(nèi)容