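Consider this scenario: every user login attempt is recorded as a LoginEvent, whether it succeeds, fails, or ends in an error. Clients normally validate input before submitting, so a legitimate user cannot plausibly fail to log in several times within one second. When that happens, we should suspect that a machine is brute-forcing the user's password and, if necessary, block the attacking IPs.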
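Before bringing in Flink, the rule itself can be made concrete with a minimal plain-Java sketch. It keeps the timestamps of one user's recent failures in a sliding window and reports when the window holds too many of them. The class name FailureWindow and the threshold and windowMs parameters are illustrative assumptions, not part of Flink or of the original project.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Flink: tracks one user's failed logins
// inside a sliding time window.
class FailureWindow {
    private final Deque<Long> failures = new ArrayDeque<>();
    private final int threshold;
    private final long windowMs;

    FailureWindow(int threshold, long windowMs) {
        this.threshold = threshold;
        this.windowMs = windowMs;
    }

    // Records one failed login at timestampMs (timestamps must be non-decreasing)
    // and returns true when the window now holds at least `threshold` failures.
    boolean recordFailure(long timestampMs) {
        failures.addLast(timestampMs);
        // Drop failures that have fallen out of the window.
        while (!failures.isEmpty() && timestampMs - failures.peekFirst() >= windowMs) {
            failures.removeFirst();
        }
        return failures.size() >= threshold;
    }

    // Convenience wrapper: how many of the given failures trigger the rule.
    static int countSuspicious(long[] failureTimestampsMs, int threshold, long windowMs) {
        FailureWindow window = new FailureWindow(threshold, windowMs);
        int hits = 0;
        for (long ts : failureTimestampsMs) {
            if (window.recordFailure(ts)) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        long[] burst = {0L, 200L, 400L};     // three failures within one second
        long[] slow = {0L, 5_000L, 10_000L}; // failures spread over ten seconds
        System.out.println(countSuspicious(burst, 3, 1_000L)); // prints 1
        System.out.println(countSuspicious(slow, 3, 1_000L));  // prints 0
    }
}
```

This hand-rolled version needs one window per user and has no notion of event order beyond timestamps, which is the bookkeeping that a CEP library takes over.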
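Advantages of Flink CEP
Complexity: joins across multiple streams, window aggregations, and detection of event sequences or patterns
Low latency: second- or millisecond-level, for example credit card fraud detection or attack detection
High throughput: more than ten thousand messages per second
Execution flow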

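Login logs are delivered to Flink in real time, usually through message middleware such as Kafka or another message queue.
Flink CEP then matches the stream against a pattern and raises an alert for every match.
Example
The user login log has the following format: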
| Time | User ID | IP address | Login type |
|---|---|---|---|
| 2018-11-19T12:12:12 | 1 | 192.168.0.1 | fail |
| 2018-11-19T12:12:12 | 1 | 192.168.0.1 | fail |
| 2018-11-19T12:12:12 | 1 | 192.168.0.1 | fail |
| 2018-11-19T12:12:12 | 2 | 192.168.10.10 | success |
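
As a rough illustration of how one such record could be turned into an object, the sketch below parses a single line. It assumes that the raw log is one record per line with the four fields separated by '|' in the column order of the table, which the original does not state. The class LoginLogLine is hypothetical and is separate from the LoginEvent POJO used later.

```java
import java.time.LocalDateTime;

// Hypothetical parser: assumes '|'-separated fields in the order
// time, user ID, IP address, login type.
class LoginLogLine {
    final LocalDateTime time;
    final String userId;
    final String ip;
    final String type;

    private LoginLogLine(LocalDateTime time, String userId, String ip, String type) {
        this.time = time;
        this.userId = userId;
        this.ip = ip;
        this.type = type;
    }

    static LoginLogLine parse(String line) {
        String[] parts = line.split("\\|");
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                "expected 4 fields but got " + parts.length + ": " + line);
        }
        return new LoginLogLine(
            LocalDateTime.parse(parts[0].trim()), // ISO-8601, e.g. 2018-11-19T12:12:12
            parts[1].trim(),
            parts[2].trim(),
            parts[3].trim());
    }

    boolean isFailure() {
        return "fail".equals(type);
    }

    public static void main(String[] args) {
        LoginLogLine line = parse("2018-11-19T12:12:12 | 1 | 192.168.0.1 | fail");
        System.out.println(line.userId + " " + line.ip + " " + line.isFailure());
    }
}
```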
Dependencies
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: "1.6.2"
compile group: 'org.apache.flink', name: 'flink-cep-scala_2.11', version: "1.6.2"
Create the streaming execution environment and simulate login events in the format shown above:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Three failed logins by user 1 from different IPs, then one successful login by user 2
DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(
    new LoginEvent("1", "192.168.0.1", "fail"),
    new LoginEvent("1", "192.168.0.2", "fail"),
    new LoginEvent("1", "192.168.0.3", "fail"),
    new LoginEvent("2", "192.168.10.10", "success")
));
Define a POJO for the login event:
public static class LoginEvent implements Serializable {
    private String userId; // user ID
    private String ip;     // login IP
    private String type;   // login type: "fail" or "success"

    public LoginEvent() {
    }

    public LoginEvent(String userId, String ip, String type) {
        this.userId = userId;
        this.ip = ip;
        this.type = type;
    }

    // Getters and setters omitted (getUserId, getIp and getType are used below)
}
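Then define a POJO for the alert:
public static class LoginWarning implements Serializable {
    private String userId;
    private String type;
    private String ip;

    public LoginWarning() {
    }

    public LoginWarning(String userId, String type, String ip) {
        this.userId = userId;
        this.type = type;
        this.ip = ip;
    }

    // Needed so that print() shows the fields in the format of the alerts below
    @Override
    public String toString() {
        return "LoginWarning{userId='" + userId + "', type='" + type + "', ip='" + ip + "'}";
    }
}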
Now define the pattern. The first event is named begin. If it satisfies the first where condition, matching moves on to the immediately following event, named next. If both conditions are satisfied within 1 second, the pair is a match and becomes a loginFail alert.
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("begin")
    // First event: a failed login
    .where(new IterativeCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
            return loginEvent.getType().equals("fail");
        }
    })
    // next() requires the second event to follow the first one directly
    .next("next")
    .where(new IterativeCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
            return loginEvent.getType().equals("fail");
        }
    })
    // Both events must occur within one second
    .within(Time.seconds(1));
With the login-failure pattern defined, we can apply it to the data stream.
Because we monitor along the user dimension, the stream is keyed by user ID so that failures are matched per user and the offending IP can be pinned down.
PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginEventStream.keyBy(LoginEvent::getUserId),
    loginFailPattern);
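Each match is returned as a Map<String, List<LoginEvent>>: the key is the pattern name and the value is the list of events matched under that name.
DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {
    List<LoginEvent> first = pattern.get("begin");
    List<LoginEvent> second = pattern.get("next");
    // Report the second failed login; arguments follow the constructor order (userId, type, ip)
    return new LoginWarning(second.get(0).getUserId(), second.get(0).getType(), second.get(0).getIp());
});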
We can print the alerts directly and then start the job:
loginFailDataStream.print();
env.execute();
Three consecutive failures contain two overlapping fail/fail pairs, so the job produces the following two alerts:
6> LoginWarning{userId='1', type='fail', ip='192.168.0.2'}
6> LoginWarning{userId='1', type='fail', ip='192.168.0.3'}
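Why two alerts and not one or three? The plain-Java sketch below imitates what the keyed pattern does with three consecutive failures from IPs 192.168.0.1 to 192.168.0.3: it remembers the previous event per user and reports a match whenever a failure directly follows another failure. It is not Flink code, the class name ContiguousFailMatcher is an assumption made for illustration, and the time limit is ignored because the sample events arrive together.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of "fail directly followed by fail, per user".
// Each event is {userId, ip, type}.
class ContiguousFailMatcher {
    // Returns the IP of the second event of every match.
    static List<String> match(String[][] events) {
        Map<String, String[]> previousByUser = new HashMap<>(); // last event seen per user
        List<String> alertIps = new ArrayList<>();
        for (String[] event : events) {
            // put() returns the user's previous event, or null for the first one
            String[] previous = previousByUser.put(event[0], event);
            if (previous != null && "fail".equals(previous[2]) && "fail".equals(event[2])) {
                alertIps.add(event[1]);
            }
        }
        return alertIps;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"1", "192.168.0.1", "fail"},
            {"1", "192.168.0.2", "fail"},
            {"1", "192.168.0.3", "fail"},
            {"2", "192.168.10.10", "success"},
        };
        System.out.println(match(events)); // prints [192.168.0.2, 192.168.0.3]
    }
}
```

The first and second failures form one match and the second and third form another, which gives the two alerts. Keeping the previous event per user is also why an unrelated event from another user does not break a pair, the role that keyBy plays in the Flink job.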
The complete project code is available on GitHub.
Bonus: Scala version
The Scala version is concise and clear:
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.Map

object FlinkLoginFail {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val loginEventStream = env.fromCollection(List(
      new LoginEvent("1", "192.168.0.1", "fail"),
      new LoginEvent("1", "192.168.0.2", "fail"),
      new LoginEvent("1", "192.168.0.3", "fail"),
      new LoginEvent("2", "192.168.10.10", "success")
    ))

    // Two directly consecutive failed logins within one second
    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.getType.equals("fail"))
      .next("next")
      .where(_.getType.equals("fail"))
      .within(Time.seconds(1))

    // Key by user ID so that the pattern is matched per user, as in the Java version
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.getUserId), loginFailPattern)

    val loginFailDataStream = patternStream
      .select((pattern: Map[String, Iterable[LoginEvent]]) => {
        val second = pattern("next").head
        // LoginWarning's constructor takes (userId, type, ip)
        new LoginWarning(second.getUserId, second.getType, second.getIp)
      })

    loginFailDataStream.print
    env.execute
  }
}
