Hello Word 入門(mén)Flink開(kāi)發(fā)部署

Flink是一款非常適合做流批處理的計(jì)算框架,F(xiàn)link1.10.0 更是完美整合了阿里的BLink、支持了yarn模式下的跨Task資源共享,并強(qiáng)化了對(duì)hive的支持,下面我們通過(guò)一個(gè)簡(jiǎn)單的例子了解一下Flink的環(huán)境開(kāi)發(fā)

Maven 創(chuàng)建項(xiàng)目

Flink 支持 Maven 直接構(gòu)建模版項(xiàng)目,你在終端使用該命令:

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.10.0

在執(zhí)行的過(guò)程中它會(huì)提示你輸入 groupId、artifactId、和 package 名,你按照要求輸入就行,最后就可以成功創(chuàng)建一個(gè)項(xiàng)目。

image.png

進(jìn)入到目錄你就可以看到已經(jīng)創(chuàng)建了項(xiàng)目,里面結(jié)構(gòu)如下:

[root@10-9-12-255 eqxiu-flink]# tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── eqxiu
        │           ├── BatchJob.java
        │           └── StreamingJob.java
        └── resources
            └── log4j.properties

6 directories, 4 files


該項(xiàng)目中包含了兩個(gè)類(lèi) BatchJob 和 StreamingJob,另外還有一個(gè) log4j.properties 配置文件,然后你就可以將該項(xiàng)目導(dǎo)入到 IDEA 了。

你可以在該目錄下執(zhí)行 mvn clean package 就可以編譯該項(xiàng)目,編譯成功后在 target 目錄下會(huì)生成一個(gè) Job 的 Jar 包,但是這個(gè) Job 還不能執(zhí)行,因?yàn)?StreamingJob 這個(gè)類(lèi)中的 main 方法里面只是簡(jiǎn)單的創(chuàng)建了 StreamExecutionEnvironment 環(huán)境,然后就執(zhí)行 execute 方法,這在 Flink 中是不算一個(gè)可執(zhí)行的 Job 的,因此如果你提交到 Flink UI 上也是會(huì)報(bào)錯(cuò)的。

上傳 Jar:

image.png

運(yùn)行報(bào)錯(cuò):

image.png
Server Response Message:
Internal server error.

我們查看 Flink Job Manager 的日志可以看到:

image.png
2020-03-27 14:36:30,150 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.

因?yàn)?execute 方法之前我們是需要補(bǔ)充我們 Job 的一些算子操作的,所以報(bào)錯(cuò)還是很正常的,本文下面將會(huì)提供完整代碼。

IDEA 創(chuàng)建項(xiàng)目

一般我們項(xiàng)目可能是由多個(gè) Job 組成,并且代碼也都是在同一個(gè)工程下面進(jìn)行管理,上面那種適合單個(gè) Job 執(zhí)行,但如果多人合作的時(shí)候還是得在同一個(gè)工程下面進(jìn)行項(xiàng)目的創(chuàng)建,每個(gè) Flink Job 一個(gè) module,下面我們將來(lái)講解下如何利用 IDEA 創(chuàng)建 Flink 項(xiàng)目。

接下來(lái)我們需要在父工程的 pom.xml 中加入如下屬性(含編碼、Flink 版本、JDK 版本、Scala 版本、Maven 編譯版本):

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--Flink 版本-->
    <flink.version>1.10.0</flink.version>
    <!--JDK 版本-->
    <java.version>1.8</java.version>
    <!--Scala 2.11 版本-->
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

然后加入依賴(lài):

<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

上面依賴(lài)中 flink-java 和 flink-streaming-java 是我們 Flink 必備的核心依賴(lài),為什么設(shè)置 scope 為 provided 呢(默認(rèn)是 compile)?

是因?yàn)?Flink 其實(shí)在自己的安裝目錄中 lib 文件夾里的 lib/flink-dist_2.11-1.10.0.jar 已經(jīng)包含了這些必備的 Jar 了,所以我們?cè)诮o自己的 Flink Job 添加依賴(lài)的時(shí)候最后打成的 Jar 包可不希望又將這些重復(fù)的依賴(lài)打進(jìn)去。有兩個(gè)好處:

  • 減小了我們打的 Flink Job Jar 包容量大小
  • 不會(huì)因?yàn)榇蛉氩煌姹镜?Flink 核心依賴(lài)而導(dǎo)致類(lèi)加載沖突等問(wèn)題

但是問(wèn)題又來(lái)了,我們需要在 IDEA 中調(diào)試運(yùn)行我們的 Job,如果將 scope 設(shè)置為 provided 的話,是會(huì)報(bào)錯(cuò)的:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/ExecutionConfig$GlobalJobParameters
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

默認(rèn) scope 為 compile 的話,本地調(diào)試的話就不會(huì)出錯(cuò)了。

測(cè)試發(fā)現(xiàn):當(dāng) scope 為 provided 時(shí) Jar 包才 7.5k,而為 compile 時(shí) Jar 包就 45M 了,你要想想這才只是一個(gè)簡(jiǎn)單的 WordCount 程序呢,差別就這么大。當(dāng)我們把 Flink Job 打成一個(gè) fat Jar 時(shí),上傳到 UI 的時(shí)間就能夠很明顯的對(duì)比出來(lái)(Jar 包越小上傳的時(shí)間越短),所以把 scope 設(shè)置為 provided 還是很有必要的。

