Flink - CEP分析攻擊行為

在這樣一種場(chǎng)景,用戶(hù)的登錄行為數(shù)據(jù)都會(huì)以LoginEvent的行式記錄下來(lái),每次失敗或者成功以及錯(cuò)誤都會(huì)記錄下來(lái),一般客戶(hù)端都會(huì)進(jìn)行檢驗(yàn),正常的用戶(hù)不可能在一秒鐘之內(nèi)登錄錯(cuò)誤多次,這時(shí)候我就得懷疑這部分?jǐn)?shù)據(jù)是不是機(jī)器對(duì)用戶(hù)的密碼進(jìn)行暴力破解,如果有需要我們得將這些攻擊IP進(jìn)行封鎖。

Flink - CEP 優(yōu)點(diǎn)

復(fù)雜性:多個(gè)流join,窗口聚合,事件序列或patterns檢測(cè)
低延遲:秒或毫秒級(jí)別,比如做信用卡盜刷檢測(cè),或攻擊檢測(cè)
高吞吐:每秒上萬(wàn)條消息

執(zhí)行流程

Flink-CEP監(jiān)控

用戶(hù)的登錄日志數(shù)據(jù)會(huì)以實(shí)時(shí)的方式傳遞給Flink,常用的有Kafka,MQ等消息中間件。

接著使用Flink-CEP進(jìn)行模式匹配,匹配到了就會(huì)發(fā)出告警處理。

案例

用戶(hù)登錄日志格式如下表

時(shí)間 用戶(hù)編號(hào) IP地扯 登錄類(lèi)型
2018-11-19T12:12:12 1 192.168.0.1 fail
2018-11-19T12:12:12 1 192.168.0.1 fail
2018-11-19T12:12:12 1 192.168.0.1 fail
2018-11-19T12:12:12 2 192.168.10,10 success

依賴(lài)

compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: "1.6.2"
compile group: 'org.apache.flink', name: 'flink-cep-scala_2.11', version: "1.6.2"

使用流式處理環(huán)境,并模擬上面的登錄日志

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(
                new LoginEvent("1","192.168.0.1","fail"),
                new LoginEvent("1","192.168.0.1","fail"),
                new LoginEvent("1","192.168.0.1","fail"),
                new LoginEvent("2","192.168.10,10","success")
        ));

定義一個(gè)登錄日志的POJO

public static class LoginEvent implements Serializable {
        private String userId;//用戶(hù)ID
        private String ip;//登錄IP
        private String type;//登錄類(lèi)型

        public LoginEvent() {
        }

        public LoginEvent(String userId, String ip, String type) {
            this.userId = userId;
            this.ip = ip;
            this.type = type;
        }
        // gets sets
}

再定義一個(gè)告警的POJO

public static class LoginWarning implements Serializable {
        private String userId;
        private String type;
        private String ip;

        public LoginWarning() {
        }

        public LoginWarning(String userId, String type, String ip) {
            this.userId = userId;
            this.type = type;
            this.ip = ip;
        }
}

開(kāi)始定義匹配模式,首先為第條日志定義一個(gè)first名稱(chēng),如果滿(mǎn)足第一個(gè)where條件,則進(jìn)入下一個(gè)監(jiān)聽(tīng)事件second,如果在 1秒 鐘之內(nèi)兩個(gè)模式都滿(mǎn)足,則成為loginFail告警。

Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>
                begin("begin")
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                })
                .within(Time.seconds(1));

登錄失敗事件模式定義好了之后,我們就可以把它應(yīng)用到數(shù)據(jù)流當(dāng)中了。

因?yàn)槲覀兪轻槍?duì)用戶(hù)這個(gè)維度進(jìn)行監(jiān)聽(tīng)的,所以我們需要對(duì)用戶(hù)進(jìn)行分組,以便可以鎖定用戶(hù)IP。

PatternStream<LoginEvent> patternStream = CEP.pattern(
                loginEventStream.keyBy(LoginEvent::getUserId),
                loginFailPattern);

所匹配的到事件會(huì)以一個(gè)Map<String, List<LoginEvent>>返回,key為事件名稱(chēng),value為匹配的數(shù)據(jù)列表。

DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {
            List<LoginEvent> first = pattern.get("begin");
            List<LoginEvent> second = pattern.get("next");

我們可以將告警事件直接打印出來(lái)

loginFailDataStream.print();

最終會(huì)產(chǎn)生如下兩條告警

6> LoginWarning{userId='1', type='192.168.0.2', ip='fail'}
6> LoginWarning{userId='1', type='192.168.0.3', ip='fail'}

完整項(xiàng)目代碼 github傳送門(mén)

附送Scala版本

Scala 版本就是那么簡(jiǎn)潔明了

object FlinkLoginFail {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val loginEventStream = env.fromCollection(List(
      new LoginEvent("1", "192.168.0.1", "fail"),
      new LoginEvent("1", "192.168.0.2", "fail"),
      new LoginEvent("1", "192.168.0.3", "fail"),
      new LoginEvent("2", "192.168.10,10", "success")
    ))

    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.getType.equals("fail"))
      .next("next")
      .where(_.getType.equals("fail"))
      .within(Time.seconds(1))

    val patternStream = CEP.pattern(loginEventStream, loginFailPattern)
    
    val loginFailDataStream = patternStream
      .select((pattern: Map[String, Iterable[LoginEvent]]) => {
        val first = pattern.getOrElse("begin", null).iterator.next()
        val second = pattern.getOrElse("next", null).iterator.next()

        new LoginWarning(second.getUserId, second.getIp, second.getType)
      })

    loginFailDataStream.print

    env.execute
  }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容