Flink 使用介紹相關(guān)文檔目錄
背景
數(shù)據(jù)分流在Flink中叫做旁路輸出(side output)。Flink的工作流程可以理解為是一條流水線。我們編寫的程序是流水線上的各個工序。然而,在一些場景中,我們將一部分滿足特定條件的數(shù)據(jù)從主線分出去,形成旁路輸出,好比是流水線中發(fā)現(xiàn)不合格的產(chǎn)品必須要剔除一樣。本篇為大家?guī)鞦link使用旁路輸出的方法。
OutputTag
Flink可以支持1個或多個side output。不同的side output使用OutputTag區(qū)分。OutputTag是side output數(shù)據(jù)流的標記,和數(shù)據(jù)流嚴格對應(yīng)。
OutputTag使用如下方式創(chuàng)建:
public static final OutputTag<X> someTag = new OutputTag<X>("tag-name"){};
其中泛型X為旁路輸出流的數(shù)據(jù)類型,tag-name為tag的名稱。
需要注意的是我們必須使用匿名內(nèi)部類的方式創(chuàng)建OutputTag。下面這種寫法是錯誤的:
public static final OutputTag<X> someTag = new OutputTag<X>("tag-name");
這是因為Flink內(nèi)部使用clazz.getGenericSuperclass()方式獲取父類的Type,返回的類型是ParameterizedType,從中可以獲取到泛型類型(代碼位于TypeExtractor::getParameterType)。如果不使用匿名內(nèi)部類則無法獲取到泛型類型,會引發(fā)錯誤。
遲到數(shù)據(jù)分流
下面這個例子是將遲到的數(shù)據(jù)單獨從旁路輸出。代碼和分析如下所示:
public class OutOfOrderDemo {
// 創(chuàng)建tag
public static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 示例數(shù)據(jù),其中D亂序,I來遲(H到來的時候認為15000ms之前的數(shù)據(jù)已經(jīng)到齊)
SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(
new Tuple2<>("A", 0),
new Tuple2<>("B", 1000),
new Tuple2<>("C", 2000),
new Tuple2<>("D", 7000),
new Tuple2<>("E", 3000),
new Tuple2<>("F", 4000),
new Tuple2<>("G", 5000),
new Tuple2<>("H", 20000),
new Tuple2<>("I", 8000)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {
// 這里自定義WatermarkGenerator的原因是Flink按照運行時間周期發(fā)送watermark,但我們的例子是單次執(zhí)行的,可以認為數(shù)據(jù)是一瞬間到來
// 因此我們改寫為每到來一條數(shù)據(jù)發(fā)送一次watermark,watermark的時間戳為數(shù)據(jù)的事件事件減去5000毫秒,意思是最多容忍數(shù)據(jù)來遲5000毫秒
@Override
public WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<Tuple2<String, Integer>>() {
@Override
public void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {
long watermark = eventTimestamp - 5000L < 0 ? 0L : eventTimestamp - 5000L;
output.emitWatermark(new Watermark(watermark));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
};
}
// 取第二個字段為watermark
}).withTimestampAssigner((element, timestamp) -> element.f1));
// 窗口大小5秒,允許延遲5秒
// watermark和allowedLateness的區(qū)別是,watermark決定了什么時候窗口數(shù)據(jù)觸發(fā)計算,allowedLateness決定什么數(shù)據(jù)被認為是lateElement,從而發(fā)送到sideOutput
// 設(shè)置side output tag
source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
@Override
public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
System.out.println("--------------------");
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}
// 打印sideoutput流內(nèi)容
}).getSideOutput(lateTag).process(new ProcessFunction<Tuple2<String, Integer>, Object>() {
@Override
public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
System.out.println("Late element: " + value);
}
});
executionEnvironment.execute();
}
}
從執(zhí)行結(jié)果可以發(fā)現(xiàn)打印出了Late element: (I,8000)。表明I為遲到的元素,它被發(fā)往旁路輸出。
條件分流
除了遲到數(shù)據(jù)輸出之外,用戶還可以根據(jù)自定義條件將數(shù)據(jù)分流。下面的例子根據(jù)一個整數(shù)數(shù)據(jù)流元素的奇偶性,分發(fā)到不同的side output中。代碼如下:
public class SplitDemo {
public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 2, 3, 4, 5);
SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
// 這里不使用out.collect,而是使用ctx.output
// 這個方法多了一個參數(shù),可以指定output tag,從而實現(xiàn)數(shù)據(jù)分流
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
// 依賴OutputTag獲取對應(yīng)的旁路輸出
DataStream<Integer> evenStream = process.getSideOutput(evenTag);
DataStream<Integer> oddStream = process.getSideOutput(oddTag);
// 分別打印兩個旁路輸出流中的數(shù)據(jù)
evenStream.process(new ProcessFunction<Integer, String>() {
@Override
public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
out.collect("Even: " + value);
}
}).print();
oddStream.process(new ProcessFunction<Integer, String>() {
@Override
public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
out.collect("Odd: " + value);
}
}).print();
executionEnvironment.execute();
}
}
條件分流在實際開發(fā)中用途非常廣泛。比如將數(shù)據(jù)分類發(fā)往不同的下游sink。也可以將不符合規(guī)范無法處理的異常數(shù)據(jù)通過旁路輸出收集起來,便于問題的收集和定位。
本博客為作者原創(chuàng),歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處。