Flink實戰(zhàn)(四) - DataSet API編程

1 你將學(xué)到

◆ DataSet API開發(fā)概述

◆ 計數(shù)器

◆ DataSource

◆ 分布式緩存

◆ Transformation

◆ Sink

2 Data Set API 簡介

Flink中的DataSet程序是實現(xiàn)數(shù)據(jù)集轉(zhuǎn)換(例如,過濾,映射,連接,分組)的常規(guī)程序.

最初從某些Source源創(chuàng)建數(shù)據(jù)集(例如,通過讀取文件或從本地集合創(chuàng)建)

結(jié)果通過sink返回,接收器可以例如將數(shù)據(jù)寫入(分布式)文件或標(biāo)準(zhǔn)輸出(例如命令行終端)

image

Flink程序可以在各種環(huán)境中運行,單機運行或嵌入其他程序中

執(zhí)行可以在本地JVM中執(zhí)行,也可以在集群機器上執(zhí)行.

  • 有關(guān)Flink API基本概念的介紹,請參閱本系列的上一篇

Flink實戰(zhàn)(三) - 編程模型及核心概念

為了創(chuàng)建自己的Flink DataSet程序,鼓勵從Flink程序的解剖開始,逐步添加自己的轉(zhuǎn)換!

3 測試環(huán)境

image
image

4 Data Sources簡介

數(shù)據(jù)源創(chuàng)建初始數(shù)據(jù)集,例如來自文件或Java集合。創(chuàng)建數(shù)據(jù)集的一般機制是在InputFormat后面抽象的

Flink附帶了幾種內(nèi)置格式,可以從通用文件格式創(chuàng)建數(shù)據(jù)集。其中許多都在ExecutionEnvironment上有快捷方法。

4.1 基于文件

  • readTextFile(path)/ TextInputFormat
    按行讀取文件并將它們作為字符串返回
  • readTextFileWithValue(path)/ TextValueInputFormat
    按行讀取文件并將它們作為StringValues返回。 StringValues是可變字符串
  • readCsvFile(path)/ CsvInputFormat
    解析逗號(或其他字符)分隔字段的文件。返回元組,案例類對象或POJO的DataSet。支持基本的java類型及其Value對應(yīng)的字段類型
  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
    使用給定的分隔符解析新行(或其他char序列)分隔的原始數(shù)據(jù)類型(如String或Integer)的文件
  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
    創(chuàng)建JobConf并從類型為SequenceFileInputFormat,Key class和Value類的指定路徑中讀取文件,并將它們作為Tuple2 <Key,Value>返回。

4.2 基于集合

  • fromCollection(Iterable) - 從Iterable創(chuàng)建數(shù)據(jù)集。 Iterable返回的所有元素必須屬于同一類型
  • fromCollection(Iterator) - 從迭代器創(chuàng)建數(shù)據(jù)集。該類指定迭代器返回的元素的數(shù)據(jù)類型
  • fromElements(elements:_ *) - 根據(jù)給定的對象序列創(chuàng)建數(shù)據(jù)集。所有對象必須屬于同一類型
  • fromParallelCollection(SplittableIterator) - 并行地從迭代器創(chuàng)建數(shù)據(jù)集。該類指定迭代器返回的元素的數(shù)據(jù)類型
  • generateSequence(from,to) - 并行生成給定時間間隔內(nèi)的數(shù)字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat
    接受文件輸入格式
  • createInput(inputFormat)/ InputFormat

    接受通用輸入格式5 從集合創(chuàng)建DataSet5.1 Scala實現(xiàn)
    image

5.2 Java實現(xiàn)

image

6 從文件/文件夾創(chuàng)建DataSet

6.1 Scala實現(xiàn)

文件

image
image

文件夾

image
image

Java實現(xiàn)

image
image

7 從csv文件創(chuàng)建Dataset

image

7.1 Scala實現(xiàn)

  • 注意忽略第一行


    image
image
  • includedFields參數(shù)使用
    image
  • 定義一個POJO
    image

    image
    8 從遞歸文件夾的內(nèi)容創(chuàng)建DataSet
    image
    8.1 Scala實現(xiàn)
    image

9從壓縮文件中創(chuàng)建DataSet

Flink目前支持輸入文件的透明解壓縮,如果它們標(biāo)有適當(dāng)?shù)奈募U展名。 特別是,這意味著不需要進一步配置輸入格式,并且任何FileInputFormat都支持壓縮,包括自定義輸入格式。

壓縮文件可能無法并行讀取,從而影響作業(yè)可伸縮性。

下表列出了當(dāng)前支持的壓縮方法

image

9.1 Scala實現(xiàn)

image

10 Transformation

10.1 map

Map轉(zhuǎn)換在DataSet的每個元素上應(yīng)用用戶定義的map函數(shù)。 它實現(xiàn)了一對一的映射,也就是說,函數(shù)必須返回一個元素。

以下代碼將Integer對的DataSet轉(zhuǎn)換為Integers的DataSet:

Scala實現(xiàn)

image
image

Java實現(xiàn)

image

10.2 filter

Scala實現(xiàn)

image

Java實現(xiàn)

image

10.3 mapPartition

MapPartition在單個函數(shù)調(diào)用中轉(zhuǎn)換并行分區(qū)。 map-partition函數(shù)將分區(qū)作為Iterable獲取,并且可以生成任意數(shù)量的結(jié)果值。 每個分區(qū)中的元素數(shù)量取決于并行度和先前的操作。

Scala實現(xiàn)

image

Java實現(xiàn)

image

10.4 first

Scala實現(xiàn)

10.5 Cross

image

11 Data Sinks

image
image

11.1 Java描述

Data Sinks使用DataSet并用于存儲或返回它們

使用OutputFormat描述數(shù)據(jù)接收器操作

Flink帶有各種內(nèi)置輸出格式,這些格式封裝在DataSet上的操作后面:

  • writeAsText()/ TextOutputFormat
    將元素按行順序?qū)懭胱址?。通過調(diào)用每個元素的toString()方法獲得字符串。
  • writeAsFormattedText()/ TextOutputFormat
    按字符串順序?qū)懭朐亍Mㄟ^為每個元素調(diào)用用戶定義的format()方法來獲取字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    將元組寫為逗號分隔值文件。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)
    打印標(biāo)準(zhǔn)輸出/標(biāo)準(zhǔn)錯誤流上每個元素的toString()值??蛇x地,可以提供前綴(msg),其前綴為輸出。這有助于區(qū)分不同的打印調(diào)用。如果并行度大于1,則輸出也將以生成輸出的任務(wù)的標(biāo)識符為前綴。
  • write()/ FileOutputFormat
    自定義文件輸出的方法和基類。支持自定義對象到字節(jié)的轉(zhuǎn)換。
  • output()/ OutputFormat
    最通用的輸出方法,用于非基于文件的數(shù)據(jù)接收器(例如將結(jié)果存儲在數(shù)據(jù)庫中)。
    可以將DataSet輸入到多個操作。程序可以編寫或打印數(shù)據(jù)集,同時對它們執(zhí)行其他轉(zhuǎn)換。

例子

標(biāo)準(zhǔn)數(shù)據(jù)接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定義輸出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序輸出

可以使用元組字段位置或字段表達式以指定順序在指定字段上對數(shù)據(jù)接收器的輸出進行本地排序。 這適用于每種輸出格式。

以下示例顯示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

參考

DataSet Transformations

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容