Apache Flink 學(xué)習(xí)筆記(四)

本篇將演示如何使用 Flink SQL 實(shí)現(xiàn)上一篇demo5的功能,上一篇傳送門 Apache Flink 學(xué)習(xí)筆記(三)

Flink SQl 是無限接近關(guān)系型數(shù)據(jù)庫sql語句的抽象模塊,SQLTable API查詢可以無縫混合,SQL查詢是使用sqlQuery()方法指定的TableEnvironment,該方法返回SQL查詢的結(jié)果為Table。

直接上代碼demo6

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Date;

/**
 * Flink SQL
 */
public class Demo6 {
    private static final String APP_NAME = "app_name";

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //設(shè)置窗口的時(shí)間單位為process time
        env.setParallelism(1);//全局并發(fā)數(shù)

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
        //設(shè)置topic和 app name
        //FlinkKafkaManager 源碼見筆記二
        FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
        FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
        consumer.setStartFromLatest();

        //獲取DataStream,并轉(zhuǎn)成Bean3
        DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
        tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime");//注冊表名

        //module 必須加``,否則報(bào)錯(cuò)
        String sql = "SELECT appId, COUNT(`module`) AS totals FROM common WHERE appId = '100007336' OR appId = '100013668' GROUP BY TUMBLE(tt, INTERVAL '10' SECOND),appId";
        Table query = tableEnvironment.sqlQuery(sql);

        DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);
        result.process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                System.out.println(String.format("AppId:%s, Module Count:%s", value.getField(0).toString(), value.getField(1).toString()));
            }
        });

        try {
            env.execute(APP_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlatMap implements MapFunction<JSONObject, Bean3> {
        @Override
        public Bean3 map(JSONObject jsonObject) throws Exception {
            return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
        }
    }
}

除了StreamTableEnvironment處理不一樣,其余代碼幾乎沒有改變,這里需要注意的是,Table API中的window函數(shù)在SQL里表現(xiàn)為TUMBLE(tt, INTERVAL '10' SECOND)(針對滾動窗口)

更多細(xì)節(jié)部分,參考官網(wǎng)文檔即可(我自己也沒怎么看,特殊需求特殊對待,哈哈)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Apache Flink(下簡稱Flink)項(xiàng)目是大數(shù)據(jù)處理領(lǐng)域最近冉冉升起的一顆新星,其不同于其他大數(shù)據(jù)項(xiàng)目的諸...
    Alukar閱讀 27,294評論 1 48
  • Spark SQL, DataFrames and Datasets Guide Overview SQL Dat...
    Joyyx閱讀 8,494評論 0 16
  • Apache Flink features two relational APIs - the Table API...
    zachary_1db5閱讀 1,181評論 0 0
  • 春風(fēng)吹來春天 也吹來它最真誠的祝福 慷慨地釋放所有新綠 綠得真讓人心醉 腳下踩著綠的影子 漫天縈繞著春的氣息 聚攏...
    6葛丁滟閱讀 129評論 0 0
  • 昨天跟宋老師談了一下,並不知道要談什麼,只是內(nèi)在有個(gè)力量推著我,需要去聽聽宋老師的建議。 宋老師說“一個(gè)人若不接受...
    HaiyanF閱讀 234評論 0 0

友情鏈接更多精彩內(nèi)容