Job submission fails: No ExecutorFactory found to execute the application
The full error is:
Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1931)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1836)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
at com.paultech.CEPProblem.main(CEPProblem.java:73)
Root cause:
After some digging, it turned out that starting with Flink 1.11, flink-streaming-java no longer pulls in flink-clients as a dependency. To run the job, you have to add it explicitly:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Set the scala.binary.version and flink.version properties to the versions your project actually uses.
Related StackOverflow question: https://stackoverflow.com/questions/63032060/upgraded-flink-from-1-10-to-1-11-met-error-no-executorfactory-found-to-execute
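Why does a missing jar surface as this particular exception? Flink discovers executor factories at runtime through Java's ServiceLoader, and the local and remote executor factories ship in flink-clients. The sketch below is a simplified, self-contained model of that lookup, not Flink's own code. The interface ExecutorFactory and the class ExecutorFactoryLookup are hypothetical stand-ins for Flink's PipelineExecutorFactory and DefaultExecutorServiceLoader. When no provider is registered under META-INF/services, the lookup finds nothing and fails with the same message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ExecutorFactoryLookup {

    // Hypothetical stand-in for Flink's PipelineExecutorFactory SPI.
    public interface ExecutorFactory {
        boolean isCompatibleWith(String target);
    }

    // Simplified model of the lookup: load every provider registered under
    // META-INF/services, keep the compatible ones, and fail if none remain.
    static ExecutorFactory find(String target) {
        List<ExecutorFactory> compatible = new ArrayList<>();
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            if (factory.isCompatibleWith(target)) {
                compatible.add(factory);
            }
        }
        if (compatible.isEmpty()) {
            // Nothing on the classpath provides the service, as when flink-clients is missing.
            throw new IllegalStateException("No ExecutorFactory found to execute the application.");
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        try {
            find("local");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because this file registers no provider, running main prints "No ExecutorFactory found to execute the application." Adding a jar that registers a compatible implementation is what makes the lookup succeed. In the real case, that jar is flink-clients.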
Unexpected output from a Flink CEP job
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.RichPatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
// The statements below run inside main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TemperatureEvent> input = env.fromElements(
        new TemperatureEvent(1, "Device01", 22.0), new TemperatureEvent(1, "Device01", 27.1),
        new TemperatureEvent(2, "Device01", 28.1), new TemperatureEvent(1, "Device01", 22.2),
        new TemperatureEvent(3, "Device01", 22.1), new TemperatureEvent(1, "Device02", 22.3),
        new TemperatureEvent(4, "Device02", 22.1), new TemperatureEvent(1, "Device02", 22.4),
        new TemperatureEvent(5, "Device02", 22.7), new TemperatureEvent(1, "Device02", 27.0),
        new TemperatureEvent(6, "Device02", 30.0));
// Match events from Device02 whose temperature is at least 26.0 (both conditions must hold)
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
        .subtype(TemperatureEvent.class)
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return subEvent.getTemperature() >= 26.0;
            }
        })
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return "Device02".equals(subEvent.getMachineName());
            }
        })
        .within(Time.seconds(10));
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println(getRuntimeContext().getUserCodeClassLoader());
            }
            @Override
            public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                return new Alert("Temperature Rise Detected: " + event.get("start")
                        + " on machine name: " + event.get("start"));
            }
        });
patternStream.print();
env.execute("CEP on Temperature Sensor");
TemperatureEvent and Alert are plain Java beans, so their code is not shown here.
On Flink versions before 1.12, this code prints:
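If you want to run the example, here is one possible shape for the two beans. This is a hypothetical reconstruction, not the author's code. The fields are inferred from the constructor calls and getters used above, and toString() is written to reproduce the output format shown below. The beans appear as public static nested classes of a hypothetical holder class Model so the block compiles as a single file. In a real project, each bean would normally be its own top-level public class. Flink can treat them as POJOs because they are public, have a public no-argument constructor, and expose getters and setters.

```java
public class Model {

    // Hypothetical reconstruction of the event bean used by the CEP example.
    public static class TemperatureEvent {
        private int id;
        private String machineName;
        private double temperature;

        public TemperatureEvent() {} // no-arg constructor, required for Flink POJOs

        public TemperatureEvent(int id, String machineName, double temperature) {
            this.id = id;
            this.machineName = machineName;
            this.temperature = temperature;
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getMachineName() { return machineName; }
        public void setMachineName(String machineName) { this.machineName = machineName; }
        public double getTemperature() { return temperature; }
        public void setTemperature(double temperature) { this.temperature = temperature; }

        // Formats as TemperatureEvent{id=1, machineName='Device02', temperature=27.0}
        @Override
        public String toString() {
            return "TemperatureEvent{id=" + id + ", machineName='" + machineName
                    + "', temperature=" + temperature + "}";
        }
    }

    // Hypothetical reconstruction of the alert bean.
    public static class Alert {
        private String message;

        public Alert() {}

        public Alert(String message) { this.message = message; }

        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }

        // Formats as Alert{message='...'}
        @Override
        public String toString() {
            return "Alert{message='" + message + "'}";
        }
    }
}
```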
Alert{message='Temperature Rise Detected: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}] on machine name: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}]'}
Alert{message='Temperature Rise Detected: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}] on machine name: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}]'}
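On Flink 1.12, however, patternStream.print() produces no output at all. Stepping through CepOperator all the way down to StreamOneInputProcessor in a debugger showed no elements coming in, which was puzzling.
My first guess was that the time characteristic was the problem, so I added this line right after creating env: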
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
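It turned out that this call is deprecated in Flink 1.12. Starting with 1.12, the default stream time characteristic is TimeCharacteristic.EventTime instead of the previous default, processing time. That is the source of the problem. In event-time mode, CEP buffers each element by its timestamp and evaluates the pattern only once the watermark has passed that timestamp. The elements created by env.fromElements carry no timestamps, and no watermark strategy is assigned. As a result, no element is ever newer than the current watermark, so the elements are treated as late data and dropped instead of matched, and nothing is printed.
To fix this, we need the Flink 1.12 way of making CEP use processing time. The configuration call tried above has no effect.
So how do you use processing time in a CEP pipeline? The source code of PatternStream contains these two methods: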
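A toy model of the event-time check may make this failure mode easier to see. The class below is hypothetical and is not Flink's CepOperator. It only mirrors one rule: an event-time element is kept when its timestamp is greater than the current watermark. As assumptions taken from Flink's behavior, a missing timestamp is reported as Long.MIN_VALUE (as StreamRecord does), and the watermark starts at Long.MIN_VALUE. In processing time, each element is instead stamped with the wall-clock time on arrival, so none is considered late.

```java
import java.util.ArrayList;
import java.util.List;

public class EventTimeDropModel {

    // A missing timestamp is reported as Long.MIN_VALUE.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Hypothetical element: a value plus an optional timestamp (null = none assigned).
    static final class Element {
        final String value;
        final Long timestamp;

        Element(String value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Event time: keep an element only if it is newer than the current watermark;
    // anything else counts as late and is dropped.
    static List<String> keptInEventTime(List<Element> input, long currentWatermark) {
        List<String> kept = new ArrayList<>();
        for (Element e : input) {
            long ts = (e.timestamp == null) ? NO_TIMESTAMP : e.timestamp;
            if (ts > currentWatermark) {
                kept.add(e.value);
            }
        }
        return kept;
    }

    // Processing time: every element is stamped on arrival, so none is late.
    static List<String> keptInProcessingTime(List<Element> input) {
        List<String> kept = new ArrayList<>();
        for (Element e : input) {
            kept.add(e.value);
        }
        return kept;
    }

    public static void main(String[] args) {
        // Like env.fromElements(...): no timestamps are attached.
        List<Element> input = List.of(
                new Element("27.0@Device02", null), new Element("30.0@Device02", null));
        long initialWatermark = Long.MIN_VALUE;
        System.out.println(keptInEventTime(input, initialWatermark)); // []
        System.out.println(keptInProcessingTime(input));              // [27.0@Device02, 30.0@Device02]
    }
}
```

Elements that do carry a timestamp newer than the watermark pass the event-time check, which the test below exercises with a timestamp of 5.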
/**
* Sets the time characteristic to processing time.
*/
public PatternStream<T> inProcessingTime() {
return new PatternStream<>(builder.inProcessingTime());
}
/**
* Sets the time characteristic to event time.
*/
public PatternStream<T> inEventTime() {
return new PatternStream<>(builder.inEventTime());
}
As their names suggest, these two methods set processing time and event time, respectively. Modify the code from the beginning of this section as follows:
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .inProcessingTime() // the key change: explicitly use processing time
        .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println(getRuntimeContext().getUserCodeClassLoader());
            }
            @Override
            public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                return new Alert("Temperature Rise Detected: " + event.get("start")
                        + " on machine name: " + event.get("start"));
            }
        });
Running the job again prints the expected results.