什么是FlinkCEP
FlinkCEP(Complex event processing for Flink) 是在Flink實(shí)現(xiàn)的復(fù)雜事件處理庫. 它可以讓你在無界流或有界流中檢測(cè)出特定的數(shù)據(jù),有機(jī)會(huì)掌握數(shù)據(jù)中重要的那部分。
是一種基于動(dòng)態(tài)環(huán)境中事件流的分析技術(shù),事件在這里通常是有意義的狀態(tài)變化,通過分析事件間的關(guān)系,利用過濾、關(guān)聯(lián)、聚合等技術(shù),根據(jù)事件間的時(shí)序關(guān)系和聚合關(guān)系制定檢測(cè)規(guī)則,持續(xù)地從事件流中查詢出符合要求的事件序列,最終分析得到更復(fù)雜的復(fù)合事件。
- 目標(biāo):從有序的簡(jiǎn)單事件流中發(fā)現(xiàn)一些高階特征
- 輸入:一個(gè)或多個(gè)由簡(jiǎn)單事件構(gòu)成的事件流
- 處理:識(shí)別簡(jiǎn)單事件之間的內(nèi)在聯(lián)系,多個(gè)符合一定規(guī)則的簡(jiǎn)單事件構(gòu)成復(fù)雜事件
- 輸出:滿足規(guī)則的復(fù)雜事件

