2022-06-23-Flink-51(三. SQL手冊)

1. 函數(shù)類型

函數(shù) | Apache Flink
Flink 中的函數(shù)有兩個劃分標(biāo)準(zhǔn)

  1. 一個劃分標(biāo)準(zhǔn)是:系統(tǒng)(內(nèi)置)函數(shù)和 Catalog 函數(shù)。系統(tǒng)函數(shù)沒有名稱空間,只能通過其名稱來進行引用。 Catalog 函數(shù)屬于 Catalog 和數(shù)據(jù)庫,因此它們擁有 Catalog 和數(shù)據(jù)庫命名空間。 用戶可以通過全/部分限定名(catalog.db.func 或 db.func)或者函數(shù)名 來對 Catalog 函數(shù)進行引用。
  2. 另一個劃分標(biāo)準(zhǔn)是:臨時函數(shù)和持久化函數(shù)。 臨時函數(shù)始終由用戶創(chuàng)建,它容易改變并且僅在會話的生命周期內(nèi)有效。 持久化函數(shù)不是由系統(tǒng)提供,就是存儲在 Catalog 中,它在會話的整個生命周期內(nèi)都有效

看一下函數(shù)如何引用和函數(shù)解析優(yōu)先級??

2. 系統(tǒng)內(nèi)置函數(shù)

系統(tǒng)(內(nèi)置)函數(shù) | Apache Flink

3. 自定義函數(shù)

當(dāng)前 Flink 有如下幾種函數(shù):

  1. 標(biāo)量函數(shù) 將標(biāo)量值轉(zhuǎn)換成一個新標(biāo)量值;
  2. 表值函數(shù) 將標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
  3. 聚合函數(shù) 將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成一個新標(biāo)量值;
  4. 表值聚合函數(shù) 將多行數(shù)據(jù)里的標(biāo)量值轉(zhuǎn)換成新的行數(shù)據(jù);
  5. 異步表值函數(shù) 是異步查詢外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。
public class tableDemo5 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //輸入表
        tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.uid.kind'='sequence', " +
                " 'fields.uid.start'='1'," +
                " 'fields.uid.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='1'," +
                " 'fields.money.max'='1000')");

        //輸出表
        tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
        //注冊自定義標(biāo)量函數(shù)
        tableEnv.createTemporarySystemFunction("myDouble",DoubleFunction.class);
        tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");
        env.execute();



    }

    public static class DoubleFunction extends ScalarFunction {
        public Integer eval(Integer money) {
            return money * 2;
        }
    }
}

如果你的函數(shù)在初始化時,是有入?yún)⒌模敲葱枰愕娜雲(yún)⑹?Serializable 的。即 Java 中需要繼承 Serializable 接口

public class tableDemo6 {

    public static void main(String[] args) throws Exception {
               EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);
        //輸入表
        tableEnv.executeSql("CREATE TABLE input_table( tag STRING ,uid INT, money INT ) " +
                " WITH( 'connector' = 'datagen',  " +
                " 'rows-per-second'='1',  " +
                " 'fields.uid.kind'='sequence', " +
                " 'fields.uid.start'='1'," +
                " 'fields.uid.end'='1000'," +
                " 'fields.tag.length'='1'," +
                " 'fields.money.min'='1'," +
                " 'fields.money.max'='1000')");

        //輸出表
        tableEnv.executeSql("CREATE TABLE out_Table (tag STRING, money_2 BIGINT, money BIGINT ) WITH ( 'connector' = 'print' )");
        //注冊自定義標(biāo)量函數(shù)
        tableEnv.createTemporarySystemFunction("myDouble",new DoubleFunction(false));
        tableEnv.executeSql("INSERT INTO out_Table SELECT tag ,myDouble(money),money FROM input_table");



    }

    public static class DoubleFunction extends ScalarFunction {
        private boolean endInclusive;

        public DoubleFunction(boolean endInclusive) {
            this.endInclusive = endInclusive;
        }

        public Integer eval(Integer money) {
            if (endInclusive) {
                 return money * 2;
            }
            return  money;
        }
    }
}

