Flink in Practice: A Flink SQL Demo for a Batch Scenario

I will be using Flink SQL at work soon, so I spent the weekend learning it and wrote this demo as a record. The full code is on GitHub.

The demo is based on Flink 1.9.1 and developed with Java 8.

This example applies Flink SQL in a batch scenario: read student records from the students and scores tables and compute the average scores per class.

1. Prepare the Data

students.txt holds the student records: id,name,classname

1 張三 1班
2 李四 1班
3 王五 2班
4 趙六 2班
5 郭大寶 2班

scores.txt holds the scores: id,chinese,math,english

1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82
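Before running the job, the expected per-class averages can be worked out by hand. A quick sanity check over the sample rows in plain Java (a hypothetical helper class, not part of the demo project) gives numbers to compare the Flink output against:

```java
import java.util.*;

// Hypothetical sanity check (not part of the demo project): compute the
// expected average total score per class from the sample data with plain
// Java, so the Flink job's output can be verified by hand.
public class ExpectedAverages {
    // total score (chinese + math + english) per student, keyed by class name
    static Map<String, List<Integer>> totalsByClass() {
        Map<String, List<Integer>> m = new LinkedHashMap<>();
        m.put("1班", Arrays.asList(100 + 90 + 80, 97 + 87 + 74));               // students 1, 2
        m.put("2班", Arrays.asList(70 + 50 + 43, 100 + 99 + 99, 80 + 81 + 82)); // students 3, 4, 5
        return m;
    }

    static double avgTotal(String classname) {
        return totalsByClass().get(classname).stream()
                .mapToInt(Integer::intValue).average().orElse(0);
    }

    public static void main(String[] args) {
        // 1班: (270 + 258) / 2 = 264.0; 2班: (163 + 298 + 243) / 3 ≈ 234.67
        totalsByClass().keySet().forEach(c ->
                System.out.printf("%s avg_total=%.2f%n", c, avgTotal(c)));
    }
}
```

Note that SQL `AVG` over an `INT` column may use integer semantics, so the job's output can differ from these exact decimals in rounding.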

2. Create the Project

Following the official quickstart, create the Flink project with mvn:

   $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

創(chuàng)建后使用IDEA打開,項目結構如圖,把創(chuàng)建好的兩份數(shù)據(jù)保存在resources中.


(screenshot: project structure in IDEA)

Edit pom.xml, mainly to pull in the Flink dependencies:

<dependencies>
<!--flink core-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink-table-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--kafka connector (not used by this batch demo)-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

3. Implement the Job

Create a SQLBatch Java class that implements the job:

package com.cmbc.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;


public class SQLBatch {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // read the input files
        DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
        DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");

        // parse the raw lines into typed tuples
        DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
            }
        });

        DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                        Integer.valueOf(line[2]), Integer.valueOf(line[3]));
            }
        });

        // join the two data sets on the id field
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                .where(0)
                .equalTo(0)
                .projectFirst(0, 1, 2)
                .projectSecond(1, 2, 3);

        // register the joined data set as a table
        tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");

        // run the SQL query
        Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                "AVG(chinese + math + english) as avg_total " +
                "FROM Data " +
                "GROUP BY classname " +
                "ORDER BY avg_total"
        );

        // write the result to the sink
        DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
        result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
        tEnv.execute("do flink sql demo in batch");
    }

    // POJO holding one result row; field names must match the query's aliases
    public static class Info {
        public String classname;
        public Integer avg_chinese;
        public Integer avg_math;
        public Integer avg_english;
        public Integer avg_total;

        public Info() {
        }

        public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
            this.classname = classname;
            this.avg_chinese = avg_chinese;
            this.avg_math = avg_math;
            this.avg_english = avg_english;
            this.avg_total = avg_total;
        }

        @Override
        public String toString() {
            return "classname=" + classname +
                    ", avg_chinese=" + avg_chinese +
                    ", avg_math=" + avg_math +
                    ", avg_english=" + avg_english +
                    ", avg_total=" + avg_total;
        }
    }
}
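Each line that writeAsText produces in info.txt is one Info row rendered by its toString(). A standalone replica of that formatting (a hypothetical InfoFormat class, mirroring the POJO above for illustration only) shows the line layout:

```java
// Standalone copy of the demo POJO's toString() logic, just to show the
// line format that writeAsText produces (one toString() per result row).
public class InfoFormat {
    public static String format(String classname, int chinese, int math, int english, int total) {
        return "classname=" + classname +
                ", avg_chinese=" + chinese +
                ", avg_math=" + math +
                ", avg_english=" + english +
                ", avg_total=" + total;
    }

    public static void main(String[] args) {
        // Placeholder values; the real averages are computed by the SQL query.
        System.out.println(format("1班", 98, 88, 77, 264));
        // prints: classname=1班, avg_chinese=98, avg_math=88, avg_english=77, avg_total=264
    }
}
```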

The job is fairly simple; in brief:

  • Initialize the Flink environment
  • Read the input files, here the students.txt and scores.txt tables
  • Preprocess the data: join the two tables on the id field into one DataSet
  • Register the DataSet as a table and run the SQL query
  • Write the result out
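The preprocessing step above can also be sketched without Flink at all. A minimal plain-Java equivalent of joining the two tables on id (a hypothetical JoinSketch class, for illustration only, not the Flink API):

```java
import java.util.*;

// Plain-Java sketch of the id-based join the demo performs with
// students.join(score).where(0).equalTo(0): for each student row,
// look up the score row with the same id and merge the fields.
public class JoinSketch {
    public static List<String> join(Map<Integer, String[]> students,
                                    Map<Integer, int[]> scores) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String[]> e : students.entrySet()) {
            int[] s = scores.get(e.getKey());   // match on id
            if (s == null) continue;            // inner join: drop unmatched rows
            out.add(e.getKey() + " " + String.join(" ", e.getValue())
                    + " " + s[0] + " " + s[1] + " " + s[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String[]> students = new LinkedHashMap<>();
        students.put(1, new String[]{"張三", "1班"});
        Map<Integer, int[]> scores = new LinkedHashMap<>();
        scores.put(1, new int[]{100, 90, 80});
        System.out.println(join(students, scores)); // [1 張三 1班 100 90 80]
    }
}
```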

4. Run It and Check the Results

  • Start Flink in local mode: run the start-cluster.sh script from the Flink installation directory
  • Build the jar with mvn clean package (or do this from IDEA); the jar ends up under the project's target directory
  • Submit the job:
flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
  • Result:


    (screenshot: job output)
