Flink CEP

1 復(fù)雜事件

1.1 引入

在Flink RichFunction&state這篇博文中我們一起學(xué)習(xí)了下如何結(jié)合使用keyBy state和TreeSet在一條無界流中進(jìn)行全局的分組求top n操作,可以解決一些實(shí)時(shí)看板相關(guān)的業(yè)務(wù)問題。在Flink BroadcastStream這篇博文中我們也學(xué)習(xí)到了如何使用廣播流來處理監(jiān)控規(guī)則經(jīng)常變更的程序日志監(jiān)控業(yè)務(wù)。

那么現(xiàn)在我們又遇到了一個(gè)新的業(yè)務(wù)需求:判斷一個(gè)用戶在點(diǎn)擊了商品之后是否立即進(jìn)行了下單付款操作,如果是的話將用戶名和點(diǎn)擊時(shí)間以及下單付款時(shí)間都輸出到控制臺(tái)。
我們思考了一下,發(fā)現(xiàn)繼續(xù)使用keyBy state操作好像也能夠?qū)崿F(xiàn)這個(gè)業(yè)務(wù)需求,只是我們的判斷邏輯和代碼都會(huì)比較復(fù)雜一些。邏輯思路如下

1.將數(shù)據(jù)整理成UserAction(ip: String, timestamp: Long, name: String, action: String)
2.通過name對(duì)UserAction進(jìn)行keyBy操作,得到KeyedStream[UserAction, String]
3.KeyedStream.process(xxx)此時(shí)在每個(gè)name分組中,需要一個(gè)keyBy state同時(shí)保存該name對(duì)應(yīng)的"click"操作的UserAction的信息
  使用一個(gè)ValueState[UserAction]來保存"click"信息,"buy"信息不需要保存
4.這個(gè)時(shí)候碰到的問題就是在每個(gè)name分組如何判斷"click","buy"的UserAction是有順序來到該分組
5.針對(duì)于4的問題,可以通過ValueState存儲(chǔ)的信息來進(jìn)行判斷
  a."click","buy"先后來到該name分組
    "click"來到該name分組時(shí),直接將"click"添加更新進(jìn)ValueState
    "buy"來到該name分組時(shí),判斷ValueState是否有存儲(chǔ)"click"
        有則將"click"取出來和當(dāng)前"buy"整合并發(fā)送到下游,且要清空ValueState信息
        沒有直接清空ValueState信息,不發(fā)送任何數(shù)據(jù)到下游
  b.如果有非"click","buy"信息來到name分組,均直接清空ValueState信息,不發(fā)送任何數(shù)據(jù)到下游

當(dāng)我們遇到的業(yè)務(wù)并不是很復(fù)雜的時(shí)候使用上邊的思路完全可以解決,但是如果要是碰到更復(fù)雜一些的業(yè)務(wù)需求可能判斷邏輯就會(huì)越來越復(fù)雜:"click"和"buy"之間可以有"show"操作;"click"和"buy"之間可以有零或者多個(gè)"click"操作。

那怎么能夠使用更簡(jiǎn)潔的邏輯和代碼來實(shí)現(xiàn)這種越來越復(fù)雜的問題呢?

1.2 復(fù)雜事件

我們先來給這樣的業(yè)務(wù)來定這樣一個(gè)描述:檢測(cè)和發(fā)現(xiàn)無界事件流中多個(gè)記錄的關(guān)聯(lián)規(guī)則,也就是從無界事件流中得到滿足規(guī)則的復(fù)雜事件。

CEP(Complex Event Processing)就是在無界事件流中檢測(cè)事件模式,讓我們掌握數(shù)據(jù)中重要的部分。flink CEP是在flink中實(shí)現(xiàn)的復(fù)雜事件處理庫。

01_show.png
目標(biāo):從簡(jiǎn)單事件流中發(fā)現(xiàn)一些高階特征
輸入:一個(gè)或者多個(gè)簡(jiǎn)單事件構(gòu)成的事件流
處理:檢測(cè)簡(jiǎn)單事件之間的聯(lián)系,多個(gè)事件組合一起符合匹配規(guī)則,將該多個(gè)事件構(gòu)成復(fù)雜事件
輸出:符合規(guī)則的復(fù)雜事件

2 Pattern API

我們先來認(rèn)識(shí)一下CEP中Pattern模式吧,也就是規(guī)則的制定。

val start: Pattern[X, X] = Pattern.begin[X]("start")

Pattern根據(jù)模式的組合種類,分為了三種

2.1 個(gè)體模式(Individual Patterns)

組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義,就是“個(gè)體模式”

start.where(condition: F => Boolean)
  • 量詞

個(gè)體模式根據(jù)接收同一種事件的數(shù)量又可以分為“單例模式”和“循環(huán)模式“,我們通過一個(gè)“量詞”來指定接受同一種事件的數(shù)量。

start.times(2)                      // 必須2次
start.times(2, 5)                   // 2,3,4或者5次都可以
start.times(2, 5).greedy            // 2,3,4或者5次,并且盡可能的重復(fù)匹配
start.times(2).optional             // 0或者2次
start.oneOrMore                     // 1次或者多次
start.timesOrMore(2).optional.greedy// 0, 2或者多次,并且盡可能的重復(fù)匹配
  • 條件

個(gè)體模式的條件,可以在一個(gè)個(gè)體模式上判斷使用多個(gè)條件,只有當(dāng)條件都滿足的情況下才算匹配成功

