四種優(yōu)化 Apache Flink 應(yīng)用程序的方法

使用 Flink Tuples

當你使用類似于?groupBy,?join, 或者?keyBy?算子時,F(xiàn)link 提供了多種用于在你的數(shù)據(jù)集上選擇 key 的方法。你可以使用 key 選擇函數(shù),如下:

// Join movies and ratings datasets

movies.join(ratings)

? ? ? ? // Use movie id as a key in both cases

? ? ? ? .where(new KeySelector() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public String getKey(Movie m) throws Exception {

? ? ? ? ? ? ? ? return m.getId();

? ? ? ? ? ? }

? ? ? ? })

? ? ? ? .equalTo(new KeySelector() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public String getKey(Rating r) throws Exception {

? ? ? ? ? ? ? ? return r.getMovieId();

? ? ? ? ? ? }

? ? ? ? })

你甚至可以指定 POJO 類型中一個 field 的名字:

movies.join(ratings)

? ? // Use same fields as in the previous example

? ? .where("id")

? ? .equalTo("movieId")

但是如果你現(xiàn)在使用的是 Flink 元組類型(tuple types)的數(shù)據(jù),你可以簡單地指定將要作為 key 的字段在元組中的位置:

DataSet> movies = ...

DataSet> ratings = ...

movies.join(ratings)

? ? // Specify fields positions in tuples

? ? .where(0)

? ? .equalTo(1)

這種方法在 Flink 中將會獲得最佳的性能,但是可讀性方面呢?這是不是意味著你的代碼看起來像下面那樣:

DataSet> result = movies.join(ratings)

? ? .where(0)

? ? .equalTo(0)

? ? .with(new JoinFunction, Tuple2,

? ? ? ? ? ? ? ? ? ? ? Tuple3>() {

? ? ? ? // What is happening here?

? ? ? ? @Override

? ? ? ? public Tuple3 join(Tuple2 first,

? ? ? ? ? ? ? ? ? ? ? Tuple2 second) throws Exception {

? ? ? ? ? ? // Some tuples are joined with some other tuples and some fields are returned???

? ? ? ? ? ? return new Tuple3<>(first.f0, first.f1, second.f1);

? ? ? ? }

? ? });

在這種情況下,提高可讀性的常見方法是創(chuàng)建一個繼承自?TupleX?的類,并且實現(xiàn)其中的?getters?和?setters。下面是 Flink Gelly 類庫中?Edge?類的實現(xiàn),其中有三個 fileds,所以它直接繼承了?Tuple3?類:

public class Edge extends Tuple3 {

? ? public Edge(K source, K target, V value) {

? ? ? ? this.f0 = source;

? ? ? ? this.f1 = target;

? ? ? ? this.f2 = value;

? ? }


? ? // Getters and setters for readability

? ? public void setSource(K source) {

? ? ? ? this.f0 = source;

? ? }

? ? public K getSource() {

? ? ? ? return this.f0;

? ? }


? ? // Also has getters and setters for other fields

? ? ...

}

重用 Flink 對象

另外一種可以提升 Flink 應(yīng)用程序性能的方法是在用戶自定義函數(shù)返回數(shù)據(jù)時使用可變對象(mutable objects),請看看下面的例子:

stream

