1.startNewChain
/**
 * Operator-chaining demo for {@code startNewChain()}.
 *
 * <p>Reads lines from the socket {@code localhost:8888}, pairs every line with a count of 1,
 * keeps only lines starting with "a", and prints a running sum of the counts per distinct line.
 * Calling {@code startNewChain()} on the filter makes the filter the head of a new operator
 * chain instead of being chained onto the upstream map.
 *
 * <p>Created with IntelliJ IDEA. Date: 2021-01-17 22:04
 */
public class Test3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        // Pair each incoming line with an initial count of 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        }).setParallelism(1);
        // startNewChain(): the filter begins a new chain rather than being chained to the map.
        SingleOutputStreamOperator<Tuple2<String, Integer>> filter = map.filter(x -> x.f0.startsWith("a")).startNewChain();
        // Key by the line text through a KeySelector; positional keyBy(0) is deprecated in newer Flink releases.
        KeyedStream<Tuple2<String, Integer>, String> keyBy = filter.keyBy(t -> t.f0);
        keyBy.sum(1).print().setParallelism(1);
        environment.execute("job");
    }
}

2.disableChaining:禁止該算子與上游及下游算子鏈接(chain)在一起,例如 `map.filter(x -> x.f0.startsWith("a")).disableChaining();`

3.join
/**
 * Windowed inner-join demo.
 *
 * <p>Joins the socket streams on {@code localhost:8888} and {@code localhost:9999} on the
 * second CSV field, within 5-second tumbling event-time windows, and prints every matched pair
 * as a {@code Tuple6} (left fields followed by right fields). Input lines on both sockets are
 * parsed as {@code timestamp,key,intValue}, e.g. {@code 1000,A,1}.
 *
 * <p>Created with IntelliJ IDEA. Date: 2021-01-18 21:22
 */
public class Join {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // NOTE(review): IntervalJoin calls setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
        // On Flink versions before 1.12, processing time is the default, and this job likely needs
        // the same call for its event-time windows to fire. Confirm against the Flink version in use.
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        DataStreamSource<String> source1 = environment.socketTextStream("localhost", 9999);
        // Add watermarks: event time is the first CSV field, with zero allowed out-of-orderness.
        SingleOutputStreamOperator<String> watermarks = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<String> watermarks1 = source1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        // One parser and one key selector serve both sides of the join.
        MapFunction<String, Tuple3<Long, String, Integer>> parser = new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        };
        KeySelector<Tuple3<Long, String, Integer>, String> byKey = new KeySelector<Tuple3<Long, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
                return value.f1;
            }
        };
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map = watermarks.map(parser);
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map1 = watermarks1.map(parser);
        // Join the first stream with the SECOND stream (previously map.join(map), a self-join).
        map.join(map1)
                // join condition: left key (field 1) ...
                .where(byKey)
                // ... equals right key (field 1)
                .equalTo(byKey)
                // window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple6<Long, String, Integer, Long, String, Integer>>() {
                    @Override
                    public Tuple6<Long, String, Integer, Long, String, Integer> join(Tuple3<Long, String, Integer> l1, Tuple3<Long, String, Integer> l2) throws Exception {
                        // All three left fields, then all three right fields (slot 3 previously held l2.f2).
                        return Tuple6.of(l1.f0, l1.f1, l1.f2, l2.f0, l2.f1, l2.f2);
                    }
                }).print();
        environment.execute("joinjob");
    }
}
4.LeftJoin/RightJoin
/**
 * Left-outer-join demo built on a windowed {@code coGroup}.
 *
 * <p>Co-groups the socket streams on {@code localhost:8888} (left) and {@code localhost:9999}
 * (right) on the second CSV field, within 5-second tumbling event-time windows. Every left record
 * is printed once per matching right record. A left record with no match is printed once, padded
 * with nulls. Rows are printed in {@code Tuple6} text form, e.g. {@code (1000,A,1,2000,A,2)} or
 * {@code (1000,A,1,null,null,null)}. For a right outer join, swap the roles of the two streams.
 *
 * <p>Created with IntelliJ IDEA. Date: 2021-01-18 22:58
 */
public class LeftJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // NOTE(review): IntervalJoin calls setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
        // On Flink versions before 1.12, processing time is the default, and this job likely needs
        // the same call for its event-time windows to fire. Confirm against the Flink version in use.
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        DataStreamSource<String> source1 = environment.socketTextStream("localhost", 9999);
        // Event time is the first CSV field, with zero allowed out-of-orderness.
        SingleOutputStreamOperator<String> watermarks = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        SingleOutputStreamOperator<String> watermarks1 = source1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        // One parser and one key selector serve both inputs.
        MapFunction<String, Tuple3<Long, String, Integer>> parser = new MapFunction<String, Tuple3<Long, String, Integer>>() {
            @Override
            public Tuple3<Long, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple3.of(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]));
            }
        };
        KeySelector<Tuple3<Long, String, Integer>, String> byKey = new KeySelector<Tuple3<Long, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
                return value.f1;
            }
        };
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map = watermarks.map(parser);
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> map1 = watermarks1.map(parser);
        map.coGroup(map1)
                .where(byKey)
                .equalTo(byKey)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<Long, String, Integer>> left, Iterable<Tuple3<Long, String, Integer>> right, Collector<String> collector) throws Exception {
                        // Flink's TupleSerializer rejects null fields (NullFieldException). The
                        // padded row below would crash as soon as it crosses an operator boundary,
                        // e.g. into the parallelism-1 print sink. Rows are therefore emitted in
                        // their Tuple6 text form, which prints exactly what a Tuple6 would.
                        for (Tuple3<Long, String, Integer> first : left) {
                            // Becomes true once any right record shares this left record's key and window.
                            boolean matched = false;
                            for (Tuple3<Long, String, Integer> second : right) {
                                collector.collect(Tuple6.of(first.f0, first.f1, first.f2, second.f0, second.f1, second.f2).toString());
                                matched = true;
                            }
                            if (!matched) {
                                // Left outer join: keep the unmatched left record, padded with nulls.
                                collector.collect(Tuple6.of(first.f0, first.f1, first.f2, null, null, null).toString());
                            }
                        }
                    }
                })
                .print().setParallelism(1);
        environment.execute("leftjoin");
    }
}
5.intervalJoin
/**
 * Interval-join callback that flattens each matched left/right pair into a single record:
 * the three left fields followed by the three right fields.
 */
