Flink基礎(chǔ)系列34-Flink CEP簡介

一.什么是CEP

  1. 復(fù)雜事件處理(Complex Event Processing,CEP)

  2. Flink CEP是在Flink中實現(xiàn)的復(fù)雜事件處理(CEP)庫

  3. CEP允許在無休止的事件流中檢測事件模式,讓我們有機會掌握數(shù)據(jù)中重要的部分

  4. 一個或多個由簡單事件構(gòu)成的事件流通過一定的規(guī)則匹配,然后輸出用戶想得到的數(shù)據(jù)——滿足規(guī)則的復(fù)雜事件

二.CEP特點

image.png

目標(biāo):從有序的簡單事件流中發(fā)現(xiàn)一些高階特征

輸入:一個或多個由簡單事件構(gòu)成的事件流

處理:識別簡單事件之間的內(nèi)在聯(lián)系,多個符合一定規(guī)則的簡單事件構(gòu)成復(fù)雜事件

輸出:滿足規(guī)則的復(fù)雜事件

三. Pattern API

處理事件的規(guī)則,被叫做"模式"(Pattern)

Flink CEP提供了Pattern API,用于對輸入流數(shù)據(jù)進行復(fù)雜事件規(guī)則定義,用來提取符合規(guī)則的時間序列

DataStream<Event> input = ...
// 定義一個Pattern
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)
  .next("middle").subtype(SubEvent.class).where(...)
  .followedBy("end").where(...);
// 將創(chuàng)建好的Pattern應(yīng)用到輸入事件流上
PatternStream<Event> patternStream = CEP.pattern(input,pattern);
// 檢出匹配事件序列,處理得到結(jié)果
DataStream<Alert> result = patternStream.select(...);

3.1 個體模式(Individual Patterns)

組成復(fù)雜規(guī)則的每一個單獨的模式定義,就是"個體模式"

start.times(3).where(new SimpleCondition<Event>() {...})

個體模式可以包括"單例(singleton)模式"和"循環(huán)(looping)模式"

單例模式只接收一個事件,而循環(huán)模式可以接收多個

量詞(Quantifier)
可以在一個個體模式后追加量詞,也就是指定循環(huán)次數(shù)

//匹配出現(xiàn)4次
start.times(4)
//匹配出現(xiàn)2/3/4次
start.time(2,4).greedy
//匹配出現(xiàn)0或者4次
start.times(4).optional
//匹配出現(xiàn)1次或者多次
start.oneOrMore
//匹配出現(xiàn)2,3,4次
start.times(2,4)
//匹配出現(xiàn)0次,2次或者多次,并且盡可能多的重復(fù)匹配
start.timesOrMore(2),optional.greedy

條件(Condition)

  1. 每個模式都需要指定觸發(fā)條件,作為模式是否接受事件進入的判斷依據(jù)
  2. CEP中的個體模式主要通過調(diào)用.where(),.or()和.until()來指定條件
  3. 按不同的調(diào)用方式,可以分成以下幾類
    1)簡單條件(Simple Condition)
    通過.where()方法對事件中的字段進行判斷篩選,決定是否接受該事件
start.where(new SimpleCondition<Event>){
  @Override
  public boolean filter(Event value) throws Exception{
    return value.getName.startsWith("foo");
  }
}

2)組合條件(Combining Condition)
將簡單條件進行合并;.or()方法表示或邏輯相連,where的直接組合就是AND

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

3)終止條件(Stop Condition)
如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)

4)迭代條件(Iterative Condition)
能夠?qū)δJ街八薪邮盏氖录M行處理
可以調(diào)用ctx.getEventsForPattern("name")

.where(new IterativeCondition<Event>(){...})

3.2 組合模式(Combining Patterns)

組合模式(Combining Patterns)也叫模式序列。
1)很多個體模式組合起來,就形成了整個的模式序列
2)模式序列必須以一個"初始模式"開始

Pattern<Event, Event> start = Pattern.<Event>begin("start")
image.png
  1. 嚴(yán)格近鄰(Strict Contiguity)
    1)所有事件按照嚴(yán)格的順序出現(xiàn),中間沒有任何不匹配的事件,由.next()指定
    2)例如對于模式"a next b",事件序列[a,c,b1,b2]沒有匹配

  2. 寬松近鄰(Relaxed Contiguity)
    1)允許中間出現(xiàn)不匹配的事件,由.followedBy()指定
    2)例如對于模式"a followedBy b",事件序列[a,c,b1,b2]匹配為[a,b1]

  3. 非確定性寬松近鄰(Non-Deterministic Relaxed Contiguity)
    1)進一步放寬條件,之前已經(jīng)匹配過的事件也可以再次使用,由.followByAny()指定
    2)例如對于模式"a followedAny b",事件序列[a,c,b1,b2]匹配為{a,b1},{a,b2}

  4. 除了以上模式序列外,還可以定義"不希望出現(xiàn)某種近鄰關(guān)系":
    1).notNext() 不嚴(yán)格近鄰
    2).notFollowedBy()不在兩個事件之間發(fā)生
    (eg,a not FollowedBy c,a Followed By b,a希望之后出現(xiàn)b,且不希望ab之間出現(xiàn)c)

  5. 需要注意:
    1)所有模式序列必須以.begin()開始
    2)模式序列不能以.notFollowedBy()結(jié)束
    3)"not "類型的模式不能被optional 所修飾
    4)此外,還可以為模式指定事件約束,用來要求在多長時間內(nèi)匹配有效:
    next.within(Time.seconds(10))

3.3 模式組

3.3.1 模式的檢測

指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測潛在匹配
調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個PatternStream

DataStream<Event> input = ...
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

3.3.2 匹配事件提取

  1. 創(chuàng)建PatternStrean之后,就可以應(yīng)用select或者flatselect方法,從檢測到的事件序列中提取事件了
  2. select()方法需要輸入一個select function作為參數(shù),每個成功匹配的事件序列都會調(diào)用它
  3. select() 以一個Map<String,List<IN]>> 來接收匹配到的事件序列,其中Key就是每個模式的名稱,而value就是所有接收到的事件的List類型
public OUT select(Map<String, List<IN>> pattern) throws Exception {
  IN startEvent = pattern.get("start").get(0);
  IN endEvent = pattern.get("end").get(0);
  return new OUT(startEvent, endEvent);
}

3.3.3 超時事件提取

當(dāng)一個模式通過within關(guān)鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理這些超時的部分匹配,select和flatSelect API調(diào)用允許指定超時處理程序

超時處理程序會接收到目前為止由模式匹配到的所有事件,由一個OutputTag定義接收到的超時事件序列

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> flatResult = 
  patternStream.flatSelect(
  outputTag,
  new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
  new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult = 
  flatResult.getSideOutput(outputTag);

參考:

  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_15-cep
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容