// 組合條件 (參考sql中的 where or)
.where()    // 條件相連為and
.or()       // 條件相連為or
// 終止條件
.until()    // 當(dāng)使用了oneOrMore或者oneOrMore.optional時(shí)需要進(jìn)行終止,以便清楚狀態(tài)
// 迭代條件
.where(condition: (F, Context[F]) => Boolean) // 調(diào)用上下文對(duì)前邊接收的事件進(jìn)行處理
// ctx.getEventsForPattern("start")

2.2 組合模式(Combining Patterns)

多個(gè)個(gè)體模式組合起來就形成了一個(gè)組合模式

Pattern.begin[X]("start").where(condition: F => Boolean)
        .next("middle").where(condition: F => Boolean)

注意:組合模式必須由一個(gè)“開始個(gè)體模式”開始

val start: Pattern[X, X] = Pattern.begin[X]("start")

2.3 模式組(Groups of Patterns)

將一個(gè)組合模式作為條件嵌套在個(gè)體模式里,成為一組模式

3 近鄰

當(dāng)我們?cè)趯?duì)事件流進(jìn)行復(fù)雜事件處理,有時(shí)我們需要兩個(gè)事件在流中必須相連,有時(shí)只要兩個(gè)事件出現(xiàn)在流中不必須相連中間可以間隔一個(gè)其他事件。也就是我們對(duì)多個(gè)事件組成規(guī)則嚴(yán)格性的寬容度,那么如何來表達(dá)這個(gè)容忍度呢?使用近鄰。

嚴(yán)格近鄰:所有事件嚴(yán)格按照順序進(jìn)行,中間沒有任何不匹配的事件。使用next()指定

對(duì)于模式"a next b",事件流[a, c, b, d],沒有匹配
02_yange.png

寬松近鄰:允許中間出現(xiàn)不匹配事件。使用followedBy()指定

對(duì)于模式"a followedBy b",事件流[a, c, b, d],匹配為 [a, b]
03_kuansong.png

非確定寬松近鄰:一個(gè)匹配的事件能夠再次使用。使用followedByAny()指定

對(duì)于模式"a followedByAny b",事件流[a, c, b1, b2],匹配為 [a, b1],[a, b2]
04_feiqueding_kuansong.png

4 示例

此處我們使用flink CEP來實(shí)現(xiàn) 1.1引入 中提到的業(yè)務(wù)需求

1 CEP并不包含在flink中,使用前需要自己導(dǎo)入cep jar包

    <!-- 使用cep需要引入該jar包,
        scala語言編寫要導(dǎo)入 artifactId前綴為 flink-cep-scala -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>

2 定義Pattern模式且必須要以Pattern.begin("xxx")開始,"xxx"為別名,在后邊PatternStream.select()中獲取符合模式的數(shù)據(jù)時(shí)會(huì)使用到。

val pattern = Pattern.begin[T]("start").where(...).next("middle").where(...)
    // 定義一個(gè)cep pattern模式。此處復(fù)雜的事務(wù)為:用戶click后馬上進(jìn)行buy操作
    val pattern: Pattern[UserAction, UserAction] =
      Pattern.begin[UserAction]("start").where(new SimpleCondition[UserAction] {
        override def filter(t: UserAction): Boolean = t.action.equals("click")
      })
        .next("middle").where(new SimpleCondition[UserAction] {
        override def filter(t: UserAction): Boolean = t.action.equals("buy")
      })
    // 獲取一個(gè)普通的流
    val input: KeyedStream[UserAction, String] = env.addSource(
        new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
      .map { line => 
        val strs: Array[String] = line.split(",")
        UserAction(strs(0), strs(1).toLong, strs(2), strs(3)) // 將記錄轉(zhuǎn)換為UserAction類型
      }.keyBy(_.name)
    // 將我們定義好的cep pattern應(yīng)用于這個(gè)普通的流
    val patternStream: PatternStream[UserAction] = CEP.pattern(input, pattern)
    // 通過select算子獲取符合pattern的事務(wù)數(shù)據(jù),并打印結(jié)果
    patternStream.select(new PatternSelectFunction[UserAction, String] {
      override def select(map: util.Map[String, util.List[UserAction]]): String = {
        val click: UserAction = map.get("start").iterator().next()
        val buy: UserAction = map.get("middle").iterator().next()
        // 打印用戶的名稱,點(diǎn)擊和購買的時(shí)間
        s"name: ${click.name}, click: ${click.timestamp}, buy: ${buy.timestamp}"
      }
    }).print()
05_cep_demo.png

示例代碼中在Pattern指定時(shí),我們使用next()來表示需要得到是符合嚴(yán)格近鄰的復(fù)雜事件,所以"click","order","buy"這樣的復(fù)雜事件并不符合模式也不會(huì)被輸出打印到控制臺(tái)。

此博文,我們只是簡(jiǎn)單的介紹了下CEP及其使用方式,但是CEP的知識(shí)遠(yuǎn)不遠(yuǎn)不止這些。后邊有機(jī)會(huì)的話我們?cè)僖黄鹕钊雽W(xué)習(xí),此處就不進(jìn)行展開了。
下一篇博文我們就來一起學(xué)習(xí)一下flink中的三種時(shí)間語義和窗口機(jī)制吧。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 光消失在地平線 街燈明了, 星星睜開了眼睛, 夜,如稠墨般。 唯獨(dú), 璀璨的燈迷失了我的眼。 觸手的光明, 一片冰...
    風(fēng)中的旅行閱讀 224評(píng)論 0 0
  • 坐標(biāo)/蛟橋園 最近看了一些書和電影,沒人交談總感覺悶悶的,想到了簡(jiǎn)書,重新裝回來,也想把每天的一些小事情記錄下來,...
    就是蘇蘇呀閱讀 217評(píng)論 1 1

友情鏈接更多精彩內(nèi)容