如何理解 Flink 中的 算子(operator)與鏈接(chain)?

Operators

Operator 可翻譯成算子,即:將一個或多個數(shù)據(jù)流轉(zhuǎn)換成一個新的數(shù)據(jù)流的計算過程。用戶可以將多個算子組合使用來實現(xiàn)復雜數(shù)據(jù)流的轉(zhuǎn)換邏輯。

常見 Operators

官方支持的數(shù)據(jù)流轉(zhuǎn)換類型文檔

Map

DataStream -> DataStream
接受一個元素,然后生成一個元素。下面的代碼將源數(shù)據(jù)數(shù)值加倍生成一個新數(shù)據(jù):

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

Filter

DataStream -> DataStream
用一個布爾型的函數(shù)來評估數(shù)據(jù)流中的每個元素,如果評估結(jié)果為真則保留,否則丟棄。下面的代碼過濾出數(shù)值為0的元素:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream → KeyedStream
邏輯上將一個數(shù)據(jù)流拆成幾個互不相交的分區(qū)。擁有相同 key 的記錄被分配到同個分區(qū)內(nèi)。內(nèi)部通過哈希分區(qū)的方式實現(xiàn)。區(qū)分 key 的方式有多種。下面的代碼返回一個 KeyedStream,這個 KeyedStream 可以在將來某個場景提供 keyed state 屬性接口。

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

注意:以下類型不能被當成 key

  • 本身是 POJO 類型但沒有重寫 hashCode() 方法,并且依賴 Object.hashCode() 實現(xiàn)。
  • 是一個包含任意類型的數(shù)組

Aggregations

KeyedStream → DataStream
在 keyed data stream 上進行聚合操作。其中 minminBy 的區(qū)別是,前者返回具體的值,后者返回該元素。如:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

通過上面介紹,想必對 Operators 有了一定了解,就是 Flink 實現(xiàn)了的一系列轉(zhuǎn)換數(shù)據(jù)的接口,各接口接收的數(shù)據(jù)源類型不同,處理邏輯不同,產(chǎn)出的數(shù)據(jù)類型也不同,但都能在數(shù)據(jù)源上執(zhí)行一定處理邏輯。
接下來聊一聊 Chaining。

Task chaining 和資源組

在 task 執(zhí)行過程中,連續(xù)執(zhí)行的幾個算子往往會隨機分配到不同的線程處理,這增加了線程間交換與緩沖的開銷,通過調(diào)用鏈接接口,可以把連續(xù)的算子強行安排到同一個線程上處理以提高 task 的執(zhí)行性能。默認情況下,F(xiàn)link 會盡可能將多個算子連接起來(如兩個連續(xù)的 map 轉(zhuǎn)換)。

當然,F(xiàn)link 還提供許多細粒度的鏈接控制 API,需要注意的是,調(diào)用這些 API 時必須緊跟在某個 Operator 之后,而不能直接作用于一個數(shù)據(jù)流,原因是這些 API 都依賴于之前的轉(zhuǎn)換 Operator,例如:

  • someStream.map(...).startNewChain():是允許的,可以開啟一個新的鏈
  • someStream.startNewChain():是不允許的,該 API 未跟在某個 Operator 后面

注意:用戶可以通過調(diào)用接口 StreamExecutionEnvironment.disableOperatorChaining() 來禁止整個 job 的鏈接操作。

Flink 中的 resource group 其實就是一個 slot,是整個集群的最小調(diào)度單位,屬于 TaskManagers,每個 TaskManager 所擁有的 slot 數(shù)默認為1,在集群啟動時,可以通過改變配置 taskmanager.numberOfTaskSlots 來增加,slot 越多,意味著該 TaskManager 能夠同時處理的 task 越多。

通過調(diào)用不同的鏈接接口,我們可以把不同的算子隔離分配到不同的 slots 中:

開啟新鏈

接口:startNewChain()
用例:someStream.filter(...).map(...).startNewChain().map(...);
解釋:開啟一個新的鏈,將接口前后的算子分派到一個獨立的 slot 上,這不包括 filter 這個算子,因為他未與 startNewChain()直接相連。

關(guān)閉鏈接

接口:disableChaining()
用例:someStream.map(...).disableChaining();
解釋:由于 Flink 會盡可能將多個 Operator 鏈接起來,即分配到同個 slot 上處理,如果你想關(guān)閉這個機制,除了前面提到的調(diào)用StreamExecutionEnvironment.disableOperatorChaining()關(guān)閉整個 job 的鏈接機制之外,還可以在該算子之后調(diào)用接口disableChaining()來僅取消鏈接這個算子。

設(shè)置 slot sharing group

接口:slotSharingGroup()
用例:someStream.filter(...).slotSharingGroup("name");
解釋:在 Operator 后調(diào)用此接口,可該 Operator 進行分組,同分組內(nèi)的 Operator 執(zhí)行時會被 Flink 安排到同一個 slot 中,非本分組內(nèi)的其他 Operators 將會被分配到其他 slots 中。默認的 slot sharing group 叫“deafult”。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容