維表關(guān)聯(lián)系列目錄:
一、維表服務(wù)與Flink異步IO
二、Mysql維表關(guān)聯(lián):全量加載
三、Hbase維表關(guān)聯(lián):LRU策略
四、Redis維表關(guān)聯(lián):實時查詢
五、kafka維表關(guān)聯(lián):廣播方式
六、自定義異步查詢
一、維表服務(wù)
維度或者是維表概念熟知應(yīng)該從數(shù)據(jù)倉庫維度建模開始了解的,區(qū)別于事實表業(yè)務(wù)真實發(fā)生的數(shù)據(jù),通常用來表示業(yè)務(wù)屬性,比喻訂單業(yè)務(wù)中,商品屬性、商家屬性都可以稱之為維度表。在flink 流處理實時分析中或者實時數(shù)倉中,同樣需要使用維表來完成一些數(shù)據(jù)過濾或者字段補齊操作,但是我們所需要的維度數(shù)據(jù)通常存儲在Mysql/Redis/Hbase/Es這樣的外部數(shù)據(jù)庫中,并且可能是會隨時變動的,根據(jù)業(yè)務(wù)要求數(shù)據(jù)的時效性,需要不同程度的感知維表數(shù)據(jù)的變化,在實際使用中常常會有以下幾種方案可供選擇:
在維度數(shù)據(jù)量比較小并且業(yè)務(wù)要求的時效性不高,可以定時全量加載維度數(shù)據(jù)到內(nèi)存中,直接從內(nèi)存中查詢維度數(shù)據(jù);
在維度數(shù)據(jù)量比較大并且業(yè)務(wù)要求的時效性不高,這時候全量加載就會撐爆內(nèi)存,可以使用LRU的緩存策略,當(dāng)緩存的維度數(shù)據(jù)達到一定大小,采用淘汰最近最少使用的數(shù)據(jù),同時還可以設(shè)置數(shù)據(jù)的過期時間;
業(yè)務(wù)要求數(shù)據(jù)時效性比較高,那么就需要flink實時查詢,這個時候需要注意外部存儲所能承受的QPS;
最后一種方案直接將維度數(shù)據(jù)發(fā)送到kafka中,flink任務(wù)消費kafka的維度數(shù)據(jù),然后使用廣播方式將維度數(shù)據(jù)廣播到每一個處理task中,這種方式同樣要求數(shù)據(jù)量比較小
二、Flink 異步IO
flink異步IO用于對外部訪問的一種優(yōu)化手段,可參考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛對flink 異步IO的介紹,里面詳細介紹了異步IO相對于同步處理的性能優(yōu)化與有序、無序原理實現(xiàn),在這里分析一些源碼幫助理解。
1. `@Override`
2. `public void processElement(StreamRecord<IN> element) throws Exception {`
3. `final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);`
4. `if (timeout > 0L) {`
5. `// register a timeout for this AsyncStreamRecordBufferEntry`
6. `long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();`
8. `final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(`
9. `timeoutTimestamp,`
10. `new ProcessingTimeCallback() {`
11. `@Override`
12. `public void onProcessingTime(long timestamp) throws Exception {`
13. `userFunction.timeout(element.getValue(), streamRecordBufferEntry);`
14. `}`
15. `});`
16. `// Cancel the timer once we've completed the stream record buffer entry. This will remove`
17. `// the register trigger task`
18. `streamRecordBufferEntry.onComplete(`
19. `(StreamElementQueueEntry<Collection<OUT>> value) -> {`
20. `timerFuture.cancel(true);`
21. `},`
22. `executor);`
23. `}`
24. `addAsyncBufferEntry(streamRecordBufferEntry);`
25. `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);`
26. `}`
代碼入口是AsyncWaitOperator算子processElement 方法,表示處理元素方法,每個處理的元素都會被封裝成為StreamRecordQueueEntry對象,該對象會被放入內(nèi)部有序或者無序的隊列中,Emitter則負責(zé)從隊列里面取數(shù)據(jù),那么如何判斷已經(jīng)進入的元素已經(jīng)完成異步IO操作了呢?答案就在StreamRecordQueueEntry里面:
StreamRecordQueueEntry持有CompletableFuture對象,CompletableFuture是java8 提供了一個更強大的異步調(diào)用處理類,提供了異步獲取結(jié)果無需阻塞、多階段關(guān)聯(lián)異步調(diào)用。具體用法可參考https://www.cnblogs.com/cjsblog/p/9267163.html
StreamRecordQueueEntry對象添加到隊列的同時執(zhí)行其onComplete方法,內(nèi)部調(diào)用的是CompletableFuture的onComplete,表示在完成異步IO的回調(diào)方法,回調(diào)方法是一個信號燈釋放操作,會通知Emitter可以從隊列中讀取數(shù)據(jù)了
StreamRecordQueueEntry對象會被作為AsyncFunction函數(shù)的asyncInvoke方法的入?yún)?,在這個方法里面需要使用外部存儲異步客戶端或者使用線程池中執(zhí)行作為異步客戶端去查詢數(shù)據(jù)并且調(diào)用其complete方法,實際上也就是調(diào)用StreamRecordQueueEntry對象中complete方法,那么就會觸發(fā)之前注冊的onComplete回調(diào)方法完成后續(xù)操作
在AsyncFunction函數(shù)中還有一個timeout方法,在異步調(diào)用超時的情況下會被觸發(fā)。接下來看下其實現(xiàn)原理:
在processElement方法里面timeout>0的邏輯里面,通過flink提供的定時機制注冊了一個ProcessingTimeCallback的回調(diào),那么在超過timout時間就會調(diào)用其onProcessingTime方法,在onProcessingTime方法中會調(diào)用AsyncFunction中timeout方法
AsyncFunction中timeout方法中調(diào)用了ResultFuture對象(實際上就是StreamRecordQueueEntry對象)中CompletableFuture的completeExceptionally方法,那么檢測到該CompletableFuture還是處于uncomplete的狀態(tài)就會拋出異常
在timeout>0的邏輯里面還有一個調(diào)用StreamRecordQueueEntry對象的onComplete回調(diào)方法,在其CompletableFuture完成時會調(diào)用cancel 取消超時回調(diào)。
在AsyncFunction函數(shù)中默認timeout方法僅僅是會拋出Async function call has timed out.異常,我們也可以重寫該方法,獲取更多的信息。