1 你將學(xué)到
◆ DataSet API開發(fā)概述
◆ 計數(shù)器
◆ DataSource
◆ 分布式緩存
◆ Transformation
◆ Sink
2 Data Set API 簡介
Flink中的DataSet程序是實現(xiàn)數(shù)據(jù)集轉(zhuǎn)換(例如,過濾,映射,連接,分組)的常規(guī)程序.
最初從某些Source源創(chuàng)建數(shù)據(jù)集(例如,通過讀取文件或從本地集合創(chuàng)建)
結(jié)果通過sink返回,接收器可以例如將數(shù)據(jù)寫入(分布式)文件或標(biāo)準(zhǔn)輸出(例如命令行終端)

Flink程序可以在各種環(huán)境中運行,單機運行或嵌入其他程序中
執(zhí)行可以在本地JVM中執(zhí)行,也可以在集群機器上執(zhí)行.
- 有關(guān)Flink API基本概念的介紹,請參閱本系列的上一篇
為了創(chuàng)建自己的Flink DataSet程序,鼓勵從Flink程序的解剖開始,逐步添加自己的轉(zhuǎn)換!
3 測試環(huán)境


4 Data Sources簡介
數(shù)據(jù)源創(chuàng)建初始數(shù)據(jù)集,例如來自文件或Java集合。創(chuàng)建數(shù)據(jù)集的一般機制是在InputFormat后面抽象的
Flink附帶了幾種內(nèi)置格式,可以從通用文件格式創(chuàng)建數(shù)據(jù)集。其中許多都在ExecutionEnvironment上有快捷方法。
4.1 基于文件
- readTextFile(path)/ TextInputFormat
按行讀取文件并將它們作為字符串返回 - readTextFileWithValue(path)/ TextValueInputFormat
按行讀取文件并將它們作為StringValues返回。 StringValues是可變字符串 - readCsvFile(path)/ CsvInputFormat
解析逗號(或其他字符)分隔字段的文件。返回元組,案例類對象或POJO的DataSet。支持基本的java類型及其Value對應(yīng)的字段類型 - readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
使用給定的分隔符解析新行(或其他char序列)分隔的原始數(shù)據(jù)類型(如String或Integer)的文件 - readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
創(chuàng)建JobConf并從類型為SequenceFileInputFormat,Key class和Value類的指定路徑中讀取文件,并將它們作為Tuple2 <Key,Value>返回。
4.2 基于集合
- fromCollection(Iterable) - 從Iterable創(chuàng)建數(shù)據(jù)集。 Iterable返回的所有元素必須屬于同一類型
- fromCollection(Iterator) - 從迭代器創(chuàng)建數(shù)據(jù)集。該類指定迭代器返回的元素的數(shù)據(jù)類型
- fromElements(elements:_ *) - 根據(jù)給定的對象序列創(chuàng)建數(shù)據(jù)集。所有對象必須屬于同一類型
- fromParallelCollection(SplittableIterator) - 并行地從迭代器創(chuàng)建數(shù)據(jù)集。該類指定迭代器返回的元素的數(shù)據(jù)類型
- generateSequence(from,to) - 并行生成給定時間間隔內(nèi)的數(shù)字序列。
4.3 通用
- readFile(inputFormat,path)/ FileInputFormat
接受文件輸入格式 -
createInput(inputFormat)/ InputFormat
接受通用輸入格式5 從集合創(chuàng)建DataSet5.1 Scala實現(xiàn)
image
5.2 Java實現(xiàn)

6 從文件/文件夾創(chuàng)建DataSet
6.1 Scala實現(xiàn)
文件


文件夾


Java實現(xiàn)


7 從csv文件創(chuàng)建Dataset

7.1 Scala實現(xiàn)
-
注意忽略第一行
image

