2021-01-18-Flink-23(Flink Join)

1.startNewChain

public class Test3 {
    /**
     * Demonstrates {@code startNewChain()}: the filter operator is forced to begin a new operator
     * chain instead of being chained to the preceding map.
     *
     * <p>Reads lines from socket {@code localhost:8888}, turns each line into {@code (line, 1)},
     * keeps only lines starting with {@code "a"}, and prints a running count per distinct line.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Every socket line becomes (line, 1); this map is pinned to parallelism 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = env
                .socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        return Tuple2.of(line, 1);
                    }
                })
                .setParallelism(1);

        // Keep lines starting with "a"; startNewChain() makes this filter the head of a new chain.
        SingleOutputStreamOperator<Tuple2<String, Integer>> startsWithA = pairs
                .filter(pair -> pair.f0.startsWith("a"))
                .startNewChain();

        // Group by the line itself (tuple field 0) and keep a running sum of field 1.
        KeyedStream<Tuple2<String, Integer>, Tuple> byLine = startsWithA.keyBy(0);
        byLine.sum(1)
                .print()
                .setParallelism(1);

        env.execute("job");
    }
}
image.png

2.disableChaining

image.png

3.join

public class Join {
    /**
     * Inner-joins two socket streams inside 5-second tumbling event-time windows.
     *
     * <p>Both streams expect lines of the form {@code timestamp,key,value} (for example
     * {@code 1000,A,1}). Port 8888 feeds the left side and port 9999 feeds the right side.
     * Left and right records whose {@code key} fields are equal and that fall into the same
     * window are emitted as one {@code Tuple6}: the three left fields followed by the three
     * right fields.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        DataStreamSource<String> source1 = environment.socketTextStream("localhost", 9999);

        // Add watermarks: event time is the first CSV field, with zero allowed out-of-orderness.
        SingleOutputStreamOperator<String> watermarks = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<String> watermarks1 = source1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        // Parse "timestamp,key,value" into (Long, String, Integer).
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map = watermarks.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map1 = watermarks1.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        });

        // Join the left stream with the right stream (map1); joining `map` with itself would
        // ignore the port-9999 input entirely.
        map.join(map1)
                // join key of the left stream
                .where(new KeySelector<Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, Integer> longStringIntegerTuple3) throws Exception {
                        return longStringIntegerTuple3.f1;
                    }
                })
                // join key of the right stream
                .equalTo(new KeySelector<Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, Integer> longStringIntegerTuple3) throws Exception {
                        return longStringIntegerTuple3.f1;
                    }
                })
                // only records in the same 5-second event-time window are joined
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple6<Long, String, Integer, Long, String, Integer>>() {
                    @Override
                    public Tuple6<Long, String, Integer, Long, String, Integer> join(Tuple3<Long, String, Integer> l1, Tuple3<Long, String, Integer> l2) throws Exception {
                        // All three left fields (including l1.f2), then all three right fields.
                        return Tuple6.of(l1.f0, l1.f1, l1.f2, l2.f0, l2.f1, l2.f2);
                    }
                }).print();

        environment.execute("joinjob");

    }
}

4.LeftJoin/RightJoin

public class LeftJoin {
    /**
     * Left-outer-joins two socket streams inside 5-second tumbling event-time windows using
     * {@code coGroup}.
     *
     * <p>Both streams expect lines of the form {@code timestamp,key,value} (for example
     * {@code 1000,A,1}). Port 8888 is the left side and port 9999 is the right side. For every
     * left record, the job prints one row per right record with the same key in the same window.
     * If there is no such right record, it prints a single row whose right-hand fields are
     * {@code null}.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        DataStreamSource<String> source1 = environment.socketTextStream("localhost", 9999);

        // Event time is the first CSV field; zero out-of-orderness is tolerated.
        SingleOutputStreamOperator<String> watermarks = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        SingleOutputStreamOperator<String> watermarks1 = source1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        // Parse "timestamp,key,value" into (Long, String, Integer).
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map = watermarks.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map1 = watermarks1.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        });

        map.coGroup(map1)
                .where(new KeySelector<Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, Integer> longStringIntegerTuple3) throws Exception {
                        return longStringIntegerTuple3.f1;
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, Integer> longStringIntegerTuple3) throws Exception {
                        return longStringIntegerTuple3.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // The output rows are emitted as the text of the Tuple6, not as Tuple6 records.
                // Flink's TupleSerializer rejects null tuple fields (NullFieldException). The coGroup
                // output is serialized whenever it is not chained to the sink, e.g. when the coGroup
                // runs at the default parallelism and the sink at parallelism 1. A null-padded Tuple6
                // would therefore crash the job on the first unmatched left record. The printed text
                // is unchanged. If typed fields are needed downstream, use a null-capable type
                // (e.g. Row with explicit type information) instead.
                .apply(new CoGroupFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<Long, String, Integer>> left, Iterable<Tuple3<Long, String, Integer>> right, Collector<String> collector) throws Exception {
                        // Left outer join: every left record is emitted at least once.
                        // For a right outer join, swap the roles of left and right.
                        for (Tuple3<Long, String, Integer> first : left) {
                            boolean matched = false;

                            for (Tuple3<Long, String, Integer> second : right) {
                                collector.collect(Tuple6.of(first.f0, first.f1, first.f2, second.f0, second.f1, second.f2).toString());
                                matched = true;
                            }
                            if (!matched) {
                                // No right record with this key in the window: pad with nulls.
                                collector.collect(Tuple6.of(first.f0, first.f1, first.f2, null, null, null).toString());
                            }
                        }
                    }
                })
                .print().setParallelism(1);
        environment.execute("leftjoin");

    }
}

5.intervalJoin

public class MyProcessJoinFunction extends ProcessJoinFunction<
        Tuple3<Long, String, String>,                          // element type of the first (left) stream
        Tuple3<Long, String, String>,                          // element type of the second (right) stream
        Tuple6<Long, String, String, Long, String, String>> {  // type of a joined result

    /**
     * Invoked by the interval join for each left/right pair that matched; emits the two
     * records concatenated into a single {@code Tuple6} (left fields first, then right fields).
     *
     * @param left  element from the left stream
     * @param right element from the right stream
     * @param ctx   context giving access to the elements' timestamps and side outputs (unused)
     * @param out   collector that receives the joined record
     */
    @Override
    public void processElement(Tuple3<Long, String, String> left,
                               Tuple3<Long, String, String> right,
                               Context ctx,
                               Collector<Tuple6<Long, String, String, Long, String, String>> out)
            throws Exception {
        Tuple6<Long, String, String, Long, String, String> joined =
                new Tuple6<>(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2);
        out.collect(joined);
    }
}
public class IntervalJoin {