4. 開發(fā)UDF的需知事項

  1. 首先需要繼承 Flink SQL UDF 體系提供的基類,每種 UDF 實現(xiàn)都有不同的基類
  2. 實現(xiàn) UDF 執(zhí)行邏輯函數(shù),不同類型的 UDF 需要實現(xiàn)不同的執(zhí)行邏輯函數(shù)
  3. 注意 UDF 入?yún)ⅰ⒊鰠㈩愋屯茖?dǎo),F(xiàn)link 在一些基礎(chǔ)類型上的是可以直接推導(dǎo)出類型信息的,但是一些復(fù)雜類型就無能為力了,這里需要用戶主動介入
  4. 明確 UDF 輸出結(jié)果是否是定值,如果是定值則 Flink 會在生成計劃時就執(zhí)行一遍,得出結(jié)果,然后使用這個定值的結(jié)果作為后續(xù)的執(zhí)行邏輯的參數(shù),這樣可以做到不用在 Flink SQL 任務(wù)運行時每次都執(zhí)行一次,會有性能優(yōu)化
  5. 巧妙運用運行時上下文,可以在任務(wù)運行前加載到一些外部資源、上下文配置信息,擴展 UDF 能力
注意 UDF 入?yún)?、出參類型推?dǎo)

Data Types | Apache Flink

5. SQL 標(biāo)量函數(shù)(Scalar Function)

// 有多個重載求值方法的函數(shù)
public static class OverloadedFunction extends ScalarFunction {

  // 不需要任何聲明,可以直接推導(dǎo)出類型信息,即入?yún)⒑统鰠?yīng)到 SQL 中的 bigint 類型
  public Long eval(long a, long b) {
    return a + b;
  }

  // 使用 @DataTypeHint("DECIMAL(12, 3)") 定義 decimal 的精度和小數(shù)位
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // 使用注解定義嵌套數(shù)據(jù)類型
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }

  // 允許任意類型的輸入,并輸出序列化定制后的值
  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return MyUtils.serializeToByteBuffer(o);
  }
}

6. SQL 表值函數(shù)(Table Function)

表值函數(shù)即 UDTF,常用于進一條數(shù)據(jù),出多條數(shù)據(jù)的場景

public class tableDemo6 {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(setting);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,tag STRING) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/e.txt', 'format' = 'csv')");
        tableEnv.executeSql("CREATE TABLE out_Table (id INT, tag_1 STRING, tag_2 INT ) WITH ( 'connector' = 'print' )");
        tableEnv.createTemporarySystemFunction("MySplit", MyFunction.class);
        tableEnv.executeSql("INSERT INTO out_Table SELECT  id,tag_1, tag_2 FROM input_table , LATERAL TABLE(MySplit(tag)) ");
    }

    //自定義實現(xiàn)ScalarFunction
    @FunctionHint(output = @DataTypeHint("ROW<tag_1 STRING, tag_2 INT>"))
    public static class MyFunction extends TableFunction<Row> {

        public void eval(String str) {
            String[] split = str.split("\\|");
            for (String s : split) {
                collect(Row.of(s, s.length()));
            }
        }
    }
}

BUG:Recovery is suppressed by NoRestartBackoffTimeStrategy
Row導(dǎo)錯包了...

如果你是使用 Scala 實現(xiàn)函數(shù),不要使用 Scala 中 object 實現(xiàn) UDF,Scala object 是單例的,有可能會導(dǎo)致并發(fā)問題

7. SQL 聚合函數(shù)(Aggregate Function)

