1、導(dǎo)入依賴
<!-- 使用table api 引入的依賴,使用橋接器和底層datastream api連接支持-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--如果需要在本地運(yùn)行table api和sql 還需要引入一下依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--如果想實(shí)現(xiàn)自定義的數(shù)據(jù)格式來做序列化,需要引入一下依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--連接外部數(shù)據(jù)格式解析,采用csv方式來解析-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
1.1、從文件中輸入
路徑:input/clicks.txt
Bob,./test/111,1000
Bob,./test/222,1000
Bob,./test/333,1000
Bob,./test/444,1000
2、表轉(zhuǎn)流輸出
package com.flinktest.wc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class CommApiTest3 {
public static void main(String[] args) throws Exception{
// 創(chuàng)建執(zhí)行環(huán)境的兩種方式,流方式 & 表方式
// 1 創(chuàng)建執(zhí)行環(huán)境(流方式創(chuàng)建)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2 創(chuàng)建執(zhí)行環(huán)境(表方式創(chuàng)建) 基于alibaba 的 blink planner實(shí)現(xiàn)
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
// .inStreamingMode()
// .useBlinkPlanner()
// .build();
// TableEnvironment tableEnv = TableEnvironment.create(settings);
// 3 創(chuàng)建一張連接器表(輸入表)
String createInDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createInDDL);
// 執(zhí)行聚合統(tǒng)計(jì)查詢轉(zhuǎn)換
Table eggResult = tableEnv.sqlQuery("select user_name,COUNT(url) as cnt from clickTable group by user_name");
// table 轉(zhuǎn)流輸出,聚合統(tǒng)計(jì)是動(dòng)態(tài)表,所以使用Changelog的方式才能輸出
tableEnv.toChangelogStream(eggResult).print("egg");
env.execute();
}
}