Flink(1.13) FlinkSql自定義函數(shù)

函數(shù)分類

官網介紹
Currently, Flink distinguishes between the following kinds of functions:

  • Scalar functions:標量函數(shù)將標量值映射到一個新的標量值。
  • Table functions:制表函數(shù)將標量值映射到新行(類似于列轉行)。
  • Aggregate functions:聚合函數(shù)將多行標量值映射為新標量值。
  • Table aggregate functions:屬于Table functionsAggregate functions功能的合并。
  • Async table functions:異步表函數(shù)是用于執(zhí)行查找的表源的特殊函數(shù)。

自定義 Scalar functions

  • 準備一個類
import org.apache.flink.table.functions.ScalarFunction;

/**
 * 字符串轉換大寫
 * @author admin
 * @date 2021/8/21
 */
public class MyScalarFunctionByUppercase extends ScalarFunction {

}
  • 異常:需要自定義eval方法。
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase' 
does not implement a method named 'eval'.

方法名必須叫eval,參數(shù)和返回值隨意。

public class MyScalarFunctionByUppercase extends ScalarFunction {
    public String eval(String words){
        return words.toUpperCase();
    }
}
  • 應用
    @Test
    public void test1(){
        // 環(huán)境準備
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 模擬數(shù)據
        DataStreamSource<String> source = env.fromElements("java", "google", "hello");

        // 給字段取名
        Table table = tableEnv.fromDataStream(source,$("words"));

        // 使用內聯(lián)的方式
        table.select($("words"),call(MyScalarFunctionByUppercase.class,$("words")))
                .execute()
                .print();
    }
  • 查詢
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
  • 注冊后再使用
        // 先注冊再使用
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        table.select($("words"),call("toUppercase",$("words")))
                .execute()
                .print();
  • 查詢
+----+--------------------------------+--------------------------------+
| op |                          words |                            _c1 |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
  • 在sql中使用,必須先注冊
        // 給字段取名
        Table table = tableEnv.fromDataStream(source,$("words"));

        // 先注冊再使用
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        // 在sql中使用
        tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
        .execute()
        .print();
  • 查詢
+----+--------------------------------+--------------------------------+
| op |                          words |                      upp_words |
+----+--------------------------------+--------------------------------+
| +I |                           java |                           JAVA |
| +I |                         google |                         GOOGLE |
| +I |                          hello |                          HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set

自定義 Table functions

  • 自定義函數(shù)
/**
 * 行專列
 * 泛型:每行數(shù)據有多列
 * @FunctionHint 指定返回列的類型
 * @author admin
 * @date 2021/8/21
 */
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {

    public void eval(String phrase){

        Arrays.stream(phrase.split(" ")).forEach(s -> {
            collect( Row.of(s,s.length()));
        });
    }

}

查詢

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                              w |         len |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      數(shù)碼 寶貝 |                           數(shù)碼 |           2 |
| +I |                      數(shù)碼 寶貝 |                           寶貝 |           2 |
| +I |                   名偵探 柯南! |                         名偵探 |           3 |
| +I |                   名偵探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
8 rows in set
  • 應用 table api 使用(內聯(lián))
    @Test
    public void test1(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "數(shù)碼 寶貝", "名偵探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(source,$("phrase"));

        // 需求:將元數(shù)據炸開,
        table.joinLateral(call(MyTableFunctionByRowToColumn.class,$("phrase")))
                .execute().print();

    }
  • 應用 sql 使用
    @Test
    public void test2(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "數(shù)碼 寶貝", "名偵探 柯南!");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(source,$("phrase"));

        // 創(chuàng)建一張臨時表
        tableEnv.createTemporaryView("t",table);

        // 需求:將元數(shù)據炸開,

        // 注冊
        tableEnv.createFunction("rowToColumn",MyTableFunctionByRowToColumn.class);

        //查詢
        tableEnv.sqlQuery("select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true")
                .execute()
                .print();

    }
  • 取別名,內部內置函數(shù) T 就是用于取別名
    <!--取別名-->
    <sql id="tableFunction2">
        select
        phrase , w1 ,len1
        from #{tableName}
        join lateral table (rowToColumn(phrase))
        as T(w1,len1)
        on true
    </sql>

查詢

+----+--------------------------------+--------------------------------+-------------+
| op |                         phrase |                             w1 |        len1 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                   hello world! |                          hello |           5 |
| +I |                   hello world! |                         world! |           6 |
| +I |                     明天 你好! |                           明天 |           2 |
| +I |                     明天 你好! |                          你好! |           3 |
| +I |                      數(shù)碼 寶貝 |                           數(shù)碼 |           2 |
| +I |                      數(shù)碼 寶貝 |                           寶貝 |           2 |
| +I |                   名偵探 柯南! |                         名偵探 |           3 |
| +I |                   名偵探 柯南! |                          柯南! |           3 |
+----+--------------------------------+--------------------------------+-------------+
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容