Flink Checkpoint 和 Large State 調(diào)優(yōu)

Overview

為了使 Flink 應(yīng)用程序能夠可靠地大規(guī)模運(yùn)行,必須滿足兩個(gè)條件:

  • 應(yīng)用程序需要能夠可靠地獲取 Checkpoint

  • 在發(fā)生故障后,需要足夠的資源追上(catch up)輸入數(shù)據(jù)流

監(jiān)控 State 和 Checkpoint

監(jiān)控 Checkpoint 行為的最簡(jiǎn)單方法是通過 WebUI 界面。有兩個(gè) Checkpoint Metric 最值得關(guān)注的是:

  • 當(dāng)觸發(fā) checkpoint 的時(shí)間一直很高時(shí),Operator 收到第一個(gè) checkpoint barrier 的時(shí)間一直很高,這意味著 checkpoint barriers 需要很長(zhǎng)時(shí)間才能從 Source 到 Operator。這通常表明系統(tǒng)在恒定背壓(backpressure)下工作。
  • 對(duì)齊持續(xù)時(shí)間。在 Exactly-once 語義下,有多個(gè)輸入的 Operator,已經(jīng)接收到 barrier 的通道將被阻止接收進(jìn)一步的數(shù)據(jù),直到所有剩余的通道趕上并接收到它們的 barrier 的持續(xù)時(shí)間。

理想情況下,這兩個(gè)值都應(yīng)該是低值,持續(xù)出現(xiàn)較高的值意味著 checkpoint barrier 在 job graph 中緩慢移動(dòng),通常是由于 backpressure 存在(沒有足夠的資源來處理記錄)。也可以通過增加處理記錄的端到端延遲來觀察。

調(diào)整 Checkpoint

應(yīng)用程序可以配置固定時(shí)間間隔觸發(fā) checkpoint。當(dāng)一個(gè) checkpoint 的完成時(shí)間長(zhǎng)于固定間隔時(shí),在進(jìn)行中的 checkpoint 完成之前不會(huì)觸發(fā)下一個(gè)(默認(rèn)情況下,下一個(gè) checkpoint 將在正在進(jìn)行的 checkpoint 完成后立即觸發(fā))。

當(dāng) checkpoint 結(jié)束的時(shí)間經(jīng)常超過固定間隔時(shí),系統(tǒng)會(huì)不斷地觸發(fā) checkpoint(完成后立即啟動(dòng)新)。這可能意味著在兩個(gè) checkpoint 之間,Operator 處理進(jìn)展過少,并且 checkpoint 占用了過多的資源。此行為對(duì)使用異步 checkpoint 的流應(yīng)用程序的影響較小,但仍可能對(duì)整體應(yīng)用程序性能產(chǎn)生影響。

為了防止這種情況,應(yīng)用程序可以定義一個(gè) checkpoint 的最小間隔(在最新 checkpoint 結(jié)束和下一個(gè) checkpoint 開始前必須經(jīng)過的最小時(shí)間間隔。):

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

下圖說明了這是如何影響 checkpoint 的,避免了 checkpoint 持續(xù)不斷的進(jìn)行。

可以配置應(yīng)用程序允許同時(shí)進(jìn)行多個(gè) checkpoint。當(dāng)手動(dòng)觸發(fā) savepoint 時(shí),可能與正在進(jìn)行的 checkpoint 同時(shí)進(jìn)行。

調(diào)整 RocksDB

許多大規(guī)模 Flink 流計(jì)算應(yīng)用程序的 State 存儲(chǔ)使用的是 RocksDB state Backend。擴(kuò)展性遠(yuǎn)遠(yuǎn)超過主內(nèi)存,并可靠地存儲(chǔ)大的 keyed state。

RocksDB 的性能會(huì)因配置而異,下面介紹一些使用 RocksDB state Backend 的最佳實(shí)踐。

增量 Checkpoint

