
水位線
是數(shù)據(jù)流中插入的一個標記,用來表示事件時間的進展,它會隨著數(shù)據(jù)一起在任務間傳遞。
可以實現(xiàn)WatermarkStrategy接口自定義生成水位線
為數(shù)據(jù)流中的元素分配時間戳并生成水印以表示事件時間進度。 給定的 WatermarkStrategy 用于創(chuàng)建 TimestampAssigner 和 WatermarkGenerator。對于數(shù)據(jù)流中的每個事件,調(diào)用 TimestampAssigner.extractTimestamp(Object, long) 方法來分配事件時間戳。對于數(shù)據(jù)流中的每個事件,將調(diào)用 WatermarkGenerator.onEvent(Object, long, WatermarkOutput)。定期(由 ExecutionConfig.getAutoWatermarkInterval() 定義)將調(diào)用 WatermarkGenerator.onPeriodicEmit(WatermarkOutput) 方法來發(fā)射水位線。也可以不在onPeriodicEmit里面,在onEvent中判斷滿足某一條件才發(fā)射。
常見的水印生成模式可以作為 WatermarkStrategy 類中的靜態(tài)方法找到。
flink內(nèi)置:
forMonotonousTimestamps有序流的水位線
forBoundedOutOfOrderness亂序流的水位線
WatermarkStrategy.forMonotonousTimestamps() ==WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
自定義時還可以指定周期性生成或者是斷點式生成
周期性生成比如200ms。斷點式生成在onevent中滿足一定條件才發(fā)射水位線
多個并行子任務時,下游可能會收到多個上游發(fā)來的水位線,木桶原理,小的為準。因為水位線的本質是當前時間之前的數(shù)據(jù),都已經(jīng)到齊了。
窗口
按鍵分區(qū)和非按鍵分區(qū)數(shù)據(jù)流
區(qū)別在于調(diào)用窗口算子之前是否有keyby操作。
經(jīng)過按鍵分區(qū) keyBy 操作后,數(shù)據(jù)流會按照 key 被分為多條邏輯流(logical streams),就是 KeyedStream?;贙eyedStream 進行窗口操作時, 窗口計算會在多個并行子任務上同時執(zhí)行。相同 key 的數(shù)據(jù)會被發(fā)送到同一個并行子任務,而窗口操作會基于每個 key 進行單獨的處理。所以可以認為,每個 key 上都定義了一組窗口,各自獨立地進行統(tǒng)計計算。
如果沒有進行 keyBy,那么原始的 DataStream 就不會分成多條邏輯流。這時窗口邏輯只能在一個任務(task)上執(zhí)行,就相當于并行度變成了 1。所以在實際應用中一般不推薦使用這種方式。對于非按鍵分區(qū)的窗口操作,手動調(diào)大窗口算子的并行度也是無效的,windowAll 本身就是一個非并行的操作
達到窗口關閉時間已經(jīng)觸發(fā)計算然后銷毀了(窗口默認被銷毀),所以無法再進入到窗口中,自然也就無法更新計算結果了
增量聚合函數(shù)
ReduceFunction和AggregateFunction
.keyBy(r -> r.f0)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
Tuple2<String, Long> value2) throws Exception {
// 定義累加規(guī)則,窗口閉合時,向下游發(fā)送累加結果
? ? ? ? ? ? ? ? ? ? return Tuple2.of(value1.f0, value1.f1 + value2.f1);
? ? ? ? ? ? ? ? }
})
ReduceFunction 可以解決大多數(shù)歸約聚合的問題,但是這個接口有一個限制,就是聚合狀態(tài)的類型、輸出結果的類型都必須和輸入數(shù)據(jù)類型一樣。這就迫使我們必須在聚合前,先將數(shù)據(jù)轉換(map)成預期結果類型;
AggregateFunction 可以看作是 ReduceFunction 的通用版本,這里有三種類型:輸入類型(IN)、累加器類型(ACC)和輸出類型(OUT)。輸入類型 IN 就是輸入流中元素的數(shù)據(jù)類型;累加器類型 ACC 則是我們進行聚合的中間狀態(tài)類型;而輸出類型當然就是最終計算結果的類型。
createAccumulator():創(chuàng)建一個累加器,這就是為聚合創(chuàng)建了一個初始狀態(tài),每個聚合任務只會調(diào)用一次。
? add():將輸入的元素添加到累加器中。這就是基于聚合狀態(tài),對新來的數(shù)據(jù)進行進一步聚合的過程。方法傳入兩個參數(shù):當前新到的數(shù)據(jù) value,和當前的累加器accumulator;返回一個新的累加器值,也就是對聚合狀態(tài)進行更新。每條數(shù)據(jù)到來之后都會調(diào)用這個方法。
? getResult():從累加器中提取聚合的輸出結果。也就是說,我們可以定義多個狀態(tài),然后再基于這些聚合的狀態(tài)計算出一個結果進行輸出。比如之前我們提到的計算平均值,就可以把 sum 和 count 作為狀態(tài)放入累加器,而在調(diào)用這個方法時相除得到最終結果。這個方法只在窗口要輸出結果時調(diào)用。
? merge():合并兩個累加器,并將合并后的狀態(tài)作為一個累加器返回。這個方法只在需要合并窗口的場景下才會被調(diào)用;最常見的合并窗口(Merging Window)的場景就是會話窗口(Session Windows)。
全窗口函數(shù)
窗口操作中的另一大類就是全窗口函數(shù)。與增量聚合函數(shù)不同,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù),并在內(nèi)部緩存起來,等到窗口要輸出結果的時候再取出數(shù)據(jù)進行計算;
ProcessWindowFunction
增量聚合函數(shù)處理計算會更高效。舉一個最簡單的例子,對一組數(shù)據(jù)求和。大量的數(shù)據(jù)連續(xù)不斷到來,全窗口函數(shù)只是把它們收集緩存起來,并沒有處理;到了窗口要關閉、輸出結果的時候,再遍歷所有數(shù)據(jù)依次疊加,得到最終結果。而如果我們采用增量聚合的方式,那么只需要保存一個當前和的狀態(tài),每個數(shù)據(jù)到來時就會做一次加法,更新狀態(tài);到了要輸出結果的時候,只要將當前狀態(tài)直接拿出來就可以了。增量聚合相當于把計算量“均攤”到了窗口收集數(shù)據(jù)的過程中,自然就會比全窗口聚合更加高效、輸出更加實時。而全窗口函數(shù)的優(yōu)勢在于提供了更多的信息,可以認為是更加“通用”的窗口操作。它只負責收集數(shù)據(jù)、提供上下文相關信息,把所有的原材料都準備好,至于拿來做什么我們完全可以任意發(fā)揮。這就使得窗口計算更加靈活,功能更加強大。
也可以增量聚合和全窗口函數(shù)的結合起來使用
這樣調(diào)用的處理機制是:基于第一個參數(shù)(增量聚合函數(shù))來處理窗口數(shù)據(jù),每來一個數(shù)據(jù)就做一次聚合;等到窗口需要觸發(fā)計算時,則調(diào)用第二個參數(shù)(全窗口函數(shù))的處理邏輯輸出結果。需要注意的是,這里的全窗口函數(shù)就不再緩存所有數(shù)據(jù)了,而是直接將增量聚合函數(shù)的結果拿來當作了 Iterable 類型的輸入。一般情況下,這時的可迭代集合中就只有一個元素了。
窗口處理的主體還是增量聚合,而引入全窗口函數(shù)又可以獲取到更多的信息包裝輸出,這樣的結合兼具了兩種窗口函數(shù)的優(yōu)勢,在保證處理性能和實時性的同時支持了更加豐富的應用場景。
Trigger
Trigger 是一個抽象類,自定義時必須實現(xiàn)下面四個抽象方法;
? onElement():窗口中每到來一個元素,都會調(diào)用這個方法。
? onEventTime():當注冊的事件時間定時器觸發(fā)時,將調(diào)用這個方法。
? onProcessingTime ():當注冊的處理時間定時器觸發(fā)時,將調(diào)用這個方法。
? clear():當窗口關閉銷毀時,調(diào)用這個方法。一般用來清除自定義的狀態(tài)。
前三個方法返回類型都是 TriggerResult,這是一個枚舉類型(enum),其中定義了對窗口進行操作的四種類型。
? CONTINUE(繼續(xù)):什么都不做
? FIRE(觸發(fā)):觸發(fā)計算,輸出結果
? PURGE(清除):清空窗口中的所有數(shù)據(jù),銷毀窗口
? FIRE_AND_PURGE(觸發(fā)并清除):觸發(fā)計算輸出結果,并清除窗口
移除器(Evictor)
Evictor 接口定義了兩個方法:
? evictBefore():定義執(zhí)行窗口函數(shù)之前的移除數(shù)據(jù)操作
? evictAfter():定義執(zhí)行窗口函數(shù)之后的以處數(shù)據(jù)操作
默認情況下,預實現(xiàn)的移除器都是在執(zhí)行窗口函數(shù)(window fucntions)之前移除數(shù)據(jù)的
窗口的銷毀
一般情況下,當時間達到了結束點,就會直接觸發(fā)計算輸出結果、進而清除狀態(tài)銷毀窗口。這時窗口的銷毀可以認為和觸發(fā)計算是同一時刻。這里需要注意,F(xiàn)link 中只對時間窗口(TimeWindow)有銷毀機制;由于計數(shù)窗口(CountWindow)是基于全局窗口(GlobalWindw)實現(xiàn)的,而全局窗口不會清除狀態(tài),所以就不會被銷毀。
ProcessFunction
內(nèi)部單獨定義了兩個方法:一個是必須要實現(xiàn)的抽象方法.processElement();另一個是非抽象方法.onTimer()
八類處理函數(shù):
(1)ProcessFunction
最基本的處理函數(shù),基于 DataStream 直接調(diào)用.process()時作為參數(shù)傳入。
(2)KeyedProcessFunction
對流按鍵分區(qū)后的處理函數(shù),基于 KeyedStream 調(diào)用.process()時作為參數(shù)傳入。要想使用
定時器,比如基于 KeyedStream。
(3)ProcessWindowFunction
開窗之后的處理函數(shù),也是全窗口函數(shù)的代表?;?WindowedStream 調(diào)用.process()時作
為參數(shù)傳入。
(4)ProcessAllWindowFunction
同樣是開窗之后的處理函數(shù),基于 AllWindowedStream 調(diào)用.process()時作為參數(shù)傳入。
(5)CoProcessFunction
合并(connect)兩條流之后的處理函數(shù),基于 ConnectedStreams 調(diào)用.process()時作為參數(shù)傳入。
(6)ProcessJoinFunction
間隔連接(interval join)兩條流之后的處理函數(shù),基于 IntervalJoined 調(diào)用.process()時作為參數(shù)傳入。
(7)BroadcastProcessFunction
廣播連接流處理函數(shù),基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。這里的“廣播連接流”BroadcastConnectedStream,是一個未 keyBy 的普通 DataStream 與一個廣播流(BroadcastStream)做連接(conncet)之后的產(chǎn)物。
(8)KeyedBroadcastProcessFunction
按鍵分區(qū)的廣播連接流處理函數(shù),同樣是基于 BroadcastConnectedStream 調(diào)用.process()時作為參數(shù)傳入。與 BroadcastProcessFunction 不同的是,這時的廣播連接流,是一個 KeyedStream與廣播流(BroadcastStream)做連接之后的產(chǎn)物。
多流轉換
分流
使用側輸出流、定義多個側輸出流。process處理時加到對應的側輸出流中
合流
Union
最簡單的合流操作,就是直接將多條流合在一起,叫作流的“聯(lián)合”(union);聯(lián)合操作要求必須流中的數(shù)據(jù)類型必須相同,合并之后的新流會包括所有流中的元素,數(shù)據(jù)類型不變。不限制流的個數(shù)。
合流的時候數(shù)據(jù)順序?
Connect
流的聯(lián)合雖然簡單,不過受限于數(shù)據(jù)類型不能改變。connect限制只能2條流。合并后返回的是ConnectedStreams,相應的處理方法也是CoMapFunction之類的。
基于時間的合流
Windows Join
stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
window join類似于 inner join。也就是說,最后處理輸出的,只有兩條流中數(shù)據(jù)按 key 配對成功的那些
Interval Join
間隔聯(lián)結的思路就是針對一條流的每個數(shù)據(jù),開辟出其時間戳前后的一段時間間隔,看這期間是否有來自另一條流的數(shù)據(jù)匹配,匹配的規(guī)則也是key相同
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Override
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
????out.collect(left + "," + right);}});
Windows CoGroup
用法跟 window join 非常類似,也是將兩條流合并之后開窗處理匹配的元素,調(diào)用時只需要將.join()換為.coGroup()
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; }
coGroup 操作比窗口的 join 更加通用,不僅可以實現(xiàn)類似 SQL 中的“內(nèi)連接”(inner join),也可以實現(xiàn)左外連接(left outer join)、右外連接(right outer join)和全外連接(full outer join)
狀態(tài)編程
????一個算子任務會按照并行度分為多個并行子任務執(zhí)行,而不同的子任務會占據(jù)不同的任務槽(task slot)。由于不同的 slot 在計算資源上是物理隔離的,所以 Flink能管理的狀態(tài)在并行任務間是無法共享的,每個狀態(tài)只能針對當前子任務的實例有效。而很多有狀態(tài)的操作(比如聚合、窗口)都是要先做 keyBy 進行按鍵分區(qū)的。按鍵分區(qū)之后,任務所進行的所有計算都應該只針對當前 key 有效,所以狀態(tài)也應該按照 key 彼此隔離。在這種情況下,狀態(tài)的訪問方式又會有所不同?;谶@樣的想法,我們又可以將托管狀態(tài)分為兩類:算子狀態(tài)和按鍵分區(qū)狀態(tài)。
????我們需要先聲明一個狀態(tài)的 StateDescriptor,這個 Descriptor 中包含了 StateName 和 State 的類型,在同一個 Operator 里,StateName 會作為這個 State 的唯一標識
? ? 算子狀態(tài)作用于當前并行子任務、按鍵分區(qū)狀態(tài)作用于輸入流的key級別。所以按鍵分區(qū)狀態(tài)只能在keyby后使用
按鍵分區(qū)狀態(tài)(Keyed State)
? ??對于 keyedState,我們需要 OperatorID、StateName,Key、Namespace 才能定位到 State 。
1. 值狀態(tài)(ValueState)
2.?列表狀態(tài)(ListState)
3.?映射狀態(tài)(MapState)
4.?歸約狀態(tài)(ReducingState)
5.?聚合狀態(tài)(AggregatingState)
狀態(tài)TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
? .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
? .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
? .build();
依次:設定的狀態(tài)生存時間、設置更新類型、設置狀態(tài)的可見性。(所謂的“狀態(tài)可見性”,是指因為清除操作并不是實時的,所以當狀態(tài)過期之后還有可能基于存在,這時如果對它進行訪問,能否正常讀取到就是一個問題了。這里設置的 NeverReturnExpired 是默認行為,表示從不返回過期值,也就是只要過期就認為它已經(jīng)被清除了,應用不能繼續(xù)讀取;這在處理會話或者隱私數(shù)據(jù)時比較重要。對應的另一種配置是 ReturnExpireDefNotCleanedUp,就是如果過期狀態(tài)還存在,就返回它的值)
目前的 TTL 設置只支持處理時間。另外,所有集合類型的狀態(tài)(例如ListState、MapState)在設置 TTL 時,都是針對每一項(per-entry)元素的。也就是說,一個列表狀態(tài)中的每一個元素,都會以自己的失效時間來進行清理,而不是整個列表一起清理。
算子狀態(tài)(Operator State)
作用于并行子任務,對于 OperatorState,我們只需要 OperatorID,StateName,即可定位到唯一的 OperatorState 值。
1. 列表狀態(tài)(ListState)
? ? 每個并行子任務維護一個列表狀態(tài),在當算子并行度進行縮放調(diào)整時,算子的列表狀態(tài)中的所有元素項會被統(tǒng)一收集起來,相當于把多個分區(qū)的列表合并成了一個“大列表”,然后再均勻地分配給所有并行任務。這種“均勻分配”的具體方法就是“輪詢”(round-robin),與之前介紹的 rebanlance 數(shù)據(jù)傳輸方式類似,是通過逐一“發(fā)牌”的方式將狀態(tài)項平均分配的。這種方式也叫作“平均分割重組”(even-splitredistribution)。
2.?聯(lián)合列表狀態(tài)(UnionListState)
? ? 與列表狀態(tài)類似,僅僅是進行縮放調(diào)整時對于狀態(tài)的分配方式不同。
3.?廣播狀態(tài)(BroadcastState)
? ??有時我們希望算子并行子任務都保持同一份“全局”狀態(tài),用來做統(tǒng)一的配置和規(guī)則設定。
狀態(tài)持久化和狀態(tài)后端
Flink 會定期保存檢查點,在檢查點中會記錄每個算子的 id 和狀態(tài);如果發(fā)生故障,F(xiàn)link 就會用最近一次成功保存的檢查點來恢復應用的狀態(tài),重新啟動處理流程,就如同“讀檔”一樣。
狀態(tài)后端的分類:
1. 哈希表狀態(tài)后端
????即放內(nèi)存,具體實現(xiàn)上,哈希表狀態(tài)后端在內(nèi)部會直接把狀態(tài)當作對象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的狀態(tài),以及窗口中收集的數(shù)據(jù)和觸發(fā)器(triggers),都會以鍵值對(key-value)的形式存儲起來,所以底層是一個哈希表(HashMap)
2. RocksDB
????RocksDB 是一種內(nèi)嵌的 key-value 存儲介質,可以把數(shù)據(jù)持久化到本地硬盤。配置EmbeddedRocksDBStateBackend 后,會將處理中的數(shù)據(jù)全部放入 RocksDB 數(shù)據(jù)庫中,RocksDB默認存儲在 TaskManager 的本地數(shù)據(jù)目錄里。與 HashMapStateBackend 直接在堆內(nèi)存中存儲對象不同,這種方式下狀態(tài)主要是放在RocksDB 中的。數(shù)據(jù)被存儲為序列化的字節(jié)數(shù)組(Byte Arrays),讀寫操作需要序列化/反序列化,因此狀態(tài)的訪問性能要差一些。
為每個Job配置狀態(tài)后端:env.setStateBackend(new HashMapStateBackend());
指定checkpoint的目錄:state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
3.FsStateBackend
檢查點
????所有任務狀態(tài)在同一時間點的一個“快照”(snapshot),它的觸發(fā)是周期性的。當所有任務都恰好處理完一個相同的輸入數(shù)據(jù)的時候,將它們的狀態(tài)保存下來。首先,這樣避免了除狀態(tài)之外其他額外信息的存儲,提高了檢查點保存的效率。其次,一個數(shù)據(jù)要么就是被所有任務完整地處理完,狀態(tài)得到了保存;要么就是沒處理完,狀態(tài)全部沒保存:這就相當于構建了一個“事務”(transaction)。
檢查點的保存
JobManagerCheckpointStorage(默認)
FileSystemCheckpointStorage
也可以設置定期去保存檢查點作用于恢復
/user-defined-checkpoint-dir /{job-id} | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/
barrier概念
? ? barrier注入數(shù)據(jù)流中,根據(jù)數(shù)據(jù)流一起流轉。每個barrier攜帶一個快照ID。來自不同快照的多個障礙可以同時在流中,這意味著各種快照可能同時發(fā)生。一旦Sink操作符(流式 DAG 的末端)從其所有輸入流中接收到屏障 n,它就會向檢查點協(xié)調(diào)器確認快照 n。 在所有Sink都確認快照后,則完成了此次快照。

