[TOC]
Flink的Transformation轉(zhuǎn)換主要包括四種:單數(shù)據(jù)流基本轉(zhuǎn)換、基于Key的分組轉(zhuǎn)換、多數(shù)據(jù)流轉(zhuǎn)換和數(shù)據(jù)重分布轉(zhuǎn)換。本文主要介紹基于Key的分組轉(zhuǎn)換,
數(shù)據(jù)類型的轉(zhuǎn)化
對數(shù)據(jù)分組主要是為了進行后續(xù)的聚合操作,即對同組數(shù)據(jù)進行聚合分析。keyBy會將一個DataStream轉(zhuǎn)化為一個KeyedStream,聚合操作會將KeyedStream轉(zhuǎn)化為DataStream。如果聚合前每個元素數(shù)據(jù)類型是T,聚合后的數(shù)據(jù)類型仍為T。

keyBy
絕大多數(shù)情況,我們要根據(jù)事件的某種屬性或數(shù)據(jù)的某個字段進行分組,對一個分組內(nèi)的數(shù)據(jù)進行處理。如下圖所示,keyBy算子根據(jù)元素的形狀對數(shù)據(jù)進行分組,相同形狀的元素被分到了一起,可被后續(xù)算子統(tǒng)一處理。比如,多支股票數(shù)據(jù)流處理時,可以根據(jù)股票代號進行分組,然后對同一股票代號的數(shù)據(jù)統(tǒng)計其價格變動。又如,電商用戶行為日志把所有用戶的行為都記錄了下來,如果要分析某一個用戶行為,需要先按用戶ID進行分組。

keyBy算子將DataStream轉(zhuǎn)換成一個KeyedStream。KeyedStream是一種特殊的DataStream,事實上,KeyedStream繼承了DataStream,DataStream的各元素隨機分布在各Task Slot中,KeyedStream的各元素按照Key分組,分配到各Task Slot中。我們需要向keyBy算子傳遞一個參數(shù),以告知Flink以什么字段作為Key進行分組。
我們可以使用數(shù)字位置來指定Key:
val dataStream: DataStream[(Int, Double)] = senv.fromElements((1, 1.0), (2, 3.2), (1, 5.5), (3, 10.0), (3, 12.5))
// 使用數(shù)字位置定義Key 按照第一個字段進行分組
val keyedStream = dataStream.keyBy(0)
也可以使用字段名來指定Key,比如StockPrice里的股票代號symbol:
val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream.keyBy(_.symbol)
一旦按照Key分組后,我們后續(xù)可以按照Key進行時間窗口的處理和狀態(tài)的創(chuàng)建和更新。數(shù)據(jù)流里包含相同Key的數(shù)據(jù)都可以訪問和修改相同的狀態(tài)
aggregation
常見的聚合操作有sum、max、min等,這些聚合操作統(tǒng)稱為aggregation。aggregation需要一個參數(shù)來指定按照哪個字段進行聚合。跟keyBy相似,我們可以使用數(shù)字位置來指定對哪個字段進行聚合,也可以使用字段名。
與批處理不同,這些聚合函數(shù)是對流數(shù)據(jù)進行數(shù)據(jù),流數(shù)據(jù)是依次進入Flink的,聚合操作是對之前流入的數(shù)據(jù)進行統(tǒng)計聚合。sum算子的功能對該字段進行加和,并將結(jié)果保存在該字段上。min操作無法確定其他字段的數(shù)值。
val tupleStream = senv.fromElements(
(0, 0, 0), (0, 1, 1), (0, 2, 2),
(1, 0, 6), (1, 1, 7), (1, 2, 8)
)
// 按第一個字段分組,對第二個字段求和,打印出來的結(jié)果如下:
// (0,0,0)
// (0,1,0)
// (0,3,0)
// (1,0,6)
// (1,1,6)
// (1,3,6)
val sumStream = tupleStream.keyBy(0).sum(1).print()
max算子對該字段求最大值,并將結(jié)果保存在該字段上。對于其他字段,該操作并不能保證其數(shù)值。
// 按第一個字段分組,對第三個字段求最大值max,打印出來的結(jié)果如下:
// (0,0,0)
// (0,0,1)
// (0,0,2)
// (1,0,6)
// (1,0,7)
// (1,0,8)
val maxStream = tupleStream.keyBy(0).max(2).print()
maxBy算子對該字段求最大值,maxBy與max的區(qū)別在于,maxBy同時保留其他字段的數(shù)值,即maxBy可以得到數(shù)據(jù)流中最大的元素。
// 按第一個字段分組,對第三個字段求最大值maxBy,打印出來的結(jié)果如下:
// (0,0,0)
// (0,1,1)
// (0,2,2)
// (1,0,6)
// (1,1,7)
// (1,2,8)
val maxByStream = tupleStream.keyBy(0).maxBy(2).print()
同樣,min和minBy的區(qū)別在于,min算子對某字段求最小值,minBy返回具有最小值的元素。
其實,這些aggregation操作里已經(jīng)封裝了狀態(tài)數(shù)據(jù),比如,sum算子內(nèi)部記錄了當(dāng)前的和,max算子內(nèi)部記錄了當(dāng)前的最大值。由于內(nèi)部封裝了狀態(tài)數(shù)據(jù),而且狀態(tài)數(shù)據(jù)并不會被清理,因此一定要避免在一個無限數(shù)據(jù)流上使用aggregation。