? ? .apply(new WindowFunction, String, TimeWindow>() {

? ? ? ? @Override

? ? ? ? public void apply(String userName, TimeWindow timeWindow,

? ? ? ? ? ? ? ? Iterable iterable,

? ? ? ? ? ? ? ? Collector> collector) throws Exception {

? ? ? ? ? ? long changesCount = ...

? ? ? ? ? ? // A new Tuple instance is created on every execution

? ? ? ? ? ? collector.collect(new Tuple2<>(userName, changesCount));

? ? ? ? }

? ? }

正如你所看到的,在我們每次調(diào)用?apply?函數(shù)的時候,我們都會創(chuàng)建一個 Tuple2 類型的實例,這將會給垃圾回收造成很大的壓力。解決這個問題的一種方法就是反復使用相同的實例:

stream

? ? .apply(new WindowFunction, String, TimeWindow>() {

? ? ? ? // Create an instance that we will reuse on every call

? ? ? ? private Tuple2 result = new Tuple<>();


? ? ? ? @Override

? ? ? ? public void apply(String userName, TimeWindow timeWindow,

? ? ? ? ? ? ? ? ? ? ? ? ? Iterable iterable,

? ? ? ? ? ? ? ? ? ? ? ? ? Collector> collector) throws Exception {

? ? ? ? ? ? long changesCount = ...


? ? ? ? ? ? // Set fields on an existing object instead of creating a new one

? ? ? ? ? ? result.f0 = userName;

? ? ? ? ? ? // Auto-boxing!! A new Long value may be created

? ? ? ? ? ? result.f1 = changesCount;


? ? ? ? ? ? // Reuse the same Tuple2 object

? ? ? ? ? ? collector.collect(result);

? ? ? ? }

? ? }

上面的代碼性能會好些。雖然我們在每次調(diào)用的時候只創(chuàng)建了一個?Tuple2?實例,但是我們還間接地創(chuàng)建了 Long 類型的實例。為了解決這個問題, Flink 內(nèi)部提供了一系列 value classes,比如:IntValue,?LongValue,?StringValue,?FloatValue?等。這些類的重點是為內(nèi)置類型提供了可變版本,所以我們可以在用戶自定義函數(shù)中重用這些類型,下面就是如何使用的例子:

stream

? ? .apply(new WindowFunction, String, TimeWindow>() {

? ? ? ? // Create a mutable count instance

? ? ? ? private LongValue count = new IntValue();

? ? ? ? // Assign mutable count to the tuple

? ? ? ? private Tuple2 result = new Tuple<>("", count);


? ? ? ? @Override

? ? ? ? // Notice that now we have a different return type

? ? ? ? public void apply(String userName, TimeWindow timeWindow,

? ? ? ? ? ? ? ? ? Iterable iterable,

? ? ? ? ? ? ? ? ? Collector> collector) throws Exception {

? ? ? ? ? ? long changesCount = ...


? ? ? ? ? ? // Set fields on an existing object instead of creating a new one

? ? ? ? ? ? result.f0 = userName;

? ? ? ? ? ? // Update mutable count value

? ? ? ? ? ? count.setValue(changesCount);


? ? ? ? ? ? // Reuse the same tuple and the same LongValue instance

? ? ? ? ? ? collector.collect(result);

? ? ? ? }

? ? }

上面這些使用習慣在 Flink 類庫中被普遍使用,比如 Flink Gelly。

使用函數(shù)注解

另一種優(yōu)化 Flink 應(yīng)用程序的方法是提供一些關(guān)于用戶自定義函數(shù)如何對輸入數(shù)據(jù)進行處理的信息。由于 Flink 無法解析和理解你的代碼,所以你提供一些關(guān)鍵的信息將會幫助 Flink 創(chuàng)建一個更加高效的執(zhí)行計劃。我們可以使用三種注解:

@ForwardedFields?– 指定輸入數(shù)據(jù)中哪些字段保持不變并且在輸出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。

@NotForwardedFields?– 指定在輸出中相同位置未保留的字段(specifies fields which were not preserved in the same positions in the output.)。

@ReadFields?– 指定哪些字段在計算結(jié)果的時候用到。你只能指定那些在計算中使用的字段,而不是僅僅將數(shù)據(jù)拷貝到輸出中的字段。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)

我們來看看如何使用?ForwardedFields?注釋:

// Specify that the first element is copied without any changes

@ForwardedFields("0")

class MyFunction implements MapFunction, Tuple2> {

? ? @Override

? ? public Tuple2 map(Tuple2 value) {

? ? ? // Copy first field without change

? ? ? ? return new Tuple2<>(value.f0, value.f1 + 123);

? ? }

}

上面的注釋意味著輸入元組的第一個元素將不會改變,而且在返回元組中同樣處在第一個位置。

如果你沒有改變一個元素,只不過簡單地將它移到不同的位置上,你同樣可以使用 ForwardedFields 注釋來實現(xiàn)。下面例子中,我們簡單地將輸入元組的位置互相交換,并且直接返回:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position

@ForwardedFields("0->1; 1->0")

class SwapArguments implements MapFunction, Tuple2> {

? ? @Override

? ? public Tuple2 map(Tuple2 value) {

? ? ? // Swap elements in a tuple

? ? ? ? return new Tuple2<>(value.f1, value.f0);

? ? }

}

上面例子中提到的注釋只能應(yīng)用到只有一個輸入?yún)?shù)的函數(shù)中,比如?map?或者?flatMap。如果你有兩個輸入?yún)?shù)的函數(shù),你可以分別使用?ForwardedFieldsFirst?和?ForwardedFieldsSecond?注釋來為第一和第二個參數(shù)指定一些信息。