aligned Checkpointing?
? ? 每當接收到 Barrier,算子進行本地的 Checkpoint 快照,并在完成后異步上傳本地快照,同時將 Barrier 以廣播方式發(fā)送至下游。當某個 Checkpoint 的所有 Barrier 到達 DAG 末端且所有算子完成快照,則標志著全局快照的成功。在有多個輸入 Channel 的情況下,為了數(shù)據(jù)準確性,算子會等待所有流的 Barrier 都到達之后才會開始本地的快照,這種機制被稱為 Barrier 對齊。在對齊的過程中,算子只會繼續(xù)處理的來自未出現(xiàn) Barrier Channel 的數(shù)據(jù),而其余 Channel 的數(shù)據(jù)會被寫入輸入隊列,直至在隊列滿后被阻塞。對齊的策略下,如果某個并行子任務收到的barrire更快,則需要將數(shù)據(jù)先緩存起來,直到其他子任務也收到barrier。? ??

? ? 當算子包含任何形式的狀態(tài)時,這些狀態(tài)也必須是快照的一部分。算子在收到上游所有的snapshot barriers后,發(fā)送snapshot barriers到下游之前完成自身狀態(tài)的snapshot。因此狀態(tài)可能是很大的,所以需要保存到狀態(tài)后端。而在checkpoint中僅僅保存了一個指向該狀態(tài)的指針

