Flink學(xué)習(xí)筆記之七AsycIO

1.什么是異步IO

,當(dāng)請(qǐng)求外部系統(tǒng)或者耗時(shí)操作,需要異步IO


屏幕快照 2019-03-21 下午3.01.01.png

2.AsyncDataStream

屏幕快照 2019-03-21 下午3.02.27.png
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
private static final long serialVersionUID = 2098635244857937717L;

private transient ExecutorService executorService;

/**
 * The result of multiplying sleepFactor with a random float is used to pause
 * the working thread in the thread pool, simulating a time consuming async operation.
 */
private final long sleepFactor;

/**
 * The ratio to generate an exception to simulate an async error. For example, the error
 * may be a TimeoutException while visiting HBase.
 */
private final float failRatio;

private final long shutdownWaitTS;

SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
    this.sleepFactor = sleepFactor;
    this.failRatio = failRatio;
    this.shutdownWaitTS = shutdownWaitTS;
}

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    executorService = Executors.newFixedThreadPool(30);
}

@Override
public void close() throws Exception {
    super.close();
    ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
}

@Override
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
    // 沒(méi)有連接數(shù)據(jù)庫(kù)查詢,所以模擬異步操作
    executorService.submit(() -> {
        // wait for while to simulate async operation here
        long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
        try {
            Thread.sleep(sleep);

            if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                resultFuture.completeExceptionally(new Exception("wahahahaha..."));
            } else {
                resultFuture.complete(
                        Collections.singletonList("key-" + (input % 10)));
            }
        } catch (InterruptedException e) {
            resultFuture.complete(new ArrayList<>(0));
        }
    });
}
}

3.實(shí)現(xiàn)原理

屏幕快照 2019-03-21 下午3.04.41.png

有序


屏幕快照 2019-03-21 下午3.05.07.png

processing time無(wú)序


屏幕快照 2019-03-21 下午3.05.15.png

event time無(wú)序
屏幕快照 2019-03-21 下午3.05.25.png

4.快照恢復(fù)

屏幕快照 2019-03-21 下午3.07.38.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容