Flink-1.13.0 Table Api & SQL Java Demo4(表轉(zhuǎn)流輸出)

1、導(dǎo)入依賴

      <!-- 使用table api 引入的依賴,使用橋接器和底層datastream api連接支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果需要在本地運(yùn)行table api和sql 還需要引入一下依賴-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果想實(shí)現(xiàn)自定義的數(shù)據(jù)格式來做序列化,需要引入一下依賴-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--連接外部數(shù)據(jù)格式解析,采用csv方式來解析-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

1.1、從文件中輸入

路徑:input/clicks.txt

Bob,./test/111,1000
Bob,./test/222,1000
Bob,./test/333,1000
Bob,./test/444,1000

2、表轉(zhuǎn)流輸出

package com.flinktest.wc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommApiTest3 {
    public static void main(String[] args) throws Exception{
        // 創(chuàng)建執(zhí)行環(huán)境的兩種方式,流方式 & 表方式
        // 1 創(chuàng)建執(zhí)行環(huán)境(流方式創(chuàng)建)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2 創(chuàng)建執(zhí)行環(huán)境(表方式創(chuàng)建) 基于alibaba 的 blink planner實(shí)現(xiàn)
        //        EnvironmentSettings settings = EnvironmentSettings.newInstance()
        //                .inStreamingMode()
        //                .useBlinkPlanner()
        //                .build();
        //        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 3 創(chuàng)建一張連接器表(輸入表)
        String createInDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createInDDL);
        // 執(zhí)行聚合統(tǒng)計(jì)查詢轉(zhuǎn)換
        Table eggResult = tableEnv.sqlQuery("select user_name,COUNT(url) as cnt from clickTable group by user_name");
        // table 轉(zhuǎn)流輸出,聚合統(tǒng)計(jì)是動(dòng)態(tài)表,所以使用Changelog的方式才能輸出
        tableEnv.toChangelogStream(eggResult).print("egg");
        env.execute();
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 今日體驗(yàn),客戶服務(wù)方案,標(biāo)準(zhǔn)流程,必須要固化。 核心,標(biāo)準(zhǔn),規(guī)范,夾帶著人情味。 用,任何關(guān)系發(fā)展到最后,就是客情...
    王海博閱讀 117評(píng)論 0 0
  • 沒有父親的父親節(jié) 今天是父親節(jié),看到許多寫父親的文章,還看到群里各種發(fā)紅包,我的心里空落落的。 這是我度過的第一個(gè)...
    2bbfcf735db9閱讀 119評(píng)論 0 0
  • 今日知識(shí)點(diǎn): 1--糖脂轉(zhuǎn)換點(diǎn); 2--登山機(jī)如何使用; 3--團(tuán)體課程,登云梯; ———————————————...
    耳東草西閱讀 262評(píng)論 0 2
  • 1.項(xiàng)目概述 1.1項(xiàng)目背景隨著手機(jī)和互聯(lián)網(wǎng)的普及,物聯(lián)網(wǎng)技術(shù)的不斷發(fā)展,人們生活水平的提高對(duì)生活質(zhì)量的要求也越來...
    明雨均閱讀 165評(píng)論 0 0
  • (2022.06.22 Wed)Markowitz在20世紀(jì)50年代引進(jìn)了均值-方差模型成了現(xiàn)代證券組合理論的基石...
    Mc杰夫閱讀 1,250評(píng)論 0 0

友情鏈接更多精彩內(nèi)容