Unaligned Checkpointing?
? ??檢查點屏障實際上不再嵌入到數(shù)據(jù)流中。可以將未處理的緩沖數(shù)據(jù)(in-flight data)也保存進檢查點。這樣,當我們遇到一個分區(qū)barrier時就不需等待對齊,而是可以直接啟動狀態(tài)的保存了。但是這種情況下對state backend的IO壓力很大,因為需要將其他子任務落后的數(shù)據(jù)一并保存到狀態(tài)后端中。
Exactly Once vs. At Least Once?
????對齊步驟可能會增加流式傳輸程序的等待時間。 通常,這種額外的延遲大約為幾毫秒,但我們已經(jīng)看到一些異常值的延遲顯著增加的情況。 對于所有記錄都需要始終超低延遲(幾毫秒)的應用程序,F(xiàn)link 有一個開關可以在檢查點期間跳過流對齊。 一旦操作員從每個輸入中看到檢查點障礙,檢查點快照仍然會被繪制。
????當跳過對齊時,操作員會繼續(xù)處理所有輸入,即使在檢查點 n 的一些檢查點屏障到達之后也是如此。 這樣,運算符還會在獲取檢查點 n 的狀態(tài)快照之前處理屬于檢查點 n+1 的元素。 在恢復時,這些記錄將作為副本出現(xiàn),因為它們都包含在檢查點 n 的狀態(tài)快照中,并且將作為檢查點 n 之后的數(shù)據(jù)的一部分重放。????
????兩者區(qū)別就在于對齊方式,如果是不強制對齊的話,數(shù)據(jù)會繼續(xù)發(fā)到下游處理,并且最終輸出到第三方儲存。假設checkpoint n已經(jīng)完成,但是這個checkpoint中包括了緩沖數(shù)據(jù),如果此時發(fā)生故障重啟,這些緩沖數(shù)據(jù)就會進行重放,而實際上某些數(shù)據(jù)已經(jīng)輸出到第三方存儲,所以導致了重復數(shù)據(jù)。
enableUnalignedCheckpoints:只能用于exactly once,是不是兩者組合起來就等于Al least once? 這塊后續(xù)再看看
savepoint
????與此相反,保存點是由用戶創(chuàng)建、擁有和刪除的。保存點的使用是手動的備份和恢復。 例如,這可能是更新Flink 版本、更改作業(yè)圖、更改并行度等等。 可以認為保存點是Operator ID -> State?for each stateful operator

