1.為什么需要異步IO
flink在做實(shí)時(shí)處理時(shí),有時(shí)候需要和外部數(shù)據(jù)交互,但是通常情況下這個(gè)交互過(guò)程是同步的,這樣就會(huì)產(chǎn)生大量的等待時(shí)間;而異步操作可以在單個(gè)函數(shù)實(shí)例中同時(shí)處理多個(gè)請(qǐng)求,并且同時(shí)接收相應(yīng)。這樣等待時(shí)間就平均分?jǐn)偟搅硕鄠€(gè)請(qǐng)求上,大大減少了請(qǐng)求的等待時(shí)長(zhǎng),可以提高實(shí)時(shí)處理的吞吐量。

2.使用flink異步IO的先決條件
- 需要所連接的數(shù)據(jù)庫(kù)支持異步客戶(hù)端
- 在沒(méi)有異步客戶(hù)端的情況下,可以通過(guò)創(chuàng)建多個(gè)客戶(hù)端并使用線(xiàn)程池處理同步調(diào)用來(lái)嘗試將同步客戶(hù)端轉(zhuǎn)變?yōu)橛邢薜牟l(fā)客戶(hù)端
3. flink異步IO的使用步驟
- 實(shí)現(xiàn)AsyncFunction接口;
- 一個(gè)回調(diào),該函數(shù)取回操作的結(jié)果,然后將結(jié)果傳遞給ResultFuture;
- 在DataStream上應(yīng)用異步IO操作。
4. 使用示例
import scala.concurrent._
import ExecutionContext.Implicits.global
/**
* 使用scala并發(fā)包的Future模擬一個(gè)異步客戶(hù)端
*/
class DatabaseClient {
def query: Future[Long] = Future {
System.currentTimeMillis() / 1000
}
}
/** 'AsyncFunction' 的一個(gè)實(shí)現(xiàn),向數(shù)據(jù)庫(kù)發(fā)送異步請(qǐng)求并設(shè)置回調(diào)
* 改編自官網(wǎng)實(shí)例
*/
class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {
/** The database specific client that can issue concurrent requests with callbacks */
lazy val client: DatabaseClient = new DatabaseClient
/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit = {
// issue the asynchronous request, receive a future for the result
val resultFutureRequested: Future[Long] = client.query
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
resultFutureRequested.onSuccess {
case result: Long => resultFuture.complete(Iterable((str, result)))
}
}
}
object AsynchronousIOTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: immutable.Seq[Int] = Range(1, 10)
// 創(chuàng)建數(shù)據(jù)流
val dataStream: DataStream[Int] = env.fromCollection(data)
// 使用異步IO
val asyn = AsyncDataStream.unorderedWait(
dataStream,//執(zhí)行異步操作的DataStream
new AsyncDatabaseRequest,//
1000, TimeUnit.MILLISECONDS, //超時(shí)時(shí)間
100 // 進(jìn)行中的異步請(qǐng)求的最大數(shù)量
)
asyn.print()
env.execute("AsynchronousIOTest")
}
}
結(jié)果順序
AsyncDataStream 有兩個(gè)靜態(tài)方法,orderedWait 和 unorderedWait,對(duì)應(yīng)了兩種輸出模式:有序和無(wú)序。
- 有序:消息的發(fā)送順序與接受到的順序相同(包括 watermark ),也就是先進(jìn)先出。
- 無(wú)序:
在 ProcessingTime 的情況下,完全無(wú)序,先返回的結(jié)果先發(fā)送。
在 EventTime 的情況下,watermark 不能超越消息,消息也不能超越 watermark,也就是說(shuō) watermark 定義的順序的邊界。在兩個(gè) watermark 之間的消息的發(fā)送是無(wú)序的,但是在watermark之后的消息不能先于該watermark之前的消息發(fā)送。
超時(shí)處理
當(dāng)異步IO請(qǐng)求超時(shí)時(shí),默認(rèn)情況下會(huì)引發(fā)異常并重新啟動(dòng)作業(yè)。如果要處理超時(shí),可以重寫(xiě)AsyncFunction#timeout方法。
override def timeout(input: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit =
super.timeout(input, resultFuture)
容錯(cuò)保證
異步IO運(yùn)算符提供完全一次的容錯(cuò)保證。它在檢查點(diǎn)中存儲(chǔ)正在進(jìn)行的異步請(qǐng)求的記錄,并在從故障中恢復(fù)時(shí)恢復(fù)/重新觸發(fā)請(qǐng)求。
其他資料
關(guān)于flink異步IO的更多信息可以參考flink官網(wǎng)或者Flink 原理與實(shí)現(xiàn):Aysnc I/O這篇文章。
Futures和事件驅(qū)動(dòng)編程的知識(shí)可以參考《AKKA入門(mén)與實(shí)踐》這本書(shū)第二章的內(nèi)容:Actor與并發(fā)。
參考資料:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入門(mén)與實(shí)踐》