Flink 異步I/O

1.為什么需要異步IO

flink在做實(shí)時(shí)處理時(shí),有時(shí)候需要和外部數(shù)據(jù)交互,但是通常情況下這個(gè)交互過(guò)程是同步的,這樣就會(huì)產(chǎn)生大量的等待時(shí)間;而異步操作可以在單個(gè)函數(shù)實(shí)例中同時(shí)處理多個(gè)請(qǐng)求,并且同時(shí)接收相應(yīng)。這樣等待時(shí)間就平均分?jǐn)偟搅硕鄠€(gè)請(qǐng)求上,大大減少了請(qǐng)求的等待時(shí)長(zhǎng),可以提高實(shí)時(shí)處理的吞吐量。


flink異步io.png

2.使用flink異步IO的先決條件

  • 需要所連接的數(shù)據(jù)庫(kù)支持異步客戶(hù)端
  • 在沒(méi)有異步客戶(hù)端的情況下,可以通過(guò)創(chuàng)建多個(gè)客戶(hù)端并使用線(xiàn)程池處理同步調(diào)用來(lái)嘗試將同步客戶(hù)端轉(zhuǎn)變?yōu)橛邢薜牟l(fā)客戶(hù)端

3. flink異步IO的使用步驟

  • 實(shí)現(xiàn)AsyncFunction接口;
  • 一個(gè)回調(diào),該函數(shù)取回操作的結(jié)果,然后將結(jié)果傳遞給ResultFuture;
  • 在DataStream上應(yīng)用異步IO操作。

4. 使用示例

import scala.concurrent._
import ExecutionContext.Implicits.global

/**
  * 使用scala并發(fā)包的Future模擬一個(gè)異步客戶(hù)端
  */
class DatabaseClient {
  def query: Future[Long] = Future {
    System.currentTimeMillis() / 1000
  }
}

/** 'AsyncFunction' 的一個(gè)實(shí)現(xiàn),向數(shù)據(jù)庫(kù)發(fā)送異步請(qǐng)求并設(shè)置回調(diào) 
  * 改編自官網(wǎng)實(shí)例
  */

class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {

  /** The database specific client that can issue concurrent requests with callbacks */
  lazy val client: DatabaseClient = new DatabaseClient

  /** The context used for the future callbacks */
  implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(str: Int,
                           resultFuture: ResultFuture[(Int, Long)]): Unit = {

    // issue the asynchronous request, receive a future for the result
    val resultFutureRequested: Future[Long] = client.query

    // set the callback to be executed once the request by the client is complete
    // the callback simply forwards the result to the result future
    resultFutureRequested.onSuccess {
      case result: Long => resultFuture.complete(Iterable((str, result)))
    }
  }
}

object AsynchronousIOTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val data: immutable.Seq[Int] = Range(1, 10)

    // 創(chuàng)建數(shù)據(jù)流
    val dataStream: DataStream[Int] = env.fromCollection(data)

     // 使用異步IO
     val asyn = AsyncDataStream.unorderedWait(
      dataStream,//執(zhí)行異步操作的DataStream
      new AsyncDatabaseRequest,//
      1000, TimeUnit.MILLISECONDS, //超時(shí)時(shí)間
      100 // 進(jìn)行中的異步請(qǐng)求的最大數(shù)量
    )

    asyn.print()

    env.execute("AsynchronousIOTest")

  }

}
結(jié)果順序

AsyncDataStream 有兩個(gè)靜態(tài)方法,orderedWait 和 unorderedWait,對(duì)應(yīng)了兩種輸出模式:有序和無(wú)序。

  • 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark ),也就是先進(jìn)先出。
  • 無(wú)序:
    在 ProcessingTime 的情況下,完全無(wú)序,先返回的結(jié)果先發(fā)送。
    在 EventTime 的情況下,watermark 不能超越消息,消息也不能超越 watermark,也就是說(shuō) watermark 定義的順序的邊界。在兩個(gè) watermark 之間的消息的發(fā)送是無(wú)序的,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送。
超時(shí)處理

當(dāng)異步IO請(qǐng)求超時(shí)時(shí),默認(rèn)情況下會(huì)引發(fā)異常并重新啟動(dòng)作業(yè)。如果要處理超時(shí),可以重寫(xiě)AsyncFunction#timeout方法。

  override def timeout(input: Int,
                       resultFuture: ResultFuture[(Int, Long)]): Unit =
    super.timeout(input, resultFuture)

容錯(cuò)保證

異步IO運(yùn)算符提供完全一次的容錯(cuò)保證。它在檢查點(diǎn)中存儲(chǔ)正在進(jìn)行的異步請(qǐng)求的記錄,并在從故障中恢復(fù)時(shí)恢復(fù)/重新觸發(fā)請(qǐng)求。

其他資料

關(guān)于flink異步IO的更多信息可以參考flink官網(wǎng)或者Flink 原理與實(shí)現(xiàn):Aysnc I/O這篇文章。

Futures和事件驅(qū)動(dòng)編程的知識(shí)可以參考《AKKA入門(mén)與實(shí)踐》這本書(shū)第二章的內(nèi)容:Actor與并發(fā)。


參考資料:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入門(mén)與實(shí)踐》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1 概述 流計(jì)算系統(tǒng)中經(jīng)常需要與外部系統(tǒng)進(jìn)行交互,我們通常的做法如向數(shù)據(jù)庫(kù)發(fā)送用戶(hù)a的查詢(xún)請(qǐng)求,然后等待結(jié)果返回,...
    薛定諤的貓Plus閱讀 4,861評(píng)論 0 5
  • 背景 Async I/O 是阿里巴巴貢獻(xiàn)給社區(qū)的一個(gè)呼聲非常高的特性,于1.2版本引入。主要目的是為了解決與外部系...
    尼小摩閱讀 1,180評(píng)論 0 3
  • Flink總結(jié) Flink簡(jiǎn)介 Apache Flink作為一款高吞吐量、低延遲的針對(duì)流數(shù)據(jù)和批數(shù)據(jù)的分布式實(shí)時(shí)處...
    bigdata_er閱讀 10,743評(píng)論 0 10
  • 8:30準(zhǔn)備工作 拿早點(diǎn) 開(kāi)窗 開(kāi)燈 煲水 杯子 點(diǎn)心桌 輕音樂(lè) 8:45接孩子 提醒上廁所、洗手、擦手、拿杯子喝...
    小藕家閱讀 513評(píng)論 0 0
  • 看了成為作家這本書(shū) ,里面呢是介紹要用什么樣的心態(tài)去面對(duì)寫(xiě)作,還有一些如何應(yīng)對(duì)自己在寫(xiě)作中遇到的問(wèn)題。 想要成為一...
    選妃閱讀 275評(píng)論 2 1

友情鏈接更多精彩內(nèi)容