一.什么是CEP
復(fù)雜事件處理(Complex Event Processing,CEP)
Flink CEP是在Flink中實現(xiàn)的復(fù)雜事件處理(CEP)庫
CEP允許在無休止的事件流中檢測事件模式,讓我們有機會掌握數(shù)據(jù)中重要的部分
一個或多個由簡單事件構(gòu)成的事件流通過一定的規(guī)則匹配,然后輸出用戶想得到的數(shù)據(jù)——滿足規(guī)則的復(fù)雜事件
二.CEP特點

目標(biāo):從有序的簡單事件流中發(fā)現(xiàn)一些高階特征
輸入:一個或多個由簡單事件構(gòu)成的事件流
處理:識別簡單事件之間的內(nèi)在聯(lián)系,多個符合一定規(guī)則的簡單事件構(gòu)成復(fù)雜事件
輸出:滿足規(guī)則的復(fù)雜事件
三. Pattern API
處理事件的規(guī)則,被叫做"模式"(Pattern)
Flink CEP提供了Pattern API,用于對輸入流數(shù)據(jù)進行復(fù)雜事件規(guī)則定義,用來提取符合規(guī)則的時間序列
DataStream<Event> input = ...
// 定義一個Pattern
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)
.next("middle").subtype(SubEvent.class).where(...)
.followedBy("end").where(...);
// 將創(chuàng)建好的Pattern應(yīng)用到輸入事件流上
PatternStream<Event> patternStream = CEP.pattern(input,pattern);
// 檢出匹配事件序列,處理得到結(jié)果
DataStream<Alert> result = patternStream.select(...);
3.1 個體模式(Individual Patterns)
組成復(fù)雜規(guī)則的每一個單獨的模式定義,就是"個體模式"
start.times(3).where(new SimpleCondition<Event>() {...})
個體模式可以包括"單例(singleton)模式"和"循環(huán)(looping)模式"
單例模式只接收一個事件,而循環(huán)模式可以接收多個
量詞(Quantifier)
可以在一個個體模式后追加量詞,也就是指定循環(huán)次數(shù)
//匹配出現(xiàn)4次
start.times(4)
//匹配出現(xiàn)2/3/4次
start.time(2,4).greedy
//匹配出現(xiàn)0或者4次
start.times(4).optional
//匹配出現(xiàn)1次或者多次
start.oneOrMore
//匹配出現(xiàn)2,3,4次
start.times(2,4)
//匹配出現(xiàn)0次,2次或者多次,并且盡可能多的重復(fù)匹配
start.timesOrMore(2),optional.greedy
條件(Condition)
- 每個模式都需要指定觸發(fā)條件,作為模式是否接受事件進入的判斷依據(jù)
- CEP中的個體模式主要通過調(diào)用.where(),.or()和.until()來指定條件
- 按不同的調(diào)用方式,可以分成以下幾類
1)簡單條件(Simple Condition)
通過.where()方法對事件中的字段進行判斷篩選,決定是否接受該事件
start.where(new SimpleCondition<Event>){
@Override
public boolean filter(Event value) throws Exception{
return value.getName.startsWith("foo");
}
}
2)組合條件(Combining Condition)
將簡單條件進行合并;.or()方法表示或邏輯相連,where的直接組合就是AND
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
3)終止條件(Stop Condition)
如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)
4)迭代條件(Iterative Condition)
能夠?qū)δJ街八薪邮盏氖录M行處理
可以調(diào)用ctx.getEventsForPattern("name")
.where(new IterativeCondition<Event>(){...})
3.2 組合模式(Combining Patterns)
組合模式(Combining Patterns)也叫模式序列。
1)很多個體模式組合起來,就形成了整個的模式序列
2)模式序列必須以一個"初始模式"開始
Pattern<Event, Event> start = Pattern.<Event>begin("start")

嚴(yán)格近鄰(Strict Contiguity)
1)所有事件按照嚴(yán)格的順序出現(xiàn),中間沒有任何不匹配的事件,由.next()指定
2)例如對于模式"a next b",事件序列[a,c,b1,b2]沒有匹配寬松近鄰(Relaxed Contiguity)
1)允許中間出現(xiàn)不匹配的事件,由.followedBy()指定
2)例如對于模式"a followedBy b",事件序列[a,c,b1,b2]匹配為[a,b1]非確定性寬松近鄰(Non-Deterministic Relaxed Contiguity)
1)進一步放寬條件,之前已經(jīng)匹配過的事件也可以再次使用,由.followByAny()指定
2)例如對于模式"a followedAny b",事件序列[a,c,b1,b2]匹配為{a,b1},{a,b2}除了以上模式序列外,還可以定義"不希望出現(xiàn)某種近鄰關(guān)系":
1).notNext() 不嚴(yán)格近鄰
2).notFollowedBy()不在兩個事件之間發(fā)生
(eg,a not FollowedBy c,a Followed By b,a希望之后出現(xiàn)b,且不希望ab之間出現(xiàn)c)需要注意:
1)所有模式序列必須以.begin()開始
2)模式序列不能以.notFollowedBy()結(jié)束
3)"not "類型的模式不能被optional 所修飾
4)此外,還可以為模式指定事件約束,用來要求在多長時間內(nèi)匹配有效:
next.within(Time.seconds(10))
3.3 模式組
3.3.1 模式的檢測
指定要查找的模式序列后,就可以將其應(yīng)用于輸入流以檢測潛在匹配
調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個PatternStream
DataStream<Event> input = ...
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)...
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
3.3.2 匹配事件提取
- 創(chuàng)建PatternStrean之后,就可以應(yīng)用select或者flatselect方法,從檢測到的事件序列中提取事件了
- select()方法需要輸入一個select function作為參數(shù),每個成功匹配的事件序列都會調(diào)用它
- select() 以一個Map<String,List<IN]>> 來接收匹配到的事件序列,其中Key就是每個模式的名稱,而value就是所有接收到的事件的List類型
public OUT select(Map<String, List<IN>> pattern) throws Exception {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
}
3.3.3 超時事件提取
當(dāng)一個模式通過within關(guān)鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理這些超時的部分匹配,select和flatSelect API調(diào)用允許指定超時處理程序
超時處理程序會接收到目前為止由模式匹配到的所有事件,由一個OutputTag定義接收到的超時事件序列
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> flatResult =
patternStream.flatSelect(
outputTag,
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult =
flatResult.getSideOutput(outputTag);