描述
- 異步讀取外部數據源,并使用這些外部數據對主流數據進行必要的轉換
- 要求外部數據源支持異步讀取
- 要求代碼中的client支持發起異步請求
輸入
DataStream
輸出
DataStream
用法
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Usage template for an asynchronous enrichment function.
 *
 * Placeholders to replace before use: `IN` (input element type), `OUT` (output element
 * type) and every `...` (client construction and the asynchronous query itself).
 */
class MyAsyncFunc extends RichAsyncFunction[IN, OUT] {
  // Client for the external data source; it must be able to issue asynchronous requests.
  lazy val client = ...

  // Optional: the ExecutionContext on which the `onComplete` callback below is run.
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /**
   * Issues the asynchronous lookup for one input element and forwards its outcome to Flink.
   *
   * @param input        the element from the main stream
   * @param resultFuture handle used to pass the lookup outcome back to Flink's async operator
   */
  override def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    // Result handle returned by the asynchronous query, e.g. a Future.
    val queryResult: Future[OUT] = ...

    // Complete Flink's future from inside the callback, once the value is actually available.
    // The element type passed to `complete` must match OUT. Failures are forwarded explicitly
    // so that a failed request is reported instead of being left pending.
    queryResult.onComplete {
      case Success(value) => resultFuture.complete(Iterable(value))
      case Failure(error) => resultFuture.completeExceptionally(error)
    }
  }
}
// Template: attach the async function to the main stream (unordered wait mode).
// `timeout`, `timeUnit` and `capacity` are placeholders to replace with real values.
val resultStream: DataStream[...] = AsyncDataStream.unorderedWait(
  dataStream,
  new MyAsyncFunc,
  timeout,  // placeholder: timeout of one asynchronous request
  timeUnit, // placeholder: unit of `timeout`, e.g. TimeUnit.MILLISECONDS
  capacity) // placeholder: number of concurrent asynchronous requests
示例
使用lettuce庫異步讀取redis的hash結構數據關聯到dataStream中
依賴
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.3.RELEASE</version>
</dependency>
/**
 * Enriches `(categoryId, itemId)` pairs with names read asynchronously from Redis hashes.
 *
 * For an input `(categoryId, itemId)` it reads field `category_name` of hash
 * `category.<categoryId>` and field `item_name` of hash `item.<itemId>`, then emits
 * `(categoryId, categoryName, itemId, itemName)`.
 */
class MyAsyncFunc extends RichAsyncFunction[(String, String), (String, String, String, String)] {
  // Value emitted for a name whose hash field is absent (the lookup yields null in that case).
  private val MissingName: String = ""

  lazy val client: RedisClient = RedisClient.create(RedisURI.builder()
    .withHost("...") // redis host
    .withPort(...) // redis port
    .withPassword("...".toCharArray) // redis password
    .build())
  lazy val conn: StatefulRedisConnection[String, String] = client.connect()
  lazy val async: RedisAsyncCommands[String, String] = conn.async()

  // Not used by the CompletionStage callbacks below (they do not take a Scala
  // ExecutionContext); kept so that code referring to this member keeps compiling.
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /**
   * Starts both Redis lookups for one element and completes `resultFuture` when both are done.
   *
   * @param input        `(categoryId, itemId)`
   * @param resultFuture receives `(categoryId, categoryName, itemId, itemName)`, or the
   *                     failure of either lookup
   */
  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String, String)]): Unit = {
    val (categoryId, itemId) = input
    val categoryNameFut: RedisFuture[String] = async.hget(s"category.$categoryId", "category_name")
    val itemNameFut: RedisFuture[String] = async.hget(s"item.$itemId", "item_name")

    // Combine the two futures into a tuple. The combining function may return any type, so
    // there is no need to join the values into one delimited string and split it again
    // (which breaks on names containing the delimiter and on empty names).
    // For a single lookup, register the completion callback directly on its RedisFuture.
    categoryNameFut
      .thenCombineAsync[String, (String, String)](
        itemNameFut,
        (categoryName: String, itemName: String) =>
          (Option(categoryName).getOrElse(MissingName), Option(itemName).getOrElse(MissingName)))
      // whenComplete sees both outcomes: without the failure branch a failed lookup would
      // leave resultFuture uncompleted.
      .whenComplete((names: (String, String), error: Throwable) =>
        Option(error) match {
          case Some(cause) => resultFuture.completeExceptionally(cause)
          case None        => resultFuture.complete(Iterable((categoryId, names._1, itemId, names._2)))
        })
  }

  /** Releases the Redis connection and the client's resources when the function is closed. */
  override def close(): Unit = {
    try {
      conn.close()
      client.shutdown()
    } finally super.close()
  }
}
// Wire the Redis lookup into the stream using the unordered wait mode.
val resultStream =
  AsyncDataStream.unorderedWait(
    dataStream,
    new MyAsyncFunc,
    10000, // timeout of one asynchronous request, in the unit given next
    TimeUnit.MILLISECONDS,
    100 // number of concurrent asynchronous requests
  )
輸入:(1,1)
輸出: (1,熟食,1,對夾)