在減少 checkpoint 所需時(shí)間方面,開啟增量 checkpoint 應(yīng)該是首要考慮因素之一。與完全 checkpoint 相比,增量 checkpoint 可以顯著減少時(shí)間,因?yàn)橹挥涗浥c前一次完成的 checkpoint 相比所做的更改。

Timer 存儲(chǔ)選擇

定時(shí)器(Timer)默人存儲(chǔ)在 RocksDB 中,當(dāng) Job 只有很少的 Timer 時(shí),放在堆上存儲(chǔ)可以提高性能。

請(qǐng)小心使用此功能,因?yàn)榛诙训?Timer 可能會(huì)增加 checkpoint 時(shí)間,并且無法在內(nèi)存之外擴(kuò)展。

調(diào)整 RocksDB 內(nèi)存

RocksDB State Backend 的性能在很大程度上取決于其可用的內(nèi)存量。為了提高性能,增加內(nèi)存會(huì)有很大幫助,或者調(diào)整內(nèi)存使用。

默認(rèn)情況,RocksDB State Backend 使用 Flink 托管內(nèi)存用于 RocksDBs buffer 和 cache(state.backend.rocksdb.memory.managed: true)。 要調(diào)整與內(nèi)存相關(guān)的性能問題,以下步驟可能會(huì)有所幫助:

  • 增加托管內(nèi)存的大小,這通常會(huì)改善很多情況,并且不會(huì)增加調(diào)優(yōu) RocksDB 底層配置的復(fù)雜性。

    特別是對(duì)于大 Container/進(jìn)程大小,除非應(yīng)用程序邏輯本身需要大量 JVM 堆內(nèi)存,否則總內(nèi)存中的大部分通常都可以放到 RocksDB 使用(默認(rèn)的托管內(nèi)存比例 0.4 是保守的)。

  • RocksDB 中 write buffer 的數(shù)量取決于應(yīng)用程序中的 State 數(shù)量。每個(gè) State 對(duì)應(yīng)一個(gè) ColumnFamily(需要獨(dú)立的 write buffer)。因此,具有大量 State 的應(yīng)用程序通常需要更多內(nèi)存才能獲得相同的性能。

  • 通過設(shè)置 state.backend.RocksDB.memory.managed:false,可以嘗試比較 RocksDB with managed memory 和 RocksDB with per column family memory 的性能。

    不使用托管內(nèi)存意味著 RocksDB 按照應(yīng)用程序中的 State 數(shù)量按比例分配內(nèi)存。

  • 如果應(yīng)用程序有大量狀態(tài),并且頻繁的 MemTable 刷新(寫入端瓶頸),如果不能提供更多內(nèi)存,那么可以增加進(jìn)入寫入緩沖區(qū)的內(nèi)存比率(state.backend.rocksdb.memory.write buffer ratio)。

  • 一個(gè)高級(jí)選項(xiàng)(面向 RocksDB 專家)可以減少具有許多狀態(tài)的設(shè)置中的 MemTable 刷新次數(shù),是通過 RocksDBOptionsFactory 調(diào)整 RocksDB 的 Columnfamily 設(shè)置

public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // 當(dāng)一個(gè) Operator 中有多個(gè)狀態(tài)時(shí),增加后臺(tái)最大刷新線程數(shù)
        // 這意味著在一個(gè) RocksDB 實(shí)例中會(huì)有多個(gè) Columnfamily
        return currentOptions.setMaxBackgroundFlushes(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // 將 arena 塊大小從默認(rèn)的8MB減少到1MB。
        return currentOptions.setArenaBlockSize(1024 * 1024);
    }

    @Override
    public OptionsFactory configure(Configuration configuration) {
        return this;
    }
}

容量規(guī)劃

