一、簡(jiǎn)介
本文利用Maven來(lái)構(gòu)造flink示例flink-quickstart-java和flink-quickstart-scala
maven的安裝和配置,請(qǐng)參見(jiàn)“好玩的大數(shù)據(jù):maven安裝(maven-3.6.3)”
二、獲取源程序
方法1:利用github獲取flink源碼
下載地址:https://github.com/
下載得到:flink-master.zip
搜索flink

點(diǎn)apache/flink

點(diǎn)擊code=>Download ZIP

解壓縮:flink-master.zip
找到flink-quickstart子目錄
方法2:到flink官網(wǎng)獲取flink源碼
下載地址:https://flink.apache.org/
? ? ? ? 首頁(yè)=>download=>Apache Flink 1.11.1=>Apache Flink 1.11.1 Source Release=>https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-src.tgz
下載得到:flink-1.11.1-src.tgz
對(duì)比從flink官網(wǎng)和github下載的源碼基本一樣

解壓縮:flink-master.zip
找到flink-quickstart子目錄

flink-quickstart-java子目錄結(jié)構(gòu)

flink-quickstart-scala子目錄結(jié)構(gòu)

方法3.利用mvn生成程序框架
java版本
mvn archetype:generate \
? ? ? -DarchetypeGroupId=org.apache.flink? ? ? ? ? ? ? \
? ? ? -DarchetypeArtifactId=flink-quickstart-java? ? ? \
? ? ? -DarchetypeVersion=1.11.1
中間會(huì)問(wèn)以下問(wèn)題:可以隨便填
? ? groupId:Group1
? ? artifactId:Flink-QuickStart-Java
? ? version:默認(rèn),直接回車
? ? package:默認(rèn),直接回車
? ? Y::默認(rèn),直接回車
也可以用非交互模式,自動(dòng)執(zhí)行,不需要回答
mvn archetype:generate\
-DgroupId=Group1\
-DartifactId=Flink-QuickStart-Java\
-DarchetypeGroupId=org.apache.flink\
-DarchetypeArtifactId=flink-quickstart-java\
-DinteractiveMode=false\
-DarchetypeVersion=1.11.1
執(zhí)行完畢后會(huì)生成

Flink-QuickStart-Java下的目錄結(jié)構(gòu)

scala版本
mvn archetype:generate \
? ? ? -DarchetypeGroupId=org.apache.flink? ? ? ? ? ? ? \
? ? ? -DarchetypeArtifactId=flink-quickstart-scala? ? ? \
? ? ? -DarchetypeVersion=1.11.1
中間會(huì)問(wèn)以下問(wèn)題:可以隨便填
groupId:Group1
artifactId:Flink-QuickStart-Scala
version:默認(rèn),直接回車
package:默認(rèn),直接回車
Y::默認(rèn),直接回車
執(zhí)行完畢后會(huì)生成
也可以用非交互模式
mvn archetype:generate\
-DgroupId=Group1\
-DartifactId=Flink-QuickStart-Scala\
-DarchetypeGroupId=org.apache.flink\
-DarchetypeArtifactId=flink-quickstart-scala\
-DinteractiveMode=false\
-DarchetypeVersion=1.11.1

Flink-QuickStart-Scala下的目錄結(jié)構(gòu)

三、在pom.xml中增加依賴(可選)
有些已經(jīng)有了就不必加了,這里只作為例子
#flink-java
<dependency>
? <groupId>org.apache.flink</groupId>
? <artifactId>flink-java</artifactId>
? <version>1.11.1</version>
? <scope>provided</scope>
</dependency>
#flink-streaming-java_2.11
<dependency>
? <groupId>org.apache.flink</groupId>
? <artifactId>flink-streaming-java_2.11</artifactId>
? <version>1.11.1</version>
? <scope>provided</scope>
</dependency>
#flink-scala_2.11
<dependency>
? <groupId>org.apache.flink</groupId>
? <artifactId>flink-scala_2.11</artifactId>
? <version>1.11.1</version>
? <scope>provided</scope>
</dependency>
#flink-streaming-scala_2.11
<dependency>
? <groupId>org.apache.flink</groupId>
? <artifactId>flink-streaming-scala_2.11</artifactId>
? <version>1.11.1</version>
? <scope>provided</scope>
</dependency>
#flink-streaming-scala_2.11
#flink-connector-kafka-0.10_2.11
<dependency>
? ? <groupId>org.apache.flink</groupId>
? ? <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
? ? <version>1.11.1</version>
四、編譯、打包
#java版本
? ? ? ? cd Flink-QuickStart-Java
? ? ? ? mvn clean package

