flink - operator - RichAsyncFunction

描述
  1. 異步讀取外部數(shù)據(jù)源,并使用這些外部數(shù)據(jù)對(duì)主流數(shù)據(jù)進(jìn)行必要的轉(zhuǎn)換
  2. 要求外部數(shù)據(jù)源支持異步讀取
  3. 要求代碼中的client支持發(fā)起異步請(qǐng)求
輸入

DataStream

輸出

DataStream

用法
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class MyAsyncFunc extends RichAsyncFunction[input數(shù)據(jù)類型, output數(shù)據(jù)類型] {
    lazy val client = ... // 建立外部數(shù)據(jù)源的客戶端

    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) // 指定executor,可選


    override def asyncInvoke(input: input數(shù)據(jù)類型, resultFuture: ResultFuture[output數(shù)據(jù)類型]): Unit = {
      val requryResult = ... // 異步查詢請(qǐng)求返回的結(jié)果,如Future
      resultFuture.complete(Iterable(requryResult獲取到的數(shù)據(jù),數(shù)據(jù)類型需要和output數(shù)據(jù)類型保持一致)) // 把異步請(qǐng)求的數(shù)據(jù)轉(zhuǎn)發(fā)給flink的異步框架,complete表示主動(dòng)完成flink的異步請(qǐng)求
    }
  }
  
val resultStream: DataStream[...] = AsyncDataStream.unorderedWait(dataStream, new MyAsyncFunc, 異步請(qǐng)求超時(shí)時(shí)間, TimeUnit.超時(shí)時(shí)間的單位, 異步并發(fā)數(shù))
示例

使用lettuce庫異步讀取redis的hash結(jié)構(gòu)數(shù)據(jù)關(guān)聯(lián)到dataStream中

依賴
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.1.3.RELEASE</version>
</dependency>
class MyAsyncFunc extends RichAsyncFunction[(String, String), (String, String, String, String)] {
    lazy val client: RedisClient = RedisClient.create(RedisURI.builder()
      .withHost("...") // redis host
      .withPort(...) // redis port
      .withPassword("...".toCharArray) // redis password
      .build())
    lazy val conn: StatefulRedisConnection[String, String] = client.connect()
    lazy val async: RedisAsyncCommands[String, String] = conn.async()

    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String, String)]): Unit = {
      val category_name_fut: RedisFuture[String] = async.hget(s"category.${input._1}", "category_name")
      val item_name_fut: RedisFuture[String] = async.hget(s"item.${input._2}", "item_name")

    // 使用thenCombineAsync合并兩個(gè)redisFuture,如果查詢單個(gè)維度的話直接在redisFuture后執(zhí)行thenAccept即可
      category_name_fut.thenCombineAsync(item_name_fut,
        (t: String, u: String) => {
          t + "," + u  // BiFunction的返回類型需要與category_name_fut的值類型一致
        })
        .thenAccept(x => resultFuture.complete(Iterable((input._1, x.toString.split(",")(0), input._2, x.toString.split(",")(1)))))
    }
 }
 
 
val resultStream = AsyncDataStream.unorderedWait(dataStream, new MyAsyncFunc, 10000, TimeUnit.MILLISECONDS, 100)

輸入:(1,1)
輸出: (1,熟食,1,對(duì)夾)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    Yobhel閱讀 1,912評(píng)論 0 33
  • 基礎(chǔ)概念考察 一、 簡單介紹一下 Flink Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有...
    Tim在路上閱讀 874評(píng)論 0 9
  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    王知無閱讀 3,335評(píng)論 2 11
  • 基礎(chǔ)概念考察 一、 簡單介紹一下 Flink Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有...
    Tim在路上閱讀 16,309評(píng)論 0 8
  • 今天青石的票圈出鏡率最高的,莫過于張藝謀的新片終于定檔了。 一張滿溢著水墨風(fēng)的海報(bào)一次次的出現(xiàn)在票圈里,也就是老謀...
    青石電影閱讀 10,891評(píng)論 1 2

友情鏈接更多精彩內(nèi)容