總結(jié):FlinkCEP 用于編寫規(guī)則,實(shí)現(xiàn)對(duì)流數(shù)據(jù)進(jìn)行處理,最終獲取我們想要的結(jié)果。
Flink CEP應(yīng)用場(chǎng)景
風(fēng)險(xiǎn)控制
對(duì)用戶異常行為模式進(jìn)行實(shí)時(shí)檢測(cè),當(dāng)一個(gè)用戶發(fā)生了不該發(fā)生的行為,判定這個(gè)用戶是不是有違規(guī)操作的嫌疑。策略營(yíng)銷
用預(yù)先定義好的規(guī)則對(duì)用戶的行為軌跡進(jìn)行實(shí)時(shí)跟蹤,對(duì)行為軌跡匹配預(yù)定義規(guī)則的用戶實(shí)時(shí)發(fā)送相應(yīng)策略的推廣。運(yùn)維監(jiān)控
靈活配置多指標(biāo)、多依賴來實(shí)現(xiàn)更復(fù)雜的監(jiān)控模式。
Flink CEP入門案例
- 導(dǎo)入依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.0</version>
</dependency>
- 給定一個(gè)流(必須指定水?。?/li>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
// 讀取數(shù)據(jù)封裝成javaBean
SingleOutputStreamOperator<Sensor> returns = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
out.collect(new Sensor(value.split(",")));
}).returns(Types.POJO(Sensor.class));
// 添加水印
SingleOutputStreamOperator<Sensor> operator = returns.assignTimestampsAndWatermarks(
// 最大亂序程度
WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
);
為了方便,將數(shù)據(jù)封裝成了對(duì)象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Sensor {
private Integer id;
private Long ts;
private Long vc;
public Sensor(String[] args){
this.id=Integer.valueOf(args[0]);
this.ts=Long.valueOf(args[1]);
this.vc=Long.valueOf(args[2]);
}
}
- 定義規(guī)則
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
Pattern 別導(dǎo)錯(cuò)了
import org.apache.flink.cep.pattern.Pattern;
begin :cep 都是從 begin開始。
<X> Pattern<X, X> begin(final String name)
每個(gè)模式都必須有一個(gè)唯一的名稱,稍后您可以使用該名稱來標(biāo)識(shí)匹配的事件。
模式名稱不能包含字符":"。
begin("begin")
- 將CEP 應(yīng)用到流中
PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
// 從"begin" 名稱中,獲取模式里的數(shù)據(jù)
pattern1 -> pattern1.get("begin").toString());
- 返回結(jié)果
select.print();
env.execute();
- sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
- 程序
@Test
public void test01() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.readTextFile("input/sensor.txt");
//DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
// 讀取數(shù)據(jù)封裝成javaBean
SingleOutputStreamOperator<Sensor> flatMapStream = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
out.collect(new Sensor(value.split(",")));
}).returns(Types.POJO(Sensor.class));
// 添加水印
SingleOutputStreamOperator<Sensor> watermarks = flatMapStream.assignTimestampsAndWatermarks(
// 最大亂序程度
WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
);
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
// CEP
PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
pattern1 -> pattern1.get("begin").toString());
select.print();
env.execute();
}
- 運(yùn)行結(jié)果,都是
sensor_1的數(shù)據(jù)結(jié)果。
11> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
12> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
模式API
單個(gè)模式
單個(gè)模式可以是單例模式或者循環(huán)模式.
單例模式
單例模式只接受一個(gè)事件. 默認(rèn)情況模式都是單例模式.
前面的例子就是一個(gè)單例模式
循環(huán)模式
循環(huán)模式可以接受多個(gè)事件.
單例模式配合上量詞就是循環(huán)模式.(非常類似我們熟悉的正則表達(dá)式)
- times(n)
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).times(2);
用于循環(huán)匹配,必須滿足N條數(shù)。
匹配規(guī)則:設(shè) time(2)
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
首先匹配到 sensor_1,1,10 ,因?yàn)楸仨氁獮?,所以就會(huì)一直等著,直到遇到另一個(gè)sensor_1的數(shù)據(jù)
(sensor_1,1,10 | sensor_1,2,20)
sensor_1,1,10 匹配到之后,就不再匹配了,sensor_1,2,20繼續(xù)往后匹配,直到遇到sensor_1,4,40
(sensor_1,2,20 | sensor_1,4,40)
sensor_1,4,40 同樣等著,直到遇到下一個(gè)sensor_1
(sensor_1,4,40 | sensor_1,6,60)
因?yàn)榱魇教幚?,所?code>sensor_1與
sensor_1可能并不是有序的,中間可能相差好幾條數(shù)據(jù),但是匹配規(guī)則,就是滿足了
time(n)的退出,否則一直等著。若程序結(jié)束都沒有遇到,就自動(dòng)釋放(不輸出,因?yàn)椴粷M足time(n))。
運(yùn)行結(jié)果
9> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
10> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
-
timesOrMore(n)
匹配規(guī)則和times(n)類似,timesOrMore(n)匹配規(guī)則則是n條及n條以上。
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
匹配規(guī)則:設(shè) timesOrMore(2)
第一次進(jìn)來的是sensor_1,1,10,不滿足timesOrMore的條件,所以等著。
第二次:進(jìn)來sensor_1,2,20,滿足timesOrMore的條件,記錄起來。sensor_1,2,20需要等待下一次sensor_1的數(shù)據(jù)
(sensor_1,1,10 | sensor_1,2,20)
第三次:sensor_1,4,40,sensor_1,1,10 滿足, sensor_1,2,20也滿足
(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40)
(sensor_1,2,20 | ensor_1,4,40)
第四次:sensor_1,6,60,sensor_1,1,10 滿足, sensor_1,2,20 滿足,ensor_1,4,40 滿足
(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(ensor_1,4,40 | sensor_1,6,60)
最終輸出結(jié)果
多并行度,輸出順序不一致,但是整體結(jié)果和上面推斷是一致的。
10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
15> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
14> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
12> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
11> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
13> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
-
.times(n,m)
或許你已經(jīng)發(fā)現(xiàn)了timesOrMore的弊端,若數(shù)據(jù)中有10000條sensor_1,那么最后一個(gè)結(jié)果絕對(duì)有一萬條數(shù)據(jù)。這樣的結(jié)果,很有可能造成OOM。所以有必要的可以限定的以下次數(shù),比如使用times(n,m),它表示一個(gè)范圍,從n開始到m結(jié)束(包含m),比如 我們只要 2條,3條,4條的數(shù)據(jù),那么就可以寫成times(2,4)
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
匹配規(guī)則:設(shè) times(2,3)
第一次:sensor_1,1,10,不滿足,等著
第二次:sensor_1,2,20,sensor_1,1,10滿足,sensor_1,2,20等著
(sensor_1,1,10,sensor_1,2,20)
第三次: sensor_1,4,40,sensor_1,1,10滿足,sensor_1,2,20滿足,sensor_1,4,40等著
(sensor_1,1,10,sensor_1,2,20,sensor_1,4,40)
(sensor_1,2,20,sensor_1,4,40)
第四次,sensor_1,6,60, 因?yàn)樯舷逓?,sensor_1,1,10不再往下執(zhí)行,sensor_1,2,20滿足,sensor_1,4,40滿足,sensor_1,6,60等著,直到遇到sensor_1
(sensor_1,2,20,sensor_1,4,40,sensor_1,6,60)
(sensor_1,4,40,sensor_1,6,60)
最終輸出
上限是3,所以列表長(zhǎng)度最大為3。
1> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
3> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
16> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
15> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
times(n,m) 與 timesOrMore(n) 的區(qū)別就是,timesOrMore(n) 沒有上限。
-
oneOrMore()
作用就是一次或多次,等價(jià)于timesOrMore(1)
條件
對(duì)每個(gè)模式你可以指定一個(gè)條件來決定一個(gè)進(jìn)來的事件是否被接受進(jìn)入這個(gè)模式,例如前面用到的where就是一種條件
- 迭代條件
這是最普遍的條件類型。使用它可以指定一個(gè)基于前面已經(jīng)被接受的事件的屬性或者它們的一個(gè)子集的統(tǒng)計(jì)數(shù)據(jù)來決定是否接受時(shí)間序列的條件。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.<WaterSensor>begin("start")
.where(new IterativeCondition<WaterSensor>() {
@Override
public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
return "sensor_1".equals(value.getId());
}
});
- 簡(jiǎn)單條件
這種類型的條件擴(kuò)展了前面提到的IterativeCondition類,它決定是否接受一個(gè)事件只取決于事件自身的屬性。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
.<WaterSensor>begin("start")
.where(new SimpleCondition<WaterSensor>() {
@Override
public boolean filter(WaterSensor value) throws Exception {
System.out.println(value);
return "sensor_1".equals(value.getId());
}
});
- 組合條件
把多個(gè)條件結(jié)合起來使用. 這適用于任何條件,你可以通過依次調(diào)用where()來組合條件。 最終的結(jié)果是每個(gè)單一條件的結(jié)果的邏輯AND。
如果想使用OR來組合條件,你可以像下面這樣使用or()方法。
匹配 sensor_1 和 sensor_7的數(shù)據(jù)
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
程序
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).or(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_7".equals(value.getId());
}
});
運(yùn)行結(jié)果
4> [Sensor(id=sensor_7, ts=7, vc=30)]
1> [Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=4, vc=40)]
16> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=6, vc=60)]
若是要用and條件呢?CEP 并沒有and(),但是指定多個(gè)where(),作用和and一樣。
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return value.getTs()<4;
}
});
運(yùn)行結(jié)果
14> [Sensor(id=sensor_1, ts=1, vc=10)]
15> [Sensor(id=sensor_1, ts=2, vc=20)]
- 停止條件(until)
如果使用循環(huán)模式(oneOrMore, timesOrMore), 可以指定一個(gè)停止條件, 否則有可能會(huì)內(nèi)存吃不消.
意思是滿足了給定的條件的事件出現(xiàn)后,就不會(huì)再有事件被接受進(jìn)入模式了。
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
當(dāng)程序遇到sensor_2時(shí)就停止
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern.
<Sensor>begin("begin")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
}).timesOrMore(2).until(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_2".equals(value.getId());
}
});
運(yùn)行結(jié)果
8> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
異常:until 只能用在循環(huán)條件。
org.apache.flink.cep.pattern.MalformedPatternException: The until condition is only applicable to looping states.
組合模式
把多個(gè)單個(gè)模式組合在一起就是組合模式. 組合模式由一個(gè)初始化模式(.begin(...))開頭
- 原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
-
嚴(yán)格連續(xù)(嚴(yán)格緊鄰)
比如sensor_1與sensor_1之間必須相鄰的比如:sensor_1,1,10和sensor_1,2,20,sensor_1,2,20和sensor_1,4,40就不行,之間還相隔著sensor_2,3,30。
如何實(shí)現(xiàn)?使用next(),就表示嚴(yán)格緊鄰
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
.next("next")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
輸出結(jié)果,只有sensor_1,1,10滿足
12> [Sensor(id=sensor_1, ts=1, vc=10)]
嚴(yán)格模式還有一個(gè)很重要特性,就是能夠解決亂序問題
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,3,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
試問:此時(shí)運(yùn)行sensor_1,2,20是否滿足?是否算得上與sensor_1,3,40為相鄰?
12> [Sensor(id=sensor_1, ts=1, vc=10)]
13> [Sensor(id=sensor_1, ts=2, vc=20)]
雖然再流輸入的順序上,
sensor_1,3,40是靠在sensor_2,4,30后的,但是從時(shí)間上來說,sensor_1,2,20和sensor_1,3,40是有序的。內(nèi)部上是有一個(gè)類似排序機(jī)制,但是在實(shí)時(shí)上,可能就不好說了,這里為了測(cè)試,用得離線的方式,所以是沒有問題的。
-
松散連續(xù)
忽略匹配的事件之間的不匹配的事件。
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
followedBy("by"):*松散連續(xù)
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
.followedBy("by")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
輸出結(jié)果
9> [Sensor(id=sensor_1, ts=1, vc=10)]
10> [Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40)]
-
非確定的松散連續(xù)
更進(jìn)一步的松散連續(xù),允許忽略掉一些匹配事件的附加匹配
當(dāng)且僅當(dāng)數(shù)據(jù)為a,c,b,b時(shí),對(duì)于followedBy模式而言命中的為{a,b},對(duì)于followedByAny而言會(huì)有兩次命中{a,b},{a,b}
原始數(shù)據(jù)
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
// 定義模式
Pattern<Sensor, Sensor> beginPattern = Pattern
.<Sensor>begin("start")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
})
//.next("next")
.followedByAny("by")
.where(new SimpleCondition<Sensor>() {
@Override
public boolean filter(Sensor value) throws Exception {
return "sensor_1".equals(value.getId());
}
});
輸出結(jié)果
6> [Sensor(id=sensor_1, ts=1, vc=10)]
8> [Sensor(id=sensor_1, ts=4, vc=40)]
4> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=1, vc=10)]
5> [Sensor(id=sensor_1, ts=2, vc=20)]
7> [Sensor(id=sensor_1, ts=2, vc=20)]