好玩的大數(shù)據(jù)之51:flink編程實(shí)驗(yàn)1(利用maven構(gòu)造flink-quick-start程序,含java和scala版)

一、簡(jiǎn)介


本文利用Maven來(lái)構(gòu)造flink示例flink-quickstart-java和flink-quickstart-scala

maven的安裝和配置,請(qǐng)參見(jiàn)“好玩的大數(shù)據(jù):maven安裝(maven-3.6.3)”

二、獲取源程序


方法1:利用github獲取flink源碼

下載地址:https://github.com/

下載得到:flink-master.zip

搜索flink

github首頁(yè)

點(diǎn)apache/flink

apache/flink

點(diǎn)擊code=>Download ZIP

下載flink源碼

解壓縮:flink-master.zip

找到flink-quickstart子目錄

方法2:到flink官網(wǎng)獲取flink源碼

下載地址:https://flink.apache.org/

? ? ? ? 首頁(yè)=>download=>Apache Flink 1.11.1=>Apache Flink 1.11.1 Source Release=>https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-src.tgz

下載得到:flink-1.11.1-src.tgz

對(duì)比從flink官網(wǎng)和github下載的源碼基本一樣

flink官網(wǎng)和github下載的源碼對(duì)比

解壓縮:flink-master.zip

找到flink-quickstart子目錄

flink-quickstart

flink-quickstart-java子目錄結(jié)構(gòu)

flink-quickstart-java子目錄結(jié)構(gòu)

flink-quickstart-scala子目錄結(jié)構(gòu)

flink-quickstart-scala子目錄結(jié)構(gòu)??

方法3.利用mvn生成程序框架

java版本

mvn archetype:generate \

? ? ? -DarchetypeGroupId=org.apache.flink? ? ? ? ? ? ? \

? ? ? -DarchetypeArtifactId=flink-quickstart-java? ? ? \

? ? ? -DarchetypeVersion=1.11.1

中間會(huì)問(wèn)以下問(wèn)題:可以隨便填

? ? groupId:Group1

? ? artifactId:Flink-QuickStart-Java

? ? version:默認(rèn),直接回車

? ? package:默認(rèn),直接回車

? ? Y::默認(rèn),直接回車

也可以用非交互模式,自動(dòng)執(zhí)行,不需要回答

mvn archetype:generate\

-DgroupId=Group1\

-DartifactId=Flink-QuickStart-Java\

-DarchetypeGroupId=org.apache.flink\

-DarchetypeArtifactId=flink-quickstart-java\

-DinteractiveMode=false\

-DarchetypeVersion=1.11.1


執(zhí)行完畢后會(huì)生成

Flink-QuickStart-Java

Flink-QuickStart-Java下的目錄結(jié)構(gòu)

Flink-QuickStart-Java下的目錄結(jié)構(gòu)


scala版本

mvn archetype:generate \

? ? ? -DarchetypeGroupId=org.apache.flink? ? ? ? ? ? ? \

? ? ? -DarchetypeArtifactId=flink-quickstart-scala? ? ? \

? ? ? -DarchetypeVersion=1.11.1

中間會(huì)問(wèn)以下問(wèn)題:可以隨便填

groupId:Group1

artifactId:Flink-QuickStart-Scala

version:默認(rèn),直接回車

package:默認(rèn),直接回車

Y::默認(rèn),直接回車

執(zhí)行完畢后會(huì)生成

也可以用非交互模式

mvn archetype:generate\

-DgroupId=Group1\

-DartifactId=Flink-QuickStart-Scala\

-DarchetypeGroupId=org.apache.flink\

-DarchetypeArtifactId=flink-quickstart-scala\

-DinteractiveMode=false\

-DarchetypeVersion=1.11.1

Flink-QuickStart-Scala

Flink-QuickStart-Scala下的目錄結(jié)構(gòu)

Flink-QuickStart-Scala下的目錄結(jié)構(gòu)?

三、在pom.xml中增加依賴(可選)


有些已經(jīng)有了就不必加了,這里只作為例子

#flink-java

<dependency>

? <groupId>org.apache.flink</groupId>

? <artifactId>flink-java</artifactId>

? <version>1.11.1</version>

? <scope>provided</scope>

</dependency>

#flink-streaming-java_2.11

<dependency>

? <groupId>org.apache.flink</groupId>

? <artifactId>flink-streaming-java_2.11</artifactId>

? <version>1.11.1</version>

? <scope>provided</scope>

</dependency>

#flink-scala_2.11

<dependency>

? <groupId>org.apache.flink</groupId>

? <artifactId>flink-scala_2.11</artifactId>

? <version>1.11.1</version>

? <scope>provided</scope>

</dependency>

#flink-streaming-scala_2.11

<dependency>

? <groupId>org.apache.flink</groupId>

? <artifactId>flink-streaming-scala_2.11</artifactId>

? <version>1.11.1</version>

? <scope>provided</scope>

</dependency>

#flink-streaming-scala_2.11

