Flink實(shí)戰(zhàn)| Flink+Redis實(shí)時防刷接口作弊

隨著人口紅利的慢慢削減,互聯(lián)網(wǎng)產(chǎn)品的廝殺愈加激烈,大家開始看好下沉市場的潛力,拼多多,趣頭條等廠商通過拉新獎勵,購物優(yōu)惠等政策率先搶占用戶,壯大起來。其他各廠商也緊隨其后,紛紛推出自己產(chǎn)品的極速版,如今日頭條極速版,騰訊新聞極速版等,也通過拉新獎勵,閱讀獎勵等政策來吸引用戶。

對于這類APP,實(shí)時風(fēng)控是必不可少的,一個比較常見的實(shí)時風(fēng)控場景就是防刷接口作弊。刷接口是黑產(chǎn)的一種作弊手段,APP上的各種操作,一般都會對應(yīng)后臺的某個接口,用戶操作APP數(shù)據(jù)就會通過接口上報到后臺,但如果黑產(chǎn)通過破解獲取到了APP的新增用戶接口,那他們就能跳過登陸APP步驟直接調(diào)后臺接口構(gòu)造虛假數(shù)據(jù)牟利了。對于這類業(yè)務(wù),我們可以通過Flink + Redis來實(shí)現(xiàn)實(shí)時防刷接口的功能。數(shù)據(jù)流圖如下所示:



刷接口作弊一般是越過登陸APP操作,直接調(diào)Server端的接口發(fā)數(shù)據(jù),這些用戶在APP的上報日志里面就不存在,那我們可以通過Flink將APP實(shí)時上報上來的新增用戶寫入Redis中,然后Server端將接口上報上來的用戶與Redis里的用戶進(jìn)行比對,如果不在Redis里面則判為刷接口用戶。

對于這個需求,得要求實(shí)時計算引擎能達(dá)到毫秒級延遲,否則會造成用戶的誤判和影響用戶體驗(yàn)。為此我們選擇了Flink作為實(shí)時計算引擎。

主要代碼邏輯如下:

//配置flink運(yùn)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//val env = StreamExecutionEnvironment.createLocalEnvironment()
env.enableCheckpointing(1000 * 60 * 5)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 60 * 3)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new FsStateBackend(checkPointPath))
env.getConfig.setLatencyTrackingInterval(1000)
env.getConfig.registerTypeWithKryoSerializer(classOf[Log], classOf[ProtobufSerializer])
env.setStreamTimeCharacteristic(EventTime)
env.setParallelism(parallel)
env.getConfig.setLatencyTrackingInterval(1000)

//kafka source,實(shí)時消費(fèi)kafka中日志解析出用戶id
val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties))
val data = stream.flatMap(x => x)
  .map(log =>{
    val userid = log.getUid.getUuid
    val current_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
    (userid,current_time)
  }).filter(record=>{
  val userid = record._1
  var flag = false
  if(userid != null && !"".equals(userid)){
    flag = true
  }
  flag
})

//redis sink,將APP上報日志的用戶id寫入redis供server端匹配
data.addSink(new RedisSink[(String, String)](getJedisClusterConfig, new RedisSinkMapper))
env.execute("newsinfo_active_userid_to_redis")

其中比較重要的幾點(diǎn):

1 構(gòu)造kafka source

val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties))

一般APP上報的都是序列化的數(shù)據(jù),我們需要定義反序列化方法,LogDeserializationSchema 是一個protobuf類型的反序列化方法。

//將kafka中的數(shù)據(jù)解析為google protobuf 的Log,一個message可能包含多條Log
class LogDeserializationSchema extends AbstractDeserializationSchema[Array[Log]] {
  override def deserialize(message: Array[Byte]): Array[Log] = {
    val data = ArrayBuffer[Log]()
    val input = new ByteArrayInputStream(message)
    while (input.available() > 0) {
      try {
        data += Log.parseDelimitedFrom(input)
      } catch {
        case _: Throwable =>
      }
    }
    input.close()
    data.toArray
  }
}

2 redis sink

這里用的是網(wǎng)上開源的flink-connector-redis依賴庫。
更多相關(guān)內(nèi)容見:http://bahir.apache.org/docs/flink/current/flink-streaming-redis

Maven依賴如下

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>

Redis Sink 提供用于向Redis發(fā)送數(shù)據(jù)的接口的類。接收器可以使用三種不同的方法與不同類型的Redis環(huán)境進(jìn)行通信:
單Redis服務(wù)器
Redis集群
Redis Sentinel

Redis Sink 核心類是 RedisMappe 是一個接口,使用時我們要編寫自己的redis操作類實(shí)現(xiàn)這個接口中的三個方法,如下所示:

class RedisExampleMapper extends RedisMapper[(String, String)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
  }

  override def getKeyFromData(data: (String, String)): String = data._1

  override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

使用RedisCommand設(shè)置數(shù)據(jù)結(jié)構(gòu)類型時和redis結(jié)構(gòu)對應(yīng)關(guān)系。

以上我們利用 Flink + Redis 實(shí)時了一個基本的實(shí)時防刷接口模型。

訂閱關(guān)注微信公眾號《大數(shù)據(jù)技術(shù)進(jìn)階》,及時獲取更多大數(shù)據(jù)架構(gòu)和應(yīng)用相關(guān)技術(shù)文章!


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我叫純,父母希望我永遠(yuǎn)純純的。后來都變了樣! 我外婆有三個孩子,我媽是最窮的。 小時候我很崇拜大舅和小姨,覺得他們...
    小星T_T閱讀 178評論 0 0
  • 時光流逝匆匆,歲月已不安好,過了這場高考,有些人真的不會再見到了。 羨慕之前那個年輕勇敢的自己,遇到喜歡的人可以大...
    沐雨muyu閱讀 338評論 0 0
  • 今天是我們在意大利的第三天,我和媽媽起床了。我掙扎著從溫暖的床上起來,走到冰涼的地板上,迅速的把我的衣服拿起來并穿...
    許小熊閱讀 655評論 0 12

友情鏈接更多精彩內(nèi)容