Flink 升級1.12版本的坑

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

Job提交出現(xiàn)異常:No ExecutorFactory found to execute the application

詳細(xì)的報(bào)錯(cuò)如下所示:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1931)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1836)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
    at com.paultech.CEPProblem.main(CEPProblem.java:73)

原因分析:

經(jīng)過排查,發(fā)現(xiàn)是從Flink 1.11開始,flink-streaming-java中的flink-clients依賴被移除。運(yùn)行的時(shí)候需要添加這個(gè)依賴,如下所示:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

其中scala.binary.version和flink.version屬性修改為實(shí)際項(xiàng)目中使用的版本

StackOverflow中相關(guān)問題鏈接:https://stackoverflow.com/questions/63032060/upgraded-flink-from-1-10-to-1-11-met-error-no-executorfactory-found-to-execute

Flink CEP 作業(yè)執(zhí)行結(jié)果異常

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> input = env.fromElements(new TemperatureEvent(1, "Device01", 22.0),
        new TemperatureEvent(1, "Device01", 27.1), new TemperatureEvent(2, "Device01", 28.1),
        new TemperatureEvent(1, "Device01", 22.2), new TemperatureEvent(3, "Device01", 22.1),
        new TemperatureEvent(1, "Device02", 22.3), new TemperatureEvent(4, "Device02", 22.1),
        new TemperatureEvent(1, "Device02", 22.4), new TemperatureEvent(5, "Device02", 22.7),
        new TemperatureEvent(1, "Device02", 27.0), new TemperatureEvent(6, "Device02", 30.0));


Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
        .subtype(TemperatureEvent.class)
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                if (subEvent.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                if (subEvent.getMachineName().equals("Device02")) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .select(
                new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                    /**

                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println(getRuntimeContext().getUserCodeClassLoader());
                    }

                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                    }
                });

patternStream.print();

env.execute("CEP on Temperature Sensor");

TemperatureEvent 和 Alert類為Java bean,不再貼出它們的代碼

這段代碼在1.12版本之前執(zhí)行會輸出:

Alert{message='Temperature Rise Detected: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}] on machine name: [TemperatureEvent{id=1, machineName='Device02', temperature=27.0}]'}

Alert{message='Temperature Rise Detected: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}] on machine name: [TemperatureEvent{id=6, machineName='Device02', temperature=30.0}]'}

但是在Flink 1.12版本中運(yùn)行,patternStream.print()這一行沒有任何輸出。debug了CepOperator一直到StreamOneInputProcessor,發(fā)現(xiàn)均沒有元素輸入,甚是奇怪。

第一反應(yīng)可能是Time characteristic設(shè)置的問題。于是在創(chuàng)建env的后面添加了一行:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

然后發(fā)現(xiàn)這個(gè)調(diào)用在Flink 1.12版本中被廢棄了。原因是Flink 1.12中的流處理默認(rèn)的時(shí)間特征改為TimeCharacteristic.EventTime,也就是說從之前默認(rèn)的processing time改為了event time。這就是問題所在。對于默認(rèn)的event time,F(xiàn)link需要等待到10秒內(nèi)的元素都到齊(下一個(gè)元素的event time為10秒后),才會打印出結(jié)果。然而,10秒鐘之后的元素還沒有到來時(shí)Flink 程序就退出了,因此不會有任何輸出。

要解決這個(gè)問題,我們需要知道1.12版本中CEP設(shè)置時(shí)間特征為ProcessingTime的方式。嘗試使用前邊提到的配置方法無效。

如何在CEP處理流程中使用ProcessingTime呢?我們翻閱PatternStream的源代碼,發(fā)現(xiàn)有如下兩個(gè)方法:

/**
 * Sets the time characteristic to processing time.
 */
public PatternStream<T> inProcessingTime() {
    return new PatternStream<>(builder.inProcessingTime());
}

/**
 * Sets the time characteristic to event time.
 */
public PatternStream<T> inEventTime() {
    return new PatternStream<>(builder.inEventTime());
}

顯然,這兩個(gè)方法分別是用來設(shè)置ProcessingTime和EventTime的。將本篇開始的代碼修改,如下所示:

DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .inProcessingTime() // 注意,這里是關(guān)鍵,顯式指定使用processing time
        .select(
                new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                    /**

                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println(getRuntimeContext().getUserCodeClassLoader());
                    }

                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                    }
                });

再次運(yùn)行,發(fā)現(xiàn)打印出了預(yù)期結(jié)果。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容