Operators
Operator 可翻譯成算子,即:將一個或多個數(shù)據(jù)流轉(zhuǎn)換成一個新的數(shù)據(jù)流的計算過程。用戶可以將多個算子組合使用來實現(xiàn)復雜數(shù)據(jù)流的轉(zhuǎn)換邏輯。
常見 Operators
官方支持的數(shù)據(jù)流轉(zhuǎn)換類型文檔
Map
DataStream -> DataStream
接受一個元素,然后生成一個元素。下面的代碼將源數(shù)據(jù)數(shù)值加倍生成一個新數(shù)據(jù):
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
Filter
DataStream -> DataStream
用一個布爾型的函數(shù)來評估數(shù)據(jù)流中的每個元素,如果評估結(jié)果為真則保留,否則丟棄。下面的代碼過濾出數(shù)值為0的元素:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
KeyBy
DataStream → KeyedStream
邏輯上將一個數(shù)據(jù)流拆成幾個互不相交的分區(qū)。擁有相同 key 的記錄被分配到同個分區(qū)內(nèi)。內(nèi)部通過哈希分區(qū)的方式實現(xiàn)。區(qū)分 key 的方式有多種。下面的代碼返回一個 KeyedStream,這個 KeyedStream 可以在將來某個場景提供 keyed state 屬性接口。
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
注意:以下類型不能被當成 key
- 本身是 POJO 類型但沒有重寫 hashCode() 方法,并且依賴 Object.hashCode() 實現(xiàn)。
- 是一個包含任意類型的數(shù)組
Aggregations
KeyedStream → DataStream
在 keyed data stream 上進行聚合操作。其中 min 與 minBy 的區(qū)別是,前者返回具體的值,后者返回該元素。如:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
通過上面介紹,想必對 Operators 有了一定了解,就是 Flink 實現(xiàn)了的一系列轉(zhuǎn)換數(shù)據(jù)的接口,各接口接收的數(shù)據(jù)源類型不同,處理邏輯不同,產(chǎn)出的數(shù)據(jù)類型也不同,但都能在數(shù)據(jù)源上執(zhí)行一定處理邏輯。
接下來聊一聊 Chaining。
Task chaining 和資源組
在 task 執(zhí)行過程中,連續(xù)執(zhí)行的幾個算子往往會隨機分配到不同的線程處理,這增加了線程間交換與緩沖的開銷,通過調(diào)用鏈接接口,可以把連續(xù)的算子強行安排到同一個線程上處理以提高 task 的執(zhí)行性能。默認情況下,F(xiàn)link 會盡可能將多個算子連接起來(如兩個連續(xù)的 map 轉(zhuǎn)換)。
當然,F(xiàn)link 還提供許多細粒度的鏈接控制 API,需要注意的是,調(diào)用這些 API 時必須緊跟在某個 Operator 之后,而不能直接作用于一個數(shù)據(jù)流,原因是這些 API 都依賴于之前的轉(zhuǎn)換 Operator,例如:
-
someStream.map(...).startNewChain():是允許的,可以開啟一個新的鏈 -
someStream.startNewChain():是不允許的,該 API 未跟在某個 Operator 后面
注意:用戶可以通過調(diào)用接口
StreamExecutionEnvironment.disableOperatorChaining()來禁止整個 job 的鏈接操作。
Flink 中的 resource group 其實就是一個 slot,是整個集群的最小調(diào)度單位,屬于 TaskManagers,每個 TaskManager 所擁有的 slot 數(shù)默認為1,在集群啟動時,可以通過改變配置 taskmanager.numberOfTaskSlots 來增加,slot 越多,意味著該 TaskManager 能夠同時處理的 task 越多。
通過調(diào)用不同的鏈接接口,我們可以把不同的算子隔離分配到不同的 slots 中:
開啟新鏈
接口:startNewChain()
用例:someStream.filter(...).map(...).startNewChain().map(...);
解釋:開啟一個新的鏈,將接口前后的算子分派到一個獨立的 slot 上,這不包括 filter 這個算子,因為他未與 startNewChain()直接相連。
關(guān)閉鏈接
接口:disableChaining()
用例:someStream.map(...).disableChaining();
解釋:由于 Flink 會盡可能將多個 Operator 鏈接起來,即分配到同個 slot 上處理,如果你想關(guān)閉這個機制,除了前面提到的調(diào)用StreamExecutionEnvironment.disableOperatorChaining()關(guān)閉整個 job 的鏈接機制之外,還可以在該算子之后調(diào)用接口disableChaining()來僅取消鏈接這個算子。
設(shè)置 slot sharing group
接口:slotSharingGroup()
用例:someStream.filter(...).slotSharingGroup("name");
解釋:在 Operator 后調(diào)用此接口,可該 Operator 進行分組,同分組內(nèi)的 Operator 執(zhí)行時會被 Flink 安排到同一個 slot 中,非本分組內(nèi)的其他 Operators 將會被分配到其他 slots 中。默認的 slot sharing group 叫“deafult”。