聚合函數(shù)即 UDAF,常用于進多條數(shù)據(jù),出一條數(shù)據(jù)的場景

  1. 實現(xiàn) AggregateFunction 接口,其中所有的方法必須是 public 的、非 static 的,傳一個是最終的輸出類型和中間狀態(tài)類型
  2. Acc聚合中間結(jié)果 createAccumulator():為當(dāng)前 Key 初始化一個空的 accumulator
  3. accumulate(Acc accumulator, Input輸入?yún)?shù)):對于每一行數(shù)據(jù),都會調(diào)用 accumulate() 方法來更新 accumulator,這個方法就是用于處理每一條輸入數(shù)據(jù);
  4. Output輸出參數(shù) getValue(Acc accumulator):通過調(diào)用 getValue 方法來計算和返回最終的結(jié)果
  5. retract(Acc accumulator, Input輸入?yún)?shù)):在回撤流的場景下必須要實現(xiàn)
  6. merge(Acc accumulator, Iterable<Acc> it):在許多批式聚合以及流式聚合中的 Session、Hop 窗口聚合場景下都是必須要實現(xiàn)的。除此之外,這個方法對于優(yōu)化也很多幫助。例如,如果你打開了兩階段聚合優(yōu)化,就需要 AggregateFunction 實現(xiàn) merge 方法,從而可以做到在數(shù)據(jù)進行 shuffle 前先進行一次聚合計算。
  7. resetAccumulator():在批式聚合中是必須實現(xiàn)的。

public class tableDemo7 {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
        //加權(quán)平均值
        tableEnv.createTemporarySystemFunction("myAvg",myAvg.class);
        Table table = tableEnv.sqlQuery("SELECT id,myAvg(money,cnt)  FROM input_table GROUP BY id");
        tableEnv.toChangelogStream(table).print();
        env.execute();

    }

    //最終輸出類型和中間狀態(tài)類型
    public static class myAvg extends AggregateFunction<Long, avgAccumulator> {

        // 獲取返回結(jié)果
        @Override
        public Long getValue(avgAccumulator acc) {
            if (acc.count == 0) {
                return null;
            } else {
                return acc.sum / acc.count;

            }
        }
        //初始化
        @Override
        public avgAccumulator createAccumulator() {
            return new avgAccumulator();
        }

        //中間狀態(tài)的計算
        public void accumulate(avgAccumulator acc, Long iMoney, Integer iCnt) {
            acc.sum += iMoney * iCnt;
            acc.count += iCnt;
        }


        // Session window 可以使用這個方法將幾個單獨窗口的結(jié)果合并
        public void merge(avgAccumulator acc, Iterable<avgAccumulator> it) {
            for (avgAccumulator a : it) {
                acc.count += a.count;
                acc.sum += a.sum;
            }
        }

        public void resetAccumulator(avgAccumulator acc) {
            acc.count = 0;
            acc.sum = 0L;
        }
    }

    public static class avgAccumulator {
        public long sum = 0;
        public int count = 0;
    }
}

突然發(fā)現(xiàn)案例都是官網(wǎng)的,哈哈哈哈
User-defined Functions | Apache Flink

8. SQL 表值聚合函數(shù)(Table Aggregate Function)

  1. 實現(xiàn) TableAggregateFunction 接口,其中所有的方法必須是 public 的、非 static 的
  2. Acc聚合中間結(jié)果 createAccumulator():為當(dāng)前 Key 初始化一個空的 accumulator,其存儲了聚合的中間結(jié)果
  3. accumulate(Acc accumulator, Input輸入?yún)?shù)):對于每一行數(shù)據(jù),都會調(diào)用 accumulate() 方法來更新 accumulator
  4. emitValue(Acc accumulator, Collector<OutPut> collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector<OutPut> collector):當(dāng)遍歷所有的數(shù)據(jù),當(dāng)所有的數(shù)據(jù)都處理完了之后,通過調(diào)用 emit 方法來計算和輸出最終的結(jié)果
  5. retract(Acc accumulator, Input輸入?yún)?shù)):在回撤流的場景下必須要實現(xiàn)
  6. merge(Acc accumulator, Iterable<Acc> it):在許多批式聚合以及流式聚合中的 Session、Hop 窗口聚合場景下都是必須要實現(xiàn)的。