下面我們使用?ForwardedFieldsFirst?和?ForwardedFieldsSecond?注釋來為實現(xiàn) JoinFunction 接口的類指定相關(guān)的信息:

// Two fields from the input tuple are copied to the first and second positions of the output tuple

@ForwardedFieldsFirst("0; 1")

// The third field from the input tuple is copied to the third position of the output tuple

@ForwardedFieldsSecond("2")

class MyJoin implements JoinFunction, Tuple2, Tuple3>() {

? ? @Override

? ? public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {

? ? ? ? return new Tuple3<>(first.f0, first.f1, second.f1);

? ? }

})

Flink 同樣提供了?NotForwardedFieldsFirst,?NotForwardedFieldsSecond,?ReadFieldsFirst, 和?ReadFirldsSecond?注釋來實現(xiàn)相同的功能。

選擇 Join 類型

如果你為 Flink 提供了一些信息,可以使你的 Join 操作更快,在討論這個是如何工作之前,讓我們先了解 Fliink 是如何運行 Join 操作的。

當 Flink 處理批量數(shù)據(jù)時,集群中的每臺機器只存儲了部分的數(shù)據(jù)。為了執(zhí)行 Join 操作, Apache Flink 需要找到兩個數(shù)據(jù)集所有 key 相同的數(shù)據(jù)。為了做到這一點,F(xiàn)link 首先必須將兩個數(shù)據(jù)集擁有相同 key 的數(shù)據(jù)放在同一臺機器上。這里有兩種實現(xiàn)策略:

Repartition-Repartition strategy:在這種場景下,Join 的兩個數(shù)據(jù)集分別對它們的 key 使用相同的分區(qū)函數(shù)進行分區(qū),并經(jīng)過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)。這就意味著如果數(shù)據(jù)集非常大,這將花費相當一部分時間將數(shù)據(jù)分發(fā)出去。

Broadcast-Forward strategy:這種場景下,大的數(shù)據(jù)集R不做處理,另一個比較小的數(shù)據(jù)集S將全部復制到集群中所有擁有R的一部分數(shù)據(jù)的機器上。

如果你使用一個比較小的數(shù)據(jù)集和一個比較大的數(shù)據(jù)集進行 join 操作,你可以使用 Broadcast-Forward 策略,這個很容易實現(xiàn):

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

這種寫法表示第一個數(shù)據(jù)集要比第二個數(shù)據(jù)集小的多。

Flink 支持的其他 join 提示有以下幾種:

BROADCAST_HASH_SECOND?– 表示第二個數(shù)據(jù)集比較小

REPARTITION_HASH_FIRST?– 表示第一個數(shù)據(jù)集比較小

REPARTITION_HASH_SECOND?– 表示第二個數(shù)據(jù)集有點小

REPARTITION_SORT_MERGE?– 表示重新分區(qū)兩個數(shù)據(jù)集并使用排序和合并策略(sorting and merging strategy)

OPTIMIZER_CHOOSES?– Flink 優(yōu)化器將決定如何連接數(shù)據(jù)集

本文翻譯自:Four ways to optimize your Flink applications

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,593評論 19 139
  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,920評論 0 13
  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,858評論 0 10
  • 佛曰:受身無間,永遠不死,壽長乃無間地獄中之大劫。在佛經(jīng)中清楚說明了,五逆者死后將,被打落無間地獄。一殺父,二殺母...
    范淼閱讀 364評論 0 1
  • 之前一直在猶豫,徘徊,找不到努力學習的動力。 其實努力就是一種生活方式。要有限的努力,不要過度努力。要時常更新自己...
    漫整掘意閱讀 139評論 0 1

友情鏈接更多精彩內(nèi)容