    /**
     * Interval-joins two socket streams keyed on their second CSV field.
     *
     * <p>Port 8888 feeds the left stream (e.g. {@code 1000,A,1}) and port 9999 feeds the right
     * stream (e.g. {@code 2000,A,2}). The first field of each line is the event timestamp.
     * A right element joins a left element with the same key when the right timestamp lies in
     * {@code [leftTs - 1s, leftTs + 1s)}; the upper bound is made exclusive below.
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job with event-time semantics.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);   // e.g. 1000,A,1
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);  // e.g. 2000,A,2

        DataStream<String> leftTimed = withEventTime(leftLines);
        DataStream<String> rightTimed = withEventTime(rightLines);

        DataStream<Tuple3<Long, String, String>> leftStream = leftTimed.map(new ParseLine());
        DataStream<Tuple3<Long, String, String>> rightStream = rightTimed.map(new ParseLine());

        leftStream
                .keyBy(t -> t.f1)                               // key of the left stream
                .intervalJoin(rightStream.keyBy(t -> t.f1))     // key of the right stream
                .between(Time.seconds(-1), Time.seconds(1))     // window relative to each left element
                .upperBoundExclusive()                          // bounds default to inclusive; make it [lower, upper)
                .process(new MyProcessJoinFunction())
                .print();

        env.execute("IntervalJoinDemo");
    }

    /** Attaches event-time timestamps (first CSV field) with zero allowed out-of-orderness. */
    private static DataStream<String> withEventTime(DataStream<String> lines) {
        return lines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });
    }

    /** Parses {@code timestamp,key,value} into a {@code (Long, String, String)} tuple. */
    private static final class ParseLine implements MapFunction<String, Tuple3<Long, String, String>> {
        @Override
        public Tuple3<Long, String, String> map(String value) throws Exception {
            String[] fields = value.split(",");
            return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
        }
    }
}

6.任務槽和資源

每個TaskManager是一個JVM進程,並且可以在單獨的線程中執行一個或多個子任務。為了控制worker接受多少個任務,worker具有所謂的任務槽【至少有一個】。
每個任務槽代表TaskManager的資源的固定子集。例如,具有三個插槽的TaskManager會將其托管內存的1/3專用於每個插槽。分配資源意味著子任務不會與其他作業的子任務競爭托管內存,而是具有一定數量的保留托管內存。請注意,此處沒有CPU隔離。當前插槽僅將任務的托管內存分開。
通過調整任務槽的數量,用戶可以定義子任務如何相互隔離。每個TaskManager只有一個插槽時,意味著每個任務組都在單獨的JVM上運行。具有多個插槽意味著更多子任務共享同一個JVM。同一JVM中的任務共享TCP連接【通過多路復用】和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。圖解如下:

image.png

默認情況下,Flink允許子任務共享插槽,即使它們是不同任務的子任務也是如此,只要它們來自同一作業即可。結果是一個插槽可以容納整個作業流水線。允許此插槽共享有兩個主要好處:
Flink集群所需的任務槽與作業中使用的最高並行度恰好一樣多。無需計算一個程序總共包含多少個任務。
更容易獲得更好的資源利用率。如果沒有插槽共享,則非密集型source/map子任務將阻塞與資源密集型窗口子任務一樣多的資源。通過插槽共享,可以充分利用插槽資源,同時確保沉重的子任務在TaskManager之間公平分配。

image.png
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發佈,文章內容僅代表作者本人觀點,簡書系信息發佈平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容