#flink-connector-kafka-0.10_2.11

<dependency>

? ? <groupId>org.apache.flink</groupId>

? ? <artifactId>flink-connector-kafka-0.10_2.11</artifactId>

? ? <version>1.11.1</version>

四、編譯、打包


#java版本

? ? ? ? cd Flink-QuickStart-Java

? ? ? ? mvn clean package

java版本

編譯打包生成的target目錄

編譯打包生成的target目錄

#scala版本

????????cd?Flink-QuickStart-Scala

????????mvn clean package

scala版本

編譯打包生成的target目錄

編譯打包生成的target目錄

五、提交作業(yè)


#java版本

flink run -m master:8081 -c?Group1.BatchJob?Flink-QuickStart-Java-1.0-SNAPSHOT.jar

出錯(cuò)了

出錯(cuò)了

#scala版本

flink run -m master:8081 -c Group1.BatchJob Flink-QuickStart-Scala-1.0-SNAPSHOT.jar

出錯(cuò)了

出錯(cuò)了


六、排錯(cuò)、再次提交作業(yè)


從flink.apache.org下載flink-1.11.1-src.tgz,(下載地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.11.1/flink-1.11.1-src.tgz)

也可以從https://github.com/中搜flink,然后點(diǎn)擊apache/flink下載源碼

參考上面“獲取源碼的方法1和2”

將flink-1.11.1-src.tgz解壓縮后,

1.#java版本

? ? a)復(fù)制和修改文件

找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount

copy文件

將其下的WordCount.java文件及util文件夾復(fù)制到

????????Flink-QuickStart-Java/src/main/java/Group1

? 修改WordCount.java

把WordCount.java中的

? ? 1.將package org.apache.flink.examples.java.wordcount改為

? ?????????package Group1;

? ? 2.將import org.apache.flink.examples.java.wordcount.util.WordCountData;

????????改為

????????????????import Group1.util.WordCountData;

? 修改util/WordCountData.java

????????把package org.apache.flink.examples.java.wordcount.util改為

? ? ? ? ? ? ? package Group1.util;? ? ? ? ? ??

????b)重新編譯和打包

? ? ? ? 回到Flink-QuickStart-Java目錄下,執(zhí)行:

? ? ? ? ? ? ? ? mvn clean package

重新編譯和打包

????c)提交作業(yè)

flink run -m master:8081 -c Group1.WordCount target/Flink-QuickStart-Java-1.0-SNAPSHOT.jar --input $FLINK_HOME/LICENSE --output /home/hadoop/tmp/wordcount-java-result001.txt

cat /home/hadoop/tmp/wordcount-java-result001.txt

成功了!

#scala版本

?a)復(fù)制和修改文件

找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount

將WordCount.scala復(fù)制到Flink-QuickStart-Scala/src/main/scala/Group1

修改WordCount.scala

把WordCount.scala中的

? ? 1.將package org.apache.flink.examples.scala.wordcount改為

? ?????????package Group1

? ? 2.將import org.apache.flink.examples.java.wordcount.util.WordCountData;

????????改為

????????????????import Group1.util.WordCountData;

找到子目錄flink-1.11.1-src/flink-1.11.1/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount

將其下的util文件夾復(fù)制到

????????Flink-QuickStart-Scala/src/main/scala/Group1

? 修改util/WordCountData.java

????????把package org.apache.flink.examples.java.wordcount.util改為

? ? ? ? ? ? ? package Group1.util

????b)重新編譯和打包

? ? ? ? 回到Flink-QuickStart-Scala目錄下,執(zhí)行:

? ? ? ? ? ? ? ? mvn clean package

重新編譯和打包

c)提交作業(yè)

flink run -m master:8081 -c Group1.WordCount target/Flink-QuickStart-Scala-1.0-SNAPSHOT.jar --input $FLINK_HOME/LICENSE --output /home/hadoop/tmp/wordcount-java-result002.txt

cat /home/hadoop/tmp/wordcount-java-result002.txt?

也成功了!

也可以到hdfs里面試試身手

hadoop fs -put?$FLINK_HOME/LICENSE /mylab/mydata

hadoop fs -ls /mylab/mydata

#java版本

flink run -m master:8081 -c?Group1.WordCount target/Flink-QuickStart-Java-1.0-SNAPSHOT.jar --input?hdfs:///mylab/mydata/LICENSE --output hdfs:///mylab/mydata/wordcount-result001.txt

hadoop fs -ls /mylab/mydata

hadoop fs -cat /mylab/mydata/wordcount-result001.txt


#scala版本

flink run -m master:8081 -c?Group1.WordCount?target/Flink-QuickStart-Scala-1.0-SNAPSHOT.jar?--input?hdfs:///mylab/mydata/LICENSE --output hdfs:///mylab/mydata/wordcount-result002.txt

hadoop fs -ls /mylab/mydata

hadoop fs -cat /mylab/mydata/wordcount-result002.txt

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容