使用 Flink Tuples
當你使用類似于?groupBy,?join, 或者?keyBy?算子時,F(xiàn)link 提供了多種用于在你的數(shù)據(jù)集上選擇 key 的方法。你可以使用 key 選擇函數(shù),如下:
// Join movies and ratings datasets
movies.join(ratings)
? ? ? ? // Use movie id as a key in both cases
? ? ? ? .where(new KeySelector() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public String getKey(Movie m) throws Exception {
? ? ? ? ? ? ? ? return m.getId();
? ? ? ? ? ? }
? ? ? ? })
? ? ? ? .equalTo(new KeySelector() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public String getKey(Rating r) throws Exception {
? ? ? ? ? ? ? ? return r.getMovieId();
? ? ? ? ? ? }
? ? ? ? })
你甚至可以指定 POJO 類型中一個 field 的名字:
movies.join(ratings)
? ? // Use same fields as in the previous example
? ? .where("id")
? ? .equalTo("movieId")
但是如果你現(xiàn)在使用的是 Flink 元組類型(tuple types)的數(shù)據(jù),你可以簡單地指定將要作為 key 的字段在元組中的位置:
DataSet> movies = ...
DataSet> ratings = ...
movies.join(ratings)
? ? // Specify fields positions in tuples
? ? .where(0)
? ? .equalTo(1)
這種方法在 Flink 中將會獲得最佳的性能,但是可讀性方面呢?這是不是意味著你的代碼看起來像下面那樣:
DataSet> result = movies.join(ratings)
? ? .where(0)
? ? .equalTo(0)
? ? .with(new JoinFunction, Tuple2,
? ? ? ? ? ? ? ? ? ? ? Tuple3>() {
? ? ? ? // What is happening here?
? ? ? ? @Override
? ? ? ? public Tuple3 join(Tuple2 first,
? ? ? ? ? ? ? ? ? ? ? Tuple2 second) throws Exception {
? ? ? ? ? ? // Some tuples are joined with some other tuples and some fields are returned???
? ? ? ? ? ? return new Tuple3<>(first.f0, first.f1, second.f1);
? ? ? ? }
? ? });
在這種情況下,提高可讀性的常見方法是創(chuàng)建一個繼承自?TupleX?的類,并且實現(xiàn)其中的?getters?和?setters。下面是 Flink Gelly 類庫中?Edge?類的實現(xiàn),其中有三個 fileds,所以它直接繼承了?Tuple3?類:
public class Edge extends Tuple3 {
? ? public Edge(K source, K target, V value) {
? ? ? ? this.f0 = source;
? ? ? ? this.f1 = target;
? ? ? ? this.f2 = value;
? ? }
? ? // Getters and setters for readability
? ? public void setSource(K source) {
? ? ? ? this.f0 = source;
? ? }
? ? public K getSource() {
? ? ? ? return this.f0;
? ? }
? ? // Also has getters and setters for other fields
? ? ...
}
重用 Flink 對象
另外一種可以提升 Flink 應(yīng)用程序性能的方法是在用戶自定義函數(shù)返回數(shù)據(jù)時使用可變對象(mutable objects),請看看下面的例子:
stream
? ? .apply(new WindowFunction, String, TimeWindow>() {
? ? ? ? @Override
? ? ? ? public void apply(String userName, TimeWindow timeWindow,
? ? ? ? ? ? ? ? Iterable iterable,
? ? ? ? ? ? ? ? Collector> collector) throws Exception {
? ? ? ? ? ? long changesCount = ...
? ? ? ? ? ? // A new Tuple instance is created on every execution
? ? ? ? ? ? collector.collect(new Tuple2<>(userName, changesCount));
? ? ? ? }
? ? }
正如你所看到的,在我們每次調(diào)用?apply?函數(shù)的時候,我們都會創(chuàng)建一個 Tuple2 類型的實例,這將會給垃圾回收造成很大的壓力。解決這個問題的一種方法就是反復使用相同的實例:
stream
? ? .apply(new WindowFunction, String, TimeWindow>() {
? ? ? ? // Create an instance that we will reuse on every call
? ? ? ? private Tuple2 result = new Tuple<>();
? ? ? ? @Override
? ? ? ? public void apply(String userName, TimeWindow timeWindow,
? ? ? ? ? ? ? ? ? ? ? ? ? Iterable iterable,
? ? ? ? ? ? ? ? ? ? ? ? ? Collector> collector) throws Exception {
? ? ? ? ? ? long changesCount = ...
? ? ? ? ? ? // Set fields on an existing object instead of creating a new one
? ? ? ? ? ? result.f0 = userName;
? ? ? ? ? ? // Auto-boxing!! A new Long value may be created
? ? ? ? ? ? result.f1 = changesCount;
? ? ? ? ? ? // Reuse the same Tuple2 object
? ? ? ? ? ? collector.collect(result);
? ? ? ? }
? ? }
上面的代碼性能會好些。雖然我們在每次調(diào)用的時候只創(chuàng)建了一個?Tuple2?實例,但是我們還間接地創(chuàng)建了 Long 類型的實例。為了解決這個問題, Flink 內(nèi)部提供了一系列 value classes,比如:IntValue,?LongValue,?StringValue,?FloatValue?等。這些類的重點是為內(nèi)置類型提供了可變版本,所以我們可以在用戶自定義函數(shù)中重用這些類型,下面就是如何使用的例子:
stream
? ? .apply(new WindowFunction, String, TimeWindow>() {
? ? ? ? // Create a mutable count instance
? ? ? ? private LongValue count = new IntValue();
? ? ? ? // Assign mutable count to the tuple
? ? ? ? private Tuple2 result = new Tuple<>("", count);
? ? ? ? @Override
? ? ? ? // Notice that now we have a different return type
? ? ? ? public void apply(String userName, TimeWindow timeWindow,
? ? ? ? ? ? ? ? ? Iterable iterable,
? ? ? ? ? ? ? ? ? Collector> collector) throws Exception {
? ? ? ? ? ? long changesCount = ...
? ? ? ? ? ? // Set fields on an existing object instead of creating a new one
? ? ? ? ? ? result.f0 = userName;
? ? ? ? ? ? // Update mutable count value
? ? ? ? ? ? count.setValue(changesCount);
? ? ? ? ? ? // Reuse the same tuple and the same LongValue instance
? ? ? ? ? ? collector.collect(result);
? ? ? ? }
? ? }
上面這些使用習慣在 Flink 類庫中被普遍使用,比如 Flink Gelly。
使用函數(shù)注解
另一種優(yōu)化 Flink 應(yīng)用程序的方法是提供一些關(guān)于用戶自定義函數(shù)如何對輸入數(shù)據(jù)進行處理的信息。由于 Flink 無法解析和理解你的代碼,所以你提供一些關(guān)鍵的信息將會幫助 Flink 創(chuàng)建一個更加高效的執(zhí)行計劃。我們可以使用三種注解:
@ForwardedFields?– 指定輸入數(shù)據(jù)中哪些字段保持不變并且在輸出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。
@NotForwardedFields?– 指定在輸出中相同位置未保留的字段(specifies fields which were not preserved in the same positions in the output.)。
@ReadFields?– 指定哪些字段在計算結(jié)果的時候用到。你只能指定那些在計算中使用的字段,而不是僅僅將數(shù)據(jù)拷貝到輸出中的字段。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)
我們來看看如何使用?ForwardedFields?注釋:
// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction, Tuple2> {
? ? @Override
? ? public Tuple2 map(Tuple2 value) {
? ? ? // Copy first field without change
? ? ? ? return new Tuple2<>(value.f0, value.f1 + 123);
? ? }
}
上面的注釋意味著輸入元組的第一個元素將不會改變,而且在返回元組中同樣處在第一個位置。
如果你沒有改變一個元素,只不過簡單地將它移到不同的位置上,你同樣可以使用 ForwardedFields 注釋來實現(xiàn)。下面例子中,我們簡單地將輸入元組的位置互相交換,并且直接返回:
// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction, Tuple2> {
? ? @Override
? ? public Tuple2 map(Tuple2 value) {
? ? ? // Swap elements in a tuple
? ? ? ? return new Tuple2<>(value.f1, value.f0);
? ? }
}
上面例子中提到的注釋只能應(yīng)用到只有一個輸入?yún)?shù)的函數(shù)中,比如?map?或者?flatMap。如果你有兩個輸入?yún)?shù)的函數(shù),你可以分別使用?ForwardedFieldsFirst?和?ForwardedFieldsSecond?注釋來為第一和第二個參數(shù)指定一些信息。
下面我們使用?ForwardedFieldsFirst?和?ForwardedFieldsSecond?注釋來為實現(xiàn) JoinFunction 接口的類指定相關(guān)的信息:
// Two fields from the input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The third field from the input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("2")
class MyJoin implements JoinFunction, Tuple2, Tuple3>() {
? ? @Override
? ? public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {
? ? ? ? return new Tuple3<>(first.f0, first.f1, second.f1);
? ? }
})
Flink 同樣提供了?NotForwardedFieldsFirst,?NotForwardedFieldsSecond,?ReadFieldsFirst, 和?ReadFirldsSecond?注釋來實現(xiàn)相同的功能。
選擇 Join 類型
如果你為 Flink 提供了一些信息,可以使你的 Join 操作更快,在討論這個是如何工作之前,讓我們先了解 Fliink 是如何運行 Join 操作的。
當 Flink 處理批量數(shù)據(jù)時,集群中的每臺機器只存儲了部分的數(shù)據(jù)。為了執(zhí)行 Join 操作, Apache Flink 需要找到兩個數(shù)據(jù)集所有 key 相同的數(shù)據(jù)。為了做到這一點,F(xiàn)link 首先必須將兩個數(shù)據(jù)集擁有相同 key 的數(shù)據(jù)放在同一臺機器上。這里有兩種實現(xiàn)策略:
Repartition-Repartition strategy:在這種場景下,Join 的兩個數(shù)據(jù)集分別對它們的 key 使用相同的分區(qū)函數(shù)進行分區(qū),并經(jīng)過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)。這就意味著如果數(shù)據(jù)集非常大,這將花費相當一部分時間將數(shù)據(jù)分發(fā)出去。
Broadcast-Forward strategy:這種場景下,大的數(shù)據(jù)集R不做處理,另一個比較小的數(shù)據(jù)集S將全部復制到集群中所有擁有R的一部分數(shù)據(jù)的機器上。
如果你使用一個比較小的數(shù)據(jù)集和一個比較大的數(shù)據(jù)集進行 join 操作,你可以使用 Broadcast-Forward 策略,這個很容易實現(xiàn):
ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)
這種寫法表示第一個數(shù)據(jù)集要比第二個數(shù)據(jù)集小的多。
Flink 支持的其他 join 提示有以下幾種:
BROADCAST_HASH_SECOND?– 表示第二個數(shù)據(jù)集比較小
REPARTITION_HASH_FIRST?– 表示第一個數(shù)據(jù)集比較小
REPARTITION_HASH_SECOND?– 表示第二個數(shù)據(jù)集有點小
REPARTITION_SORT_MERGE?– 表示重新分區(qū)兩個數(shù)據(jù)集并使用排序和合并策略(sorting and merging strategy)
OPTIMIZER_CHOOSES?– Flink 優(yōu)化器將決定如何連接數(shù)據(jù)集