public class tableDemo8 {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE input_table (id INT ,money INT ,cnt INT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/f.txt', 'format' = 'csv')");
        //加權(quán)平均值
        tableEnv.createTemporarySystemFunction("top2", Top2.class);
        Table table = tableEnv.from("input_table").groupBy($("id")).flatAggregate(Expressions.call("top2", $("money")).as("value", "row")).select($("id"), $("value"), $("row"));
        tableEnv.toChangelogStream(table).print();
        env.execute();
    }


    public static class Top2Accum {
        public Integer first;
        public Integer second;
        public Integer oldFirst;
        public Integer oldSecond;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

        //初始化
        @Override
        public Top2Accum createAccumulator() {
            Top2Accum acc = new Top2Accum();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            acc.oldFirst = Integer.MIN_VALUE;
            acc.oldSecond = Integer.MIN_VALUE;
            return acc;
        }


        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

       public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            // emit the value and rank
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }

   public void emitUpdateWithRetract(Top2Accum acc, TableAggregateFunction.RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }

        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }

        public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
            for (Top2Accum otherAcc : iterable) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }
    }
}

???emitUpdateWithRetract怎么用不了

9. FlinkSQL-UDF Module

目前 Flink 包含了以下三種 Module:

  1. CoreModule:CoreModule 是 Flink 內(nèi)置的 Module,其包含了目前 Flink 內(nèi)置的所有 UDF,F(xiàn)link 默認開啟的 Module 就是 CoreModule,我們可以直接使用其中的 UDF
  2. HiveModule:HiveModule 可以將 Hive 內(nèi)置函數(shù)作為 Flink 的系統(tǒng)函數(shù)提供給 SQL\Table API 用戶進行使用,比如 get_json_object 這類 Hive 內(nèi)置函數(shù)(Flink 默認的 CoreModule 是沒有的)
  3. 用戶自定義 Module:用戶可以實現(xiàn) Module 接口實現(xiàn)自己的 UDF 擴展 Module
Flink SQL 支持 Hive UDF
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

10. FlinkSQL Catalog

Flink SQL 中是由 Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。對標(biāo) Hive 去理解就是 Hive 的 MetaStore,都是用于存儲計算引擎涉及到的元數(shù)據(jù)信息。

目前 Flink 包含了以下四種 Catalog:

  1. GenericInMemoryCatalog:GenericInMemoryCatalog 是基于內(nèi)存實現(xiàn)的 Catalog,所有元數(shù)據(jù)只在 session 的生命周期(即一個 Flink 任務(wù)一次運行生命周期內(nèi))內(nèi)可用。
  2. JdbcCatalog:JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫。PostgresCatalog 是當(dāng)前實現(xiàn)的唯一一種 JDBC Catalog,即可以將 Flink SQL 的預(yù)案數(shù)據(jù)存儲在 Postgres 中。
  3. HiveCatalog:HiveCatalog 有兩個用途,作為 Flink 元數(shù)據(jù)的持久化存儲,以及作為讀寫現(xiàn)有 Hive 元數(shù)據(jù)的接口。注意:Hive MetaStore 以小寫形式存儲所有元數(shù)據(jù)對象名稱。而 GenericInMemoryCatalog 會區(qū)分大小寫。
  4. 用戶自定義 Catalog:用戶可以實現(xiàn) Catalog 接口實現(xiàn)自定義 Catalog

11. FlinkSQL 任務(wù)參數(shù)配置

Configuration | Apache Flink

具體參數(shù)分為以下 3 類:

  1. 運行時參數(shù):優(yōu)化 Flink SQL 任務(wù)在執(zhí)行時的任務(wù)性能
  2. 優(yōu)化器參數(shù):Flink SQL 任務(wù)在生成執(zhí)行計劃時,經(jīng)過優(yōu)化器優(yōu)化生成更優(yōu)的執(zhí)行計劃
  3. 表參數(shù):用于調(diào)整 Flink SQL table 的執(zhí)行行為
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容