本節(jié)討論如何決定一個(gè) Flink 作業(yè)應(yīng)該使用多少資源才能可靠地運(yùn)行。容量規(guī)劃的基本經(jīng)驗(yàn)法則是:

  • 正常操作應(yīng)具有足夠的容量,以避免在恒定背壓下操作。
  • 在常規(guī)無背壓運(yùn)行程序所需的資源之上提供一些額外的資源。用來在應(yīng)用程序恢復(fù)時(shí)快速處理恢復(fù)期間積累的輸入數(shù)據(jù),這取決于恢復(fù)操作通常需要多長(zhǎng)時(shí)間(取決于故障轉(zhuǎn)移時(shí)需要加載到新 TaskManager 中的狀態(tài)的大?。┮约耙蠊收匣謴?fù)的速度。
  • 暫時(shí)的背壓通常是可以接受的,在負(fù)載峰值期間、Catchup 階段或外部系統(tǒng)出現(xiàn)臨時(shí)響應(yīng)慢時(shí)。
  • 某些操作(如大型窗口)會(huì)導(dǎo)致其下游操作符的負(fù)載存在毛刺(spiky):在構(gòu)建窗口時(shí),下游 Operator 可能是空閑的,在發(fā)出窗口數(shù)據(jù)時(shí),下游才開始工作。下游并行性的規(guī)劃需要考慮窗口發(fā)出的量以及處理這種峰值的速度。

壓縮

Flink 為所有 checkpoint 和 savepoint 提供可選的壓縮(默認(rèn)值:off)。目前,壓縮總是使用 snappy compression algorithm(version 1.1.4) 但計(jì)劃在未來支持自定義壓縮算法。壓縮的粒度是 keyed state 的 key-group,每個(gè) key-group 可以單獨(dú)壓縮,這對(duì)于縮放程序非常重要。

壓縮可以通過 ExecutionConfig 開啟

ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);

壓縮選項(xiàng)對(duì)增量快照(RocksDB)沒有影響。

任務(wù)本地恢復(fù)

Motivation

在 Flink 的 checkpoint 中,每個(gè) Task 都會(huì)生成一個(gè) State snapshot,然后將其寫入分布式存儲(chǔ)。每個(gè) Task 通過發(fā)送一個(gè)描述 State 在分布式存儲(chǔ)中的位置的句柄來確認(rèn) State 成功寫入 JobManager。JobManager 依次從所有 Task 收集句柄,并將綁定到到 checkpoint 對(duì)象中。

在恢復(fù)的情況下,JobManager 打開最新的 checkpoint 對(duì)象并將句柄發(fā)送回相應(yīng)的 Task,然后這些 Task 可以從分布式存儲(chǔ)中恢復(fù) State。使用分布式存儲(chǔ)來存儲(chǔ) State 有兩個(gè)重要的優(yōu)點(diǎn)。首先,存儲(chǔ)是容錯(cuò)的,其次,分布式存儲(chǔ)中的所有 State 對(duì)所有節(jié)點(diǎn)都是可訪問的,并且可以很容易地重新分配(例如,用于重新縮放)。

然而,使用遠(yuǎn)程分布式存儲(chǔ)也有一個(gè)很大的缺點(diǎn):所有 Task 都必須通過網(wǎng)絡(luò)從遠(yuǎn)程位置讀取其狀態(tài)。在一些情況下,恢復(fù)可以將 Task 重新安排到與上一次運(yùn)行相同的 TaskManager 中,但仍然要讀取遠(yuǎn)程狀態(tài)。這可能會(huì)導(dǎo)致大狀態(tài)的恢復(fù)時(shí)間長(zhǎng)。

Approach

任務(wù)本地 State 恢復(fù)是針對(duì)這一類問題,主要思想如下:對(duì)于每個(gè) checkpoint,每個(gè) Task 不僅將 State snapshot 寫入分布式存儲(chǔ),而且還將 state snapshot 的輔助副本保存在該 Task 所在的本地存儲(chǔ)中(例如,本地磁盤或內(nèi)存中)。State 的主存儲(chǔ)必須仍然是分布式存儲(chǔ),因?yàn)楸镜卮鎯?chǔ)不能確保節(jié)點(diǎn)故障下的持久性,也不能為其他節(jié)點(diǎn)提供重新分發(fā) State 的訪問。