有人就會(huì)想了,那這不是和上面有沖突了嗎?假如我既想打出來(lái)的 Jar 包要小,又想能夠在本地 IDEA 中進(jìn)行運(yùn)行和調(diào)試 Job ?這里我提供一種方法:在父工程中的 pom.xml 引入如下 profiles。

<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>

        <activation>
            <property>
                <name>idea.version</name>
            </property>
        </activation>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>

當(dāng)你在 IDEA 中運(yùn)行 Job 的時(shí)候,它會(huì)給你引入 flink-java、flink-streaming-java,且 scope 設(shè)置為 compile,但是你是打成 Jar 包的時(shí)候它又不起作用。如果你加了這個(gè) profile 還是報(bào)錯(cuò)的話,那么可能是 IDEA 中沒(méi)有識(shí)別到,你可以在 IDEA 的中查看下面兩個(gè)配置確定一下(配置其中一個(gè)即可以起作用)。

1、查看 Maven 中的該 profile 是否已經(jīng)默認(rèn)勾選上了,如果沒(méi)有勾選上,則手動(dòng)勾選一下才會(huì)起作用

image

2、Include dependencies with "Provided" scope 是否勾選,如果未勾選,則手動(dòng)勾選后才起作用

image.png

流計(jì)算 WordCount 應(yīng)用程序代碼

回到正題,利用 IDEA 創(chuàng)建好 WordCount 應(yīng)用后,我們開(kāi)始編寫(xiě)代碼。

Main 類(lèi)

public class Main {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建流運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.fromElements(WORDS)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.toLowerCase().split("\\W+");

                        for (String split : splits) {
                            if (split.length() > 0) {
                                out.collect(new Tuple2<>(split, 1));
                            }
                        }
                    }
                })
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value1.f1);
                    }
                })
                .print();
        //Streaming 程序必須加這個(gè)才能啟動(dòng)程序,否則不會(huì)有結(jié)果
        env.execute("## word count streaming demo");
    }

    private static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer"
    };
}

pom.xml 文件中引入 build 插件并且要替換成你自己項(xiàng)目里面的 mainClass:

<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <!-- 使用 maven-shade 插件創(chuàng)建一個(gè)包含所有必要的依賴(lài)項(xiàng)的 fat Jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!--注意:這里一定要換成你自己的 Job main 方法的啟動(dòng)類(lèi)-->
                                <mainClass>com.eqxiu.StreamingJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:上面這個(gè) build 插件要記得加,否則打出來(lái)的 jar 包是不完整的,提交運(yùn)行會(huì)報(bào) ClassNotFoundException,該問(wèn)題是初學(xué)者很容易遇到的問(wèn)題,很多人咨詢(xún)過(guò)筆者這個(gè)問(wèn)題。

WordCount 應(yīng)用程序運(yùn)行

本地 IDE 運(yùn)行

編譯好 WordCount 程序后,我們?cè)?IDEA 中右鍵 run main 方法就可以把 Job 運(yùn)行起來(lái),結(jié)果如下圖:

image.png

圖中的就是將每個(gè) word 和對(duì)應(yīng)的個(gè)數(shù)一行一行打印出來(lái),在本地 IDEA 中運(yùn)行沒(méi)有問(wèn)題,我們接下來(lái)使用命令 mvn clean package 打包成一個(gè) Jar (eqxiu-flink-1.0-SNAPSHOT.jar) 然后將其上傳到 Flink UI 上運(yùn)行一下看下效果。

UI 運(yùn)行 Job

http://localhost:8081/#/submit 頁(yè)面上傳 eqxiu-flink-1.0-SNAPSHOT.jar 后,然后點(diǎn)擊 Submit 后就可以運(yùn)行了。

運(yùn)行 Job 的 UI 如下:

image.png

Job 的結(jié)果在 Task Manager 的 Stdout 中:

image.png

WordCount 應(yīng)用程序代碼分析

我們已經(jīng)將 WordCount 程序代碼寫(xiě)好了并且也在 IDEA 中和 Flink UI 上運(yùn)行了 Job,并且程序運(yùn)行的結(jié)果都是正常的。

那么我們來(lái)分析一下這個(gè) WordCount 程序代碼:

1、創(chuàng)建好 StreamExecutionEnvironment(流程序的運(yùn)行環(huán)境)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、給流程序的運(yùn)行環(huán)境設(shè)置全局的配置(從參數(shù) args 獲取)

env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

3、構(gòu)建數(shù)據(jù)源,WORDS 是個(gè)字符串?dāng)?shù)組

env.fromElements(WORDS)

4、將字符串進(jìn)行分隔然后收集,組裝后的數(shù)據(jù)格式是 (word、1),1 代表 word 出現(xiàn)的次數(shù)為 1

flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] splits = value.toLowerCase().split("\\W+");

        for (String split : splits) {
            if (split.length() > 0) {
                out.collect(new Tuple2<>(split, 1));
            }
        }
    }
})

5、根據(jù) word 關(guān)鍵字進(jìn)行分組(0 代表對(duì)第一個(gè)字段分組,也就是對(duì) word 進(jìn)行分組)

keyBy(0)

6、對(duì)單個(gè) word 進(jìn)行計(jì)數(shù)操作

reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
})

7、打印所有的數(shù)據(jù)流,格式是 (word,count),count 代表 word 出現(xiàn)的次數(shù)

print()

8、開(kāi)始執(zhí)行 Job

env.execute("## word count streaming demo");
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容