todo:檢查點保存點保存數(shù)據(jù)的形式,k應當就是uid+狀態(tài)名稱+key,v是狀態(tài)
SQL
將table轉換為流
(1)調(diào)用 toDataStream()方法:針對只會插入、不會更新的表
(2)調(diào)用 toChangelogStream()方法

flink sql指定時間屬性
事件時間
? ? 1. SQL中直接指定,這里我們把 ts 字段定義為事件時間屬性,而且基于 ts 設置了 5 秒的水位線延遲

?2. 在數(shù)據(jù)流轉換為表時定義,

處理時間
1. 在sql中直接定義

2. 在數(shù)據(jù)流轉換為表時定義

????在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一組“分組窗口”(Group Window)函數(shù),常用的時間窗口如滾動窗口、滑動窗口、會話窗口都有對應的實現(xiàn);具體在 SQL 中就是調(diào)用 TUMBLE()、HOP()、SESSION(),傳入時間屬性字段、窗口大小等參數(shù)就可以了。以滾動窗口為例:TUMBLE(ts, INTERVAL '1' HOUR)
? ? 從1.13開始開始使用窗口表值函數(shù)(TVF)來定義窗口,在窗口 TVF 的返回值中,除去原始表中的所有列,還增加了用來描述窗口的額外 3 個列:“窗口起始點”(window_start)、“窗口結束點”(window_end)、“窗口時間”(window_time)。起始點和結束點比較好理解,這里的“窗口時間”指的是窗口中的時間屬性,它的值等于window_end - 1ms,所以相當于是窗口中能夠包含數(shù)據(jù)的最大時間戳。
(1)滾動窗口(TUMBLE)
? ??TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)這里基于時間字段 ts,對表 EventTable 中的數(shù)據(jù)開了大小為 1 小時的滾動窗口。窗口會將表中的每一行數(shù)據(jù),按照它們 ts 的值分配到一個指定的窗口中。
(2)滑動窗口(HOP)
? ??HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));這里我們基于時間屬性 ts,在表 EventTable 上創(chuàng)建了大小為 1 小時的滑動窗口,每 5 分鐘滑動一次。需要注意的是,緊跟在時間屬性字段后面的第三個參數(shù)是步長(slide),第四個參數(shù)才是窗口大小(size)。
(3)累積窗口(CUMULATE)
? ??CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))這里我們基于時間屬性 ts,在表 EventTable 上定義了一個統(tǒng)計周期為 1 天、累積步長為 1小時的累積窗口。注意第三個參數(shù)為步長 step,第四個參數(shù)則是最大窗口長度。
????為了防止狀態(tài)無限增長耗盡資源,F(xiàn)link Table API 和 SQL 可以在表環(huán)境中配置狀態(tài)的生存時間(TTL)
TableConfig tableConfig = tableEnv.getConfig();
tableConfig.setIdleStateRetention(Duration.ofMinutes(60)); 或者
configuration.setString("table.exec.state.ttl", "60 min");
Join查詢
regular join
????常規(guī)聯(lián)結(Regular Join)是 SQL 中原生定義的 Join 方式,聯(lián)結用 INNER JOIN 來定義,這塊也是innerjoin、left join、right join、full join等常規(guī)知識。
函數(shù)
系統(tǒng)函數(shù)
????Flink SQL中的系統(tǒng)函數(shù)又主要可以分為兩大類:標量函數(shù)(Scalar Functions)和聚合函數(shù)(Aggregate Functions)。
標量函數(shù)
? ??比較函數(shù)、邏輯函數(shù)、算術函數(shù)、字符串函數(shù)等
聚合函數(shù)
? ??COUNT(*)、RANK()
自定義函數(shù)
????Flink 的 Table API 和 SQL 提供了多種自定義函數(shù)的接口,以抽象類的形式定義。當前 UDF主要有以下幾類:
??標量函數(shù)(Scalar Functions):將輸入的標量值轉換成一個新的標量值;? ??

? 表函數(shù)(Table Functions):將標量值轉換成一個或多個新的行數(shù)據(jù),也就是擴展成一個表;

? 聚合函數(shù)(Aggregate Functions):將多行數(shù)據(jù)里的標量值轉換成一個新的標量值;
????聚合函數(shù)的概念我們之前已經(jīng)接觸過多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常見的系統(tǒng)內(nèi)置聚合函數(shù)。
? 表聚合函數(shù)(Table Aggregate Functions):將多行數(shù)據(jù)里的標量值轉換成一個或多個新的行數(shù)據(jù)。
????目前 SQL 中沒有直接使用表聚合函數(shù)的方式,所以需要使用 Table API 的方式來調(diào)用
Flink CEP

維表Join




Row和Rowdata
????Row為RowData的低階數(shù)據(jù)類型,常常用于DataStream中的使用;而RowData是通過不同的Conveter轉換Row得到的高階數(shù)據(jù),用于TableAPI和Sql中使用?;兩者中的存儲內(nèi)容基本是一致的。