編譯打包生成的target目錄

#scala版本
????????cd?Flink-QuickStart-Scala
????????mvn clean package

編譯打包生成的target目錄

五、提交作業(yè)
#java版本
flink run -m master:8081 -c?Group1.BatchJob?Flink-QuickStart-Java-1.0-SNAPSHOT.jar
出錯(cuò)了

#scala版本
flink run -m master:8081 -c Group1.BatchJob Flink-QuickStart-Scala-1.0-SNAPSHOT.jar
出錯(cuò)了

六、排錯(cuò)、再次提交作業(yè)
從flink.apache.org下載flink-1.11.1-src.tgz,(下載地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.11.1/flink-1.11.1-src.tgz)
也可以從https://github.com/中搜flink,然后點(diǎn)擊apache/flink下載源碼
參考上面“獲取源碼的方法1和2”
將flink-1.11.1-src.tgz解壓縮后,
1.#java版本
? ? a)復(fù)制和修改文件
找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount

將其下的WordCount.java文件及util文件夾復(fù)制到
????????Flink-QuickStart-Java/src/main/java/Group1
? 修改WordCount.java
把WordCount.java中的
? ? 1.將package org.apache.flink.examples.java.wordcount改為
? ?????????package Group1;
? ? 2.將import org.apache.flink.examples.java.wordcount.util.WordCountData;
????????改為
????????????????import Group1.util.WordCountData;
? 修改util/WordCountData.java
????????把package org.apache.flink.examples.java.wordcount.util改為
? ? ? ? ? ? ? package Group1.util;? ? ? ? ? ??
????b)重新編譯和打包
? ? ? ? 回到Flink-QuickStart-Java目錄下,執(zhí)行:
? ? ? ? ? ? ? ? mvn clean package

????c)提交作業(yè)
flink run -m master:8081 -c Group1.WordCount target/Flink-QuickStart-Java-1.0-SNAPSHOT.jar --input $FLINK_HOME/LICENSE --output /home/hadoop/tmp/wordcount-java-result001.txt
cat /home/hadoop/tmp/wordcount-java-result001.txt

#scala版本
?a)復(fù)制和修改文件
找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount
將WordCount.scala復(fù)制到Flink-QuickStart-Scala/src/main/scala/Group1
修改WordCount.scala
把WordCount.scala中的
? ? 1.將package org.apache.flink.examples.scala.wordcount改為
? ?????????package Group1
? ? 2.將import org.apache.flink.examples.java.wordcount.util.WordCountData;
????????改為
????????????????import Group1.util.WordCountData;
找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount
將其下的util文件夾復(fù)制到
????????Flink-QuickStart-Scala/src/main/scala/Group1
? 修改util/WordCountData.java
????????把package org.apache.flink.examples.java.wordcount.util改為
? ? ? ? ? ? ? package Group1.util
????b)重新編譯和打包
? ? ? ? 回到Flink-QuickStart-Scala目錄下,執(zhí)行:
? ? ? ? ? ? ? ? mvn clean package

c)提交作業(yè)
flink run -m master:8081 -c Group1.WordCount target/Flink-QuickStart-Scala-1.0-SNAPSHOT.jar --input $FLINK_HOME/LICENSE --output /home/hadoop/tmp/wordcount-java-result002.txt
cat /home/hadoop/tmp/wordcount-java-result002.txt?

也可以到hdfs里面試試身手
hadoop fs -put?$FLINK_HOME/LICENSE /mylab/mydata
hadoop fs -ls /mylab/mydata
#java版本
flink run -m master:8081 -c?Group1.WordCount target/Flink-QuickStart-Java-1.0-SNAPSHOT.jar --input?hdfs:///mylab/mydata/LICENSE --output hdfs:///mylab/mydata/wordcount-result001.txt
hadoop fs -ls /mylab/mydata
hadoop fs -cat /mylab/mydata/wordcount-result001.txt

#scala版本
flink run -m master:8081 -c?Group1.WordCount?target/Flink-QuickStart-Scala-1.0-SNAPSHOT.jar?--input?hdfs:///mylab/mydata/LICENSE --output hdfs:///mylab/mydata/wordcount-result002.txt
hadoop fs -ls /mylab/mydata
hadoop fs -cat /mylab/mydata/wordcount-result002.txt