public class MyProcessJoinFunction extends ProcessJoinFunction<
        Tuple3<Long, String, String>,                          // element type of the left (first) stream
        Tuple3<Long, String, String>,                          // element type of the right (second) stream
        Tuple6<Long, String, String, Long, String, String>> {  // element type produced by the join

    /**
     * Invoked once for every left/right pair that the interval join matched.
     *
     * @param left  the record from the left stream
     * @param right the record from the right stream
     * @param ctx   join context (timestamps of the pair, side outputs); not used here
     * @param out   receives the flattened joined record
     */
    @Override
    public void processElement(Tuple3<Long, String, String> left,
                               Tuple3<Long, String, String> right,
                               Context ctx,
                               Collector<Tuple6<Long, String, String, Long, String, String>> out)
            throws Exception {
        Tuple6<Long, String, String, Long, String, String> joined =
                new Tuple6<>(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2);
        out.collect(joined);
    }
}
/**
 * Interval-join demo.
 *
 * <p>Joins records from {@code localhost:8888} (left) with records from {@code localhost:9999}
 * (right) that share the second CSV field. A pair matches when the right record's event time
 * lies in {@code [left - 1s, left + 1s)}. Lines are parsed as {@code timestamp,key,value}.
 */
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // left sample line:  1000,A,1
        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);
        // right sample line: 2000,A,2
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);

        DataStream<String> leftTimed = leftLines.assignTimestampsAndWatermarks(eventTimeFromFirstField());
        DataStream<String> rightTimed = rightLines.assignTimestampsAndWatermarks(eventTimeFromFirstField());

        DataStream<Tuple3<Long, String, String>> leftRecords = toTuples(leftTimed);
        DataStream<Tuple3<Long, String, String>> rightRecords = toTuples(rightTimed);

        leftRecords
                .keyBy(t -> t.f1)                            // key the left stream by field 1
                .intervalJoin(rightRecords.keyBy(t -> t.f1)) // join with the right stream keyed the same way
                .between(Time.seconds(-1), Time.seconds(1))  // match window: left event time -1s .. +1s
                .upperBoundExclusive()                       // bounds are inclusive by default; make the upper one exclusive
                .process(new MyProcessJoinFunction())        // flatten each matched pair
                .print();                                    // print sink
        env.execute("IntervalJoinDemo");
    }

    /** Creates a watermark assigner that reads event time from the first CSV field, with no allowed lateness. */
    private static BoundedOutOfOrdernessTimestampExtractor<String> eventTimeFromFirstField() {
        return new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String line) {
                return Long.parseLong(line.split(",")[0]);
            }
        };
    }

    /** Parses {@code timestamp,key,value} lines into {@code Tuple3<Long, String, String>} records. */
    private static DataStream<Tuple3<Long, String, String>> toTuples(DataStream<String> lines) {
        return lines.map(new MapFunction<String, Tuple3<Long, String, String>>() {
            @Override
            public Tuple3<Long, String, String> map(String value) throws Exception {
                String[] parts = value.split(",");
                return Tuple3.of(Long.parseLong(parts[0]), parts[1], parts[2]);
            }
        });
    }
}
6.任務槽和資源
每個TaskManager是一個JVM進程,並且可以在單獨的線程中執行一個或多個子任務。為了控制worker接受多少個任務,worker具有所謂的任務槽【至少有一個】。
每個任務槽代表TaskManager資源的一個固定子集。例如,具有三個插槽的TaskManager會將其托管內存的1/3分配給每個插槽。對資源分槽意味著子任務不會與其他作業的子任務競爭托管內存,而是擁有一定數量的預留托管內存。請注意,此處沒有CPU隔離;目前插槽只隔離任務的托管內存。
通過調整任務槽的數量,用戶可以定義子任務之間如何相互隔離。每個TaskManager只有一個插槽時,意味著每個任務組都在單獨的JVM中運行。具有多個插槽則意味著更多子任務共享同一個JVM。同一JVM中的任務共享TCP連接【通過多路複用】和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。圖解如下:

默認情況下,Flink允許子任務共享插槽,即使它們是不同任務的子任務也是如此,只要它們來自同一作業即可。結果是一個插槽可以容納整個作業流水線。允許插槽共享有兩個主要好處:
Flink集群所需的任務槽數恰好等於作業中使用的最高並行度。無需計算一個程序總共包含多少個任務(各任務的並行度可能不同)。
更容易獲得更好的資源利用率。如果沒有插槽共享,非密集型的source/map子任務將佔用與資源密集型窗口子任務一樣多的資源。通過插槽共享,可以充分利用插槽資源,同時確保繁重的子任務在TaskManager之間公平分配。
