Data Sources
源是程序讀取輸入數(shù)據(jù)的位置??梢允褂?StreamExecutionEnvironment.addSource(sourceFunction) 將源添加到程序。Flink 有許多預(yù)先實現(xiàn)的源函數(shù),也可以通過實現(xiàn) SourceFunction 方法自定義非并行源 ,或通過實現(xiàn) ParallelSourceFunction 或擴(kuò)展 RichParallelSourceFunction 自定義并行源。
有幾個預(yù)定義的流數(shù)據(jù)源可從 StreamExecutionEnvironment 訪問:
基于文件:
-
readTextFile(path)逐行讀取文本文件(文件符合TextInputFormat格式),并作為字符串返回每一行。
readFile(fileInputFormat, path)按指定的文件輸入格式(fileInputFormat)讀取指定路徑的文件。readFile(fileInputFormat, path, watchType, interval, pathFilter)前兩個方法的內(nèi)部調(diào)用方法。根據(jù)給定文件格式(fileInputFormat)讀取指定路徑的文件。根據(jù) watchType,定期監(jiān)聽路徑下的新數(shù)據(jù)(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理當(dāng)前在路徑中的數(shù)據(jù)并退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,可以進(jìn)一步排除正在處理的文件。
基于Socket:
-
socketTextStream從 Socket 讀取,元素可以用分隔符分隔。
基于集合:
fromCollection(Seq)用 Java.util.Collection 對象創(chuàng)建數(shù)據(jù)流。集合中的所有元素必須屬于同一類型。fromCollection(Iterator)用迭代器創(chuàng)建數(shù)據(jù)流。指定迭代器返回的元素的數(shù)據(jù)類型。fromElements(elements: _*)從給定的對象序列創(chuàng)建數(shù)據(jù)流。所有對象必須屬于同一類型。fromParallelCollection(SplittableIterator)并行地從迭代器創(chuàng)建數(shù)據(jù)流。指定迭代器返回的元素的數(shù)據(jù)類型。generateSequence(from, to)并行生成給定間隔的數(shù)字序列。
自定義:
-
addSource附加新的源函數(shù)。例如,要從 Apache Kafka 中讀取,可以使用addSource(new FlinkKafkaConsumer08<>(...))。請詳細(xì)查看 連接器。
DataStream Transformation
轉(zhuǎn)換函數(shù)
Map
DataStream -> DataStream,一個數(shù)據(jù)元生成一個新的數(shù)據(jù)元。
將輸入流的元素翻倍:
dataStream.map { x => x * 2 }
FlatMap
DataStream -> DataStream,一個數(shù)據(jù)元生成多個數(shù)據(jù)元(可以為0)。將句子分割為單詞:
dataStream.flatMap { str => str.split(" ") }
Filter
DataStream -> DataStream,每個數(shù)據(jù)元執(zhí)行布爾函數(shù),只保存函數(shù)返回 true 的數(shù)據(jù)元。過濾掉零值的過濾器:
dataStream.filter { _ != 0 }
KeyBy
DataStream -> KeyedStream,將流劃分為不相交的分區(qū)。具有相同 Keys 的所有記錄在同一分區(qū)。指定 key 的取值:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream -> DataStream,KeyedStream 元素滾動執(zhí)行 Reduce。將當(dāng)前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。創(chuàng)建 key 的值求和:
keyedStream.reduce { _ + _ }
Fold
KeyedStream -> DataStream,具有初始值的 Reduce。將當(dāng)前數(shù)據(jù)元與最新的一個 Reduce 值組合作為新值發(fā)送。當(dāng)應(yīng)用于序列(1,2,3,4,5)時,發(fā)出序列"start-1","start-1-2","start-1-2-3", ...:
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations
KeyedStream -> DataStream,應(yīng)用于 KeyedStream 上的滾動聚合。min 和 minBy 的區(qū)別是是 min 返回最小值,minBy 具有最小值的數(shù)據(jù)元(max 和 maxBy 同理):
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream -> WindowedStream,Windows 可以在已經(jīng)分區(qū)的 KeyedStream 上定義。Windows 根據(jù)某些特征(例如,在最近5秒內(nèi)到達(dá)的數(shù)據(jù))對每個Keys中的數(shù)據(jù)進(jìn)行分組。更多說明參考 Windows 或 譯版。
// Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAll
DataStream -> AllWindowedStream,Windows 也可以在 DataStream 上定義。在許多情況下,這是非并行轉(zhuǎn)換。所有記錄將收集在 windowAll 算子的一個任務(wù)中。
// Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window Apply
WindowedStream -> DataStream 或 AllWindowedStream -> DataStream,將函數(shù)應(yīng)用于整個窗口。一個對窗口數(shù)據(jù)求和:
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream -> DataStream,Reduce 函數(shù)應(yīng)用于窗口并返回結(jié)果值。
windowedStream.reduce { _ + _ }
Window Fold
WindowedStream -> DataStream,F(xiàn)old 函數(shù)應(yīng)用于窗口并返回結(jié)果值。當(dāng)函數(shù)應(yīng)用于窗口的序列(1,2,3,4,5)時,發(fā)送出 "start-1-2-3-4-5":
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream -> DataStream,聚合窗口的內(nèi)容:
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* -> DataStream,兩個或多個數(shù)據(jù)流的合并,創(chuàng)建包含來自所有流的所有數(shù)據(jù)元的新流。如果將數(shù)據(jù)流與自身聯(lián)合,則會在結(jié)果流中獲取兩次數(shù)據(jù)元。
dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream -> DataStream,Join 連接兩個流,指定 Key 和窗口。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup
DataStream,DataStream -> DataStream,CoGroup 連接兩個流,指定 Key 和窗口。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
CoGroup 與 Join 的區(qū)別:
CoGroup 會輸出未匹配的數(shù)據(jù),Join 只輸出匹配的數(shù)據(jù)
Connect
DataStream,DataStream -> ConnectedStreams,連接兩個有各自類型的數(shù)據(jù)流。允許兩個流之間的狀態(tài)共享。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
可用于數(shù)據(jù)流關(guān)聯(lián)配置流
CoMap, CoFlatMap
ConnectedStreams -> DataStream,作用域連接數(shù)據(jù)流(connected data stream)上的 map 和 flatMap:
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
Split
DataStream -> SplitStream,將數(shù)據(jù)流拆分為兩個或更多個流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select
SplitStream -> DataStream,從 SpliteStream 中選擇一個流或多個流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream -> IterativeStream -> DataStream,將一個算子的輸出重定向到某個先前的算子,在流中創(chuàng)建 feedback 循環(huán)。這對于定義不斷更新模型的算法特別有用。以下代碼以流開頭并連續(xù)應(yīng)用迭代體。大于0的數(shù)據(jù)元將被發(fā)送回 feedback,其余數(shù)據(jù)元將向下游轉(zhuǎn)發(fā)。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps
DataStream -> DataStream,從記錄中提取時間戳,以便使用事件時間窗口。
stream.assignTimestamps (new TimeStampExtractor() {...});
Project
DataStream -> DataStream,作用于元組的轉(zhuǎn)換,從元組中選擇字段的子集。
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
分區(qū)函數(shù)
Custom partitioning
DataStream -> DataStream,使用自定義的分區(qū)函數(shù)(Partitioner)為每個數(shù)據(jù)元選擇目標(biāo)分區(qū)和所在任務(wù)。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
Random partitioning
DataStream -> DataStream,隨機(jī)均勻分布分配數(shù)據(jù)元。
dataStream.shuffle();
Rebalancing (Round-robin partitioning)
DataStream -> DataStream,輪詢?yōu)閿?shù)據(jù)元分區(qū),每個分區(qū)創(chuàng)建相等的負(fù)載。在存在數(shù)據(jù)偏斜時用于性能優(yōu)化。
dataStream.rebalance()
Rescaling
DataStream -> DataStream,根據(jù)上下游的分區(qū)數(shù)量,輪詢?yōu)閿?shù)據(jù)元分區(qū)。
dataStream.rescale();
建議使用
rescale替代rebalance。
例如,上游是5個并發(fā),下游是10個并發(fā)。當(dāng)使用 Rebalance 時,上游每個并發(fā)會輪詢發(fā)給下游10個并發(fā)。當(dāng)使用 Rescale 時,上游每個并發(fā)只需輪詢發(fā)給下游2個并發(fā),能提高網(wǎng)絡(luò)效率。
當(dāng)上游的數(shù)據(jù)比較均勻時,且上下游的并發(fā)數(shù)成比例時,可以使用 Rescale 替換 Rebalance。參數(shù):enable.rescale.shuffling=true,默認(rèn)關(guān)閉。
Broadcasting
DataStream -> DataStream,向每個分區(qū)廣播數(shù)據(jù)元。
dataStream.broadcast()
Task chaining and resource groups
Chaining 兩個后續(xù)轉(zhuǎn)換意味著將它們定位在同一個線程中以獲得更好的性能。Flink 默認(rèn)會鏈接一些算子(例如,連續(xù)兩個的 map 轉(zhuǎn)換)。API可以對鏈接進(jìn)行細(xì)粒度控制:
使用 StreamExecutionEnvironment.disableOperatorChaining() 可以禁用整個工作的算子鏈接。對于更細(xì)粒度的控制,可以使用以下函數(shù)。(這些函數(shù)只能在 DataStream 轉(zhuǎn)換后立即使用。例如,可以使用 someStream.map(...).startNewChain(),但不能使用 someStream.startNewChain())
Resource group 是 Flink 中的一個 slot。如果需要,可以在單獨的 slot 中手動隔離算子。
Start new chain
從這個算子開始,開始一個新的鏈。兩個 map 將被鏈接,filter 將不會在鏈接當(dāng)中。
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining
不要鏈接 map 算子
someStream.map(...).disableChaining()
Set slot sharing group
設(shè)置算子操作的 slot sharing。將把具有相同 slot sharing 的算子操作放入同一個 slot,同時保證其他 slot 中沒有 slot sharing 的算子操作。可用于隔離 slot。默認(rèn) slot sharing group 的名稱為"default",可以通過調(diào)用 slotSharingGroup("groupName") 將算子操作顯式放入此組中。
someStream.filter(...).slotSharingGroup("name")
Data Sinks
Data Sink 消費 DataStream 并轉(zhuǎn)發(fā)到文件,套接字,外部系統(tǒng)或打印到頁面。Flink 帶有各種內(nèi)置輸出格式,封裝在 DataStreams 上的算子操作后面:
writeAsText() / TextOutputFormat, 按字符串順序?qū)懭胛募Mㄟ^調(diào)用每個元素的toString()方法獲得字符串。writeAsCsv(...) / CsvOutputFormat,將元組寫為逗號分隔的形式寫入文件。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。print() / printToErr(),在標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯誤流上打印每個元素的toString()值。可以定義輸出前綴,這有助于區(qū)分不同的打印調(diào)用。如果并行度大于1,輸出也包含生成輸出的任務(wù)的標(biāo)識符。writeUsingOutputFormat() / FileOutputFormat,自定義文件輸出的方法和基類。支持自定義對象到字節(jié)的轉(zhuǎn)換。writeToSocket,將元素寫入 Socket,使用SerializationSchema進(jìn)行序列化。addSink,調(diào)用自定義接收器函數(shù)。請詳細(xì)查看 連接器。
DataStream 的 write*() 方法主要用于調(diào)試目的。他們沒有參與 Flink checkpoint,這意味著這些函數(shù)通常具有至少一次的語義。刷新到目標(biāo)系統(tǒng)的數(shù)據(jù)取決于 OutputFormat 的實現(xiàn),并非所有發(fā)送到 OutputFormat 的數(shù)據(jù)都會立即顯示在目標(biāo)系統(tǒng)中。此外,在失敗的情況下,這些記錄可能會丟失。
要將流可靠、準(zhǔn)確地傳送到文件系統(tǒng),請使用 flink-connector-filesystem。通過 .addSink(...) 方法的自定義實現(xiàn),可以實現(xiàn)在 checkpoint 中精確一次的語義。
Iterations
迭代流程序?qū)⒑瘮?shù)嵌入到 IterativeStream。由于 DataStream 程序可能永遠(yuǎn)不會完成,因此沒有最大迭代次數(shù)。相反,需要指定流的哪個部分反饋到迭代,哪個部分使用 split 或 filter 轉(zhuǎn)發(fā)到下游。
下面是一個示例迭代,其中主體(重復(fù)的計算部分)是一個簡單的 map 轉(zhuǎn)換,反饋的元素由使用過濾器向下游轉(zhuǎn)發(fā)的元素區(qū)分。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})
例如,從一系列整數(shù)中連續(xù)減去1直到它們達(dá)到零的程序:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html