-
includedFields參數(shù)使用image
-
定義一個POJOimage8 從遞歸文件夾的內(nèi)容創(chuàng)建DataSetimage8.1 Scala實現(xiàn)imageimage
9從壓縮文件中創(chuàng)建DataSet
Flink目前支持輸入文件的透明解壓縮,如果它們標(biāo)有適當(dāng)?shù)奈募U展名。 特別是,這意味著不需要進一步配置輸入格式,并且任何FileInputFormat都支持壓縮,包括自定義輸入格式。
壓縮文件可能無法并行讀取,從而影響作業(yè)可伸縮性。
下表列出了當(dāng)前支持的壓縮方法

9.1 Scala實現(xiàn)

10 Transformation
10.1 map
Map轉(zhuǎn)換在DataSet的每個元素上應(yīng)用用戶定義的map函數(shù)。 它實現(xiàn)了一對一的映射,也就是說,函數(shù)必須返回一個元素。
以下代碼將Integer對的DataSet轉(zhuǎn)換為Integers的DataSet:
Scala實現(xiàn)


Java實現(xiàn)

10.2 filter
Scala實現(xiàn)

Java實現(xiàn)

10.3 mapPartition
MapPartition在單個函數(shù)調(diào)用中轉(zhuǎn)換并行分區(qū)。 map-partition函數(shù)將分區(qū)作為Iterable獲取,并且可以生成任意數(shù)量的結(jié)果值。 每個分區(qū)中的元素數(shù)量取決于并行度和先前的操作。
Scala實現(xiàn)

Java實現(xiàn)

10.4 first
Scala實現(xiàn)
10.5 Cross

11 Data Sinks


11.1 Java描述
Data Sinks使用DataSet并用于存儲或返回它們
使用OutputFormat描述數(shù)據(jù)接收器操作
Flink帶有各種內(nèi)置輸出格式,這些格式封裝在DataSet上的操作后面:
- writeAsText()/ TextOutputFormat
將元素按行順序?qū)懭胱址?。通過調(diào)用每個元素的toString()方法獲得字符串。 - writeAsFormattedText()/ TextOutputFormat
按字符串順序?qū)懭朐亍Mㄟ^為每個元素調(diào)用用戶定義的format()方法來獲取字符串。 - writeAsCsv(...)/ CsvOutputFormat
將元組寫為逗號分隔值文件。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。 - print()/ printToErr()/ print(String msg)/ printToErr(String msg)
打印標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯誤流上每個元素的toString()值??蛇x地,可以提供前綴(msg),其前綴為輸出。這有助于區(qū)分不同的打印調(diào)用。如果并行度大于1,則輸出也將以生成輸出的任務(wù)的標(biāo)識符為前綴。 - write()/ FileOutputFormat
自定義文件輸出的方法和基類。支持自定義對象到字節(jié)的轉(zhuǎn)換。 - output()/ OutputFormat
最通用的輸出方法,用于非基于文件的數(shù)據(jù)接收器(例如將結(jié)果存儲在數(shù)據(jù)庫中)。
可以將DataSet輸入到多個操作。程序可以編寫或打印數(shù)據(jù)集,同時對它們執(zhí)行其他轉(zhuǎn)換。
例子
標(biāo)準(zhǔn)數(shù)據(jù)接收方法:
// text data
DataSet<String> textData = // [...]
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");
// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");
// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
new TextFormatter<Tuple2<Integer, Integer>>() {
public String format (Tuple2<Integer, Integer> value) {
return value.f1 + " - " + value.f0;
}
});
使用自定義輸出格式:
DataSet<Tuple3<String, Integer, Double>> myResult = [...]
// write Tuple DataSet to a relational database
myResult.output(
// build and configure OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish()
);
本地排序輸出
可以使用元組字段位置或字段表達式以指定順序在指定字段上對數(shù)據(jù)接收器的輸出進行本地排序。 這適用于每種輸出格式。
以下示例顯示如何使用此功能:
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]
// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();
// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();
// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);
// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);
// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);