對(duì)于每個(gè)可以重新安排到上一個(gè)位置進(jìn)行恢復(fù)的 Task,可以從本地輔助副本恢復(fù) State,并避免遠(yuǎn)程讀取的開銷??紤]到許多故障不是節(jié)點(diǎn)故障,節(jié)點(diǎn)故障通常一次只影響一個(gè)或極少數(shù)節(jié)點(diǎn),在恢復(fù)過程中,大多數(shù) Task 很可能返回到其以前的位置,并發(fā)現(xiàn)其本地 State 完好無損,可以有效地縮短恢復(fù)時(shí)間。

需要注意的是,根據(jù)所選的 state backend 和 checkpoint 策略,在創(chuàng)建和存儲(chǔ)本地輔助副本時(shí),每個(gè) checkpoint 可能需要一些額外的成本。在大多數(shù)情況下,實(shí)現(xiàn)只需將對(duì)分布式存儲(chǔ)的寫入復(fù)制到本地文件。

image.png

主副本和輔助副本的關(guān)系

  • 對(duì)于 checkpoint,主副本必須成功并且生成輔助本地副本失敗不會(huì)使 checkpoint 失敗。如果無法創(chuàng)建主副本,即使已成功創(chuàng)建輔助副本,checkpoint 也被認(rèn)為失敗。

  • 只有主副本由 JobManager 確認(rèn)和管理。輔助副本由 TaskManager 擁有,生命周期可以獨(dú)立于主副本。例如,可以將 3 個(gè)最新 checkpoint 的歷史記錄保留為主副本,并且只保留最新 checkpoint 的本地副本。

  • 對(duì)于恢復(fù),如果有匹配的輔助副本可用,F(xiàn)link 將始終嘗試從任務(wù)本地 State 先還原。如果在從輔助副本恢復(fù)期間出現(xiàn)任何問題,F(xiàn)link 將透明地重試,從主副本恢復(fù)。僅當(dāng)主副本和(可選)輔助副本都恢復(fù)失敗時(shí),恢復(fù)才會(huì)失敗。

  • 任務(wù)本地副本可能只包含完整 State 的一部分(例如,寫入本地文件時(shí)出現(xiàn)異常)。在這種情況下,F(xiàn)link 將首先嘗試在本地恢復(fù)本地部分,無法恢復(fù)的 State 是從主副本恢復(fù)的。

  • 任務(wù)本地副本可以具有與主副本不同的格式。

  • 如果 TaskManager 丟失,則其所有任務(wù)的本地副本都將丟失。

配置任務(wù)本地恢復(fù)

任務(wù)本地恢復(fù)在默認(rèn)情況下是停用的,可以通過 Flink 的配置開啟(state.backend.local-recovery 指定為 false 或 true,還可以在 Job 上設(shè)置 CheckpointingOptions.LOCAL_RECOVERY)。

Allocation-preserving scheduling

任務(wù)本地恢復(fù)假設(shè)在失敗情況下保持分配的 Task 調(diào)度,其原理如下:每個(gè) Task 都會(huì)記住之前分配的 Slot,在恢復(fù)過程中會(huì)請(qǐng)求完全相同的 Slot 進(jìn)行重啟。如果 Slot 不可用,任務(wù)將從 Resource Manager 請(qǐng)求一個(gè)全新的 Slot。

如果一個(gè) TaskManager 不再可用,則之前分配該 TaskManager 上的 Task 必須在其他的 TaskManager 上運(yùn)行,但是不會(huì)讓其他可以在原 Slot 上恢復(fù)的 Task 改變位置。在這種策略下,會(huì)讓盡可能多的 Task 在原 Slot 上啟動(dòng),并從本地恢復(fù) State。


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容