最近工作會用到Flink SQL,周末學習了一下,寫個demo做記錄,全部代碼請參考Github.
基于的Flink版本是1.9.1,使用的是java8開發(fā)。
本例是Flink SQL在Batch場景下的應用,目標是從students、scores表中讀取學生的信息,計算班級平均分。
1. 準備數(shù)據(jù)
students.txt 保存學生信息:id,name,classname
1 張三 1班
2 李四 1班
3 王五 2班
4 趙六 2班
5 郭大寶 2班
scores.txt 保存成績:id,chinese,math,english
1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82
2. 創(chuàng)建工程
根據(jù)官網的提示,通過mvn創(chuàng)建flink項目
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
創(chuàng)建后使用IDEA打開,項目結構如圖,把創(chuàng)建好的兩份數(shù)據(jù)保存在resources中.

1586602165374.jpg
編輯pom.xml,主要是引入一些flink的依賴:
<dependencies>
<!--flink core-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink-table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
3. 實現(xiàn)功能
創(chuàng)建SQLBatch的JAVA類,實現(xiàn)功能。
package com.cmbc.flink;
?
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
?
import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;
?
?
public class SQLBatch {
public static void main(String[] args) throws Exception {
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
?
// read files
DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");
?
// prepare data
DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> map(String s) throws Exception {
String[] line = s.split(" ");
return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
}
});
?
DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
@Override
public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
String[] line = s.split(" ");
return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
Integer.valueOf(line[2]), Integer.valueOf(line[3]));
}
});
?
// join data
DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
.where(0)
.equalTo(0)
.projectFirst(0,1,2)
.projectSecond(1,2,3);
?
?
// register to a table
tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");
?
?
// do sql
Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
"AVG(chinese + math + english) as avg_total " +
"FROM Data " +
"GROUP BY classname " +
"ORDER BY avg_total"
);
?
// to sink
DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
tEnv.execute("do flink sql demo in batch");
?
}
?
public static class Info {
public String classname;
public Integer avg_chinese;
public Integer avg_math;
public Integer avg_english;
public Integer avg_total;
?
public Info() {
}
?
public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
this.classname = classname;
this.avg_chinese = avg_chinese;
this.avg_math = avg_math;
this.avg_english = avg_english;
this.avg_total = avg_total;
}
?
@Override
public String toString() {
return
"classname=" + classname +
", avg_chinese=" + avg_chinese +
", avg_math=" + avg_math +
", avg_english=" + avg_english +
", avg_total=" + avg_total +
"";
}
}
}
功能比較簡單,簡單說一下:
- 初始化flink env
- 讀取文件數(shù)據(jù),這里讀取student.txt、scores.txt兩張表
- 數(shù)據(jù)預處理,這里通過id字段將兩個表的數(shù)據(jù)join出dataset
- 將dataset映射成table,并執(zhí)行sql
- 數(shù)據(jù)保存
4. 運行和結果
- 啟動flink on local的模式 ,在flink的安裝路徑下找到腳本start-cluster.sh
- mvn打Jar包:mvn clean package,或者在idea里完成這一步,jar包位置在項目target路徑下
- 執(zhí)行腳本:
flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
-
結果
1586602913833.jpg
