Flink 作為底層的流處理框架。主要出于以下幾點(diǎn)原因:
第一,F(xiàn)link 是一個(gè)純流式系統(tǒng),吞吐量實(shí)際測(cè)試可達(dá) 100K EPS。而不像某些框架是用 mini batch 的模式來(lái)達(dá)到所謂的流式處理的;
第二,面對(duì)不同的用戶數(shù)據(jù)格式,我們必須支持多種數(shù)據(jù)源,這一點(diǎn)上 Flink 內(nèi)置的對(duì)多種數(shù)據(jù)源的支持(CSV,Kafka,Hbase,Text,Socket 數(shù)據(jù)等)也為用戶數(shù)據(jù)的接入提供了便利;
第三,F(xiàn)link 強(qiáng)大的窗口機(jī)制(包括翻轉(zhuǎn)窗口,滑動(dòng)窗口,session 窗口,全窗口以及允許用戶自定義窗口)可以滿足復(fù)雜的業(yè)務(wù)邏輯,使得用戶可以編寫(xiě)復(fù)雜的業(yè)務(wù)規(guī)則;
第四,F(xiàn)link 內(nèi)置的 RocksDB 數(shù)據(jù)存儲(chǔ)格式使其數(shù)據(jù)處速度快且資源消耗少,在 Checkpoint 上起到了至關(guān)重要的作用;
第五,F(xiàn)link 對(duì)算子(operator)的高可控性,使得用戶可以靈活添加刪除或更改算子行為。這一點(diǎn)對(duì)于動(dòng)態(tài)部署有著至關(guān)重要的意義。
規(guī)則引擎方面我們有兩個(gè)選擇:
Flink 原生 CEP 組件和 Drools 規(guī)則引擎。那么兩者各有什么優(yōu)勢(shì)和劣勢(shì)呢?首先我們看一下 Flink CEP。當(dāng)前穩(wěn)定的 Flink1.3 版本的 CEP 是一套極具通用性、易于使用的實(shí)時(shí)流式事件處理方案。作為 Flink 的原生組件,省去了第三方庫(kù)與 Flink 配合使用時(shí)可能會(huì)導(dǎo)致的各種問(wèn)題。但其功能現(xiàn)階段看來(lái)還比較基礎(chǔ),不能表達(dá)復(fù)雜的業(yè)務(wù)場(chǎng)景,同時(shí)它不能夠做到動(dòng)態(tài)更新(這是一個(gè)痛點(diǎn))。具體如何解決我們稍后會(huì)看到。
什么是 Drools?
Drools 是一套基于 JVM 的,實(shí)現(xiàn)了 RETE 算法的規(guī)則引擎。它可以將多變的規(guī)則從硬編碼中解放出來(lái),以規(guī)則腳本的形式存在。右邊圖中顯示的是一個(gè)典型的 Drools 規(guī)則的定義方式??梢钥吹?,其語(yǔ)義與 Java 非常類(lèi)似。既可以導(dǎo)入既有的 Java POJO(圖中 Person 類(lèi)),也可以在規(guī)則文件中直接定義類(lèi)(EventA)。when 語(yǔ)句中是具體的判斷條件,then 語(yǔ)句中是滿足判斷條件之后所做的操作。操作可以是任意的,不僅限于對(duì)滿足條件的那個(gè)對(duì)象進(jìn)行操作。比如你可以在 then 里調(diào)用某個(gè) Java 類(lèi)的方法,或者調(diào)用某個(gè)全局變量??傊?,可以在 Drools 規(guī)則文件中 import Java 類(lèi),然后對(duì)其進(jìn)行操作。
Drools 有些什么優(yōu)缺點(diǎn)呢?
它最大優(yōu)勢(shì)在于語(yǔ)法規(guī)則簡(jiǎn)單,類(lèi)似 Java,編寫(xiě)門(mén)檻不高、能夠無(wú)縫化與 Java 集成,且用戶可以對(duì) Drools 規(guī)則進(jìn)行動(dòng)態(tài)配置。但這套方案也存在著自己的不足之處:例如其內(nèi)置聚合功能速度緩慢,不適合我們自身或者客戶使用場(chǎng)景下的大量聚合操作任務(wù)。另外,其內(nèi)置事件序列處理機(jī)制也需要消耗大量?jī)?nèi)存資源。
下面我們來(lái)看一個(gè)具體的例子。
可以看到我們這里有一條檢測(cè) VPN 可疑行為的規(guī)則。規(guī)則當(dāng)中包含三條判斷條件。
第一條 metric 用來(lái)判斷一小時(shí)能登錄失敗的次數(shù)。
第二條演示的是用戶與設(shè)備之間的實(shí)體關(guān)系,表達(dá)式 expression == “[vpn.user, vpn.device]”說(shuō)明了這一點(diǎn)。
第三條演示的是在序列算法下異常值大于 50 的行為。
最后會(huì)將滿足條件的三個(gè)行為收集起來(lái)發(fā)送給下游的模塊。下游模塊可以是另一個(gè)算子,或者是持久化結(jié)果的 DB。
有了 Flink 作為流計(jì)算引擎,有了 Drools 作為規(guī)則引擎,那么我們?nèi)绾螌烧呓Y(jié)合放到一個(gè)系統(tǒng)里發(fā)揮作用呢。我們需要做的是將源數(shù)據(jù)輸入到 Flink 生成所謂的事件流,同時(shí)將 Drools 規(guī)則文本讀取到 Flink 生成所謂的規(guī)則流。而 Flink 中提供了一個(gè) CoFlatMapFunction 可以將兩個(gè)流結(jié)合起來(lái)進(jìn)行分析。在這個(gè) function 里我們所要做的就是將在 Flink 里結(jié)合機(jī)器學(xué)習(xí)算法計(jì)算出來(lái)的結(jié)果與 Drools 規(guī)則進(jìn)行匹配。
但事實(shí)上,這個(gè)方案在實(shí)際運(yùn)行當(dāng)中會(huì)有一些性能上的問(wèn)題。這些問(wèn)題主要表現(xiàn)在長(zhǎng)周期行為的分析上。比如,機(jī)器學(xué)習(xí)算法需要對(duì)長(zhǎng)周期行為(數(shù)據(jù)往往跨越三個(gè)月)進(jìn)行計(jì)算,得出異常值。那么這種情況下我們需要維護(hù)算法生成的長(zhǎng)周期行為的狀態(tài)。具體方法可以是直接保存在 Drools Engine 中,或者將其保存在外部 DB 中,再或者可以利用 Flink 的 stateful operator 來(lái)維護(hù)狀態(tài)。但現(xiàn)有情況下,每種方法都多多少少會(huì)有一些問(wèn)題。接下來(lái)我們看看具體問(wèn)題都有哪些。
需要保存過(guò)往窗口的狀態(tài),作為中間結(jié)果送入 Drools 規(guī)則引擎進(jìn)行計(jì)算。Flink 內(nèi)置的窗口機(jī)制在窗口結(jié)束時(shí)會(huì)清除窗口狀態(tài)。 Flink內(nèi)置的RocksDB存儲(chǔ)結(jié)構(gòu)在窗口清理時(shí)會(huì)自動(dòng)刪除數(shù)據(jù)。 Flink產(chǎn)生的長(zhǎng)周期聚合結(jié)果被送入 Drool 規(guī)則引擎進(jìn)行匹配的時(shí)候往往會(huì)消耗大量?jī)?nèi)存。可以看到,主要的痛點(diǎn)就在于中間結(jié)果的維護(hù)和資源消耗的問(wèn)題。面對(duì)這些問(wèn)題我們可以嘗試以下的做法。
首先想到的是用 redis,memcached 之類(lèi)的 KV store 來(lái)保存中間結(jié)果。但實(shí)際測(cè)試結(jié)果表明,它們的性能趕不上 Flink 的速度。所以在追求高吞吐量的情況下,此方法行不通。其次,可以通過(guò)修改 Flink RockDB backend 的源碼來(lái)解決窗口清理時(shí)自動(dòng)刪除數(shù)據(jù)的問(wèn)題。同時(shí)為了保證過(guò)期數(shù)據(jù)不擠壓,需要引入“TTL”(time to live)屬性,是的 rocksdb 在超時(shí)的時(shí)候自動(dòng)刪除過(guò)期數(shù)據(jù)。內(nèi)存問(wèn)題主要是由 Drools 引擎引起的。因?yàn)槊恳粭l事件與規(guī)則匹配都會(huì)生成一個(gè) Fact,默認(rèn)情況下 fact 無(wú)論是否匹配,Drools 都不會(huì)立刻刪除它。你必須手動(dòng)的刪除它。但當(dāng)事件數(shù)量過(guò)大或者規(guī)則數(shù)量過(guò)大時(shí),即使你手動(dòng)刪除沒(méi)有匹配的 fact,可能也會(huì)出現(xiàn)某一時(shí)間段大量 fact 存在于內(nèi)存中的情況。所以可行的辦法是設(shè)定閾值來(lái)控制內(nèi)存中允許同時(shí)存在的 fact 的數(shù)量,同時(shí)清理失效的 fact?;蛘咭部梢员M量保持規(guī)則簡(jiǎn)單化。復(fù)雜的聚合規(guī)則交給 Flink 去做。
可以看到,以上方案所產(chǎn)生的性能問(wèn)題主要在于 Drools。其實(shí)除了以上的方案,我們還有一個(gè) Plan B。Flink1.4 Snapshot 版本增加了一些新功能。利用這些新功能,我們可以直接使用 Flink CEP 并做到動(dòng)態(tài)更新。這些功能主要包括:新版本加入了對(duì)算子粒度的操作。我們可以 checkpoint 某一個(gè)算子的狀態(tài)。同時(shí) Flink CEP 中新增了 pattern group 的概念??梢詫⒍鄠€(gè)規(guī)則 pattern 歸為同一個(gè) group。這樣增加了規(guī)則的表達(dá)能力。利用這些功能,我們重新設(shè)計(jì)了一個(gè)系統(tǒng)來(lái)實(shí)現(xiàn)規(guī)則的動(dòng)態(tài)更新。下面我們來(lái)看一下新設(shè)計(jì)的工作流程。
簡(jiǎn)單來(lái)講,整個(gè)工作流程就是用戶更新規(guī)則,新規(guī)則被翻譯成 Java 源碼,然后編譯并打包成可執(zhí)行 jar,這個(gè)時(shí)候系統(tǒng)將觸發(fā) Flink 的 Savepoint,保存當(dāng)前 operator 的狀態(tài),然后 cancel 當(dāng)前運(yùn)行的 Flink Job,然后把新生成的 jar 發(fā)布到 Flink 上去,同時(shí)讀取最新的 operator 狀態(tài),恢復(fù)整個(gè)系統(tǒng)的運(yùn)行。值得提出的一點(diǎn)是,根據(jù)規(guī)則文件里規(guī)則的數(shù)量和復(fù)雜度。我們可以劃分規(guī)則生成多個(gè) jar 發(fā)布到 Flink 上。這樣單個(gè) job 的負(fù)載就不至于過(guò)高。這種動(dòng)態(tài)生成規(guī)則代碼的方式擴(kuò)展性和并發(fā)性更出色,不存在單一大負(fù)載算子。缺陷在于從 Savepoint 到整個(gè)流程恢復(fù)會(huì)有數(shù)秒延遲。