Function categories
Official documentation
Currently, Flink distinguishes between the following kinds of functions:
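- Scalar functions: map scalar values to a new scalar value.
- Table functions: map scalar values to new rows (similar to exploding one column value into several rows).
- Aggregate functions: map scalar values of multiple rows to a new scalar value.
- Table aggregate functions: combine table functions and aggregate functions, mapping scalar values of multiple rows to new rows.
- Async table functions: special functions for table sources that perform a lookup.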
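To make the categories concrete, the following plain-Java sketch models the first four as ordinary method signatures. It does not use the Flink API at all: the class FunctionKindsSketch and its methods are made-up names for illustration, and returning the two largest values is only one possible example of a table aggregate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: plain Java, no Flink classes involved.
final class FunctionKindsSketch {

    // Scalar function: one value in, one value out.
    static String scalar(String word) {
        return word.toUpperCase();
    }

    // Table function: one value in, zero or more rows out.
    static List<String> table(String phrase) {
        return Arrays.asList(phrase.split(" "));
    }

    // Aggregate function: values from many rows in, one value out.
    static int aggregate(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Table aggregate function: values from many rows in, several rows out
    // (here: the two largest values, largest first).
    static List<Integer> tableAggregate(List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        sorted.sort(Collections.reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(2, sorted.size())));
    }
}
```

The difference between the kinds is only in how many rows go in and how many come out, which is also what decides whether a function is used in a projection, a lateral join, or an aggregation.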
Custom scalar functions
- Prepare a class
import org.apache.flink.table.functions.ScalarFunction;
/**
* Converts a string to upper case.
* @author admin
* @date 2021/8/21
*/
public class MyScalarFunctionByUppercase extends ScalarFunction {
}
- Exception: the class must define an eval method.
org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase'
does not implement a method named 'eval'.
The method must be public and named eval; its parameter and return types are up to you.
public class MyScalarFunctionByUppercase extends ScalarFunction {
public String eval(String words){
return words.toUpperCase();
}
}
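The validation error above arises because the evaluation method is found by its name rather than through an interface method that the compiler could enforce. The sketch below imitates that kind of name-based check with plain Java reflection. It is not Flink's actual implementation; EvalCheckSketch, hasEvalMethod, WithEval, and WithoutEval are hypothetical names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical illustration only: a name-based lookup similar in spirit to the
// validation that rejects a function class without an eval method.
final class EvalCheckSketch {

    // Returns true if the class declares or inherits a public method named "eval".
    static boolean hasEvalMethod(Class<?> functionClass) {
        for (Method m : functionClass.getMethods()) {
            if (m.getName().equals("eval") && Modifier.isPublic(m.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    // Stand-in for a function class that forgot to define eval.
    public static class WithoutEval {
    }

    // Stand-in for a correct function class.
    public static class WithEval {
        public String eval(String words) {
            return words.toUpperCase();
        }
    }
}
```

Because the lookup goes by name, a function class can also overload eval with different parameter lists.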
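- Usage
@Test
public void test1(){
// Needs: import static org.apache.flink.table.api.Expressions.$;
//        import static org.apache.flink.table.api.Expressions.call;
// Set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Sample data
DataStreamSource<String> source = env.fromElements("java", "google", "hello");
// Name the field
Table table = tableEnv.fromDataStream(source,$("words"));
// Call the function inline, without registering it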
table.select($("words"),call(MyScalarFunctionByUppercase.class,$("words")))
.execute()
.print();
}
- Query result
+----+--------------------------------+--------------------------------+
| op | words | _c1 |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
- Register first, then use
// Register the function, then call it by name
tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);
table.select($("words"),call("toUppercase",$("words")))
.execute()
.print();
- Query result
+----+--------------------------------+--------------------------------+
| op | words | _c1 |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
- Usage in SQL (the function must be registered first)
// Name the field
Table table = tableEnv.fromDataStream(source,$("words"));
// Register the function first
tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);
// Call it from SQL
tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
.execute()
.print();
- Query result
+----+--------------------------------+--------------------------------+
| op | words | upp_words |
+----+--------------------------------+--------------------------------+
| +I | java | JAVA |
| +I | google | GOOGLE |
| +I | hello | HELLO |
+----+--------------------------------+--------------------------------+
3 rows in set
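Custom table functions
- Define the function
import java.util.Arrays;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
* Row to column: splits a phrase into one row per word.
* Type parameter: each emitted row has multiple columns.
* @FunctionHint declares the types of the returned columns.
* @author admin
* @date 2021/8/21
*/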
@FunctionHint(output = @DataTypeHint("row<w string,len int>"))
public class MyTableFunctionByRowToColumn extends TableFunction<Row> {
public void eval(String phrase){
Arrays.stream(phrase.split(" ")).forEach(s -> {
collect( Row.of(s,s.length()));
});
}
}
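The body of eval above can be tried without a Flink runtime. The sketch below reproduces the same splitting logic in plain Java and collects (word, length) pairs into a list instead of calling collect; SplitSketch and split are hypothetical names. It also makes visible that the len column is String.length(), that is, the number of UTF-16 code units, which is why a two-character Chinese word followed by "!" is reported as 3.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: the splitting logic of eval, without Flink.
final class SplitSketch {

    // Splits a phrase on single spaces and pairs each word with its length,
    // mirroring the (w, len) rows emitted by the table function.
    static List<Map.Entry<String, Integer>> split(String phrase) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        for (String word : phrase.split(" ")) {
            rows.add(new SimpleEntry<>(word, word.length()));
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> row : split("hello world!")) {
            System.out.println(row.getKey() + " | " + row.getValue());
        }
    }
}
```

Splitting on a single space yields empty words when a phrase contains consecutive spaces; splitting on the regular expression "\\s+" would be one way to avoid that if the input is not as clean as in this example.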
Query result
+----+--------------------------------+--------------------------------+-------------+
| op | phrase | w | len |
+----+--------------------------------+--------------------------------+-------------+
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 數碼 寶貝 | 數碼 | 2 |
| +I | 數碼 寶貝 | 寶貝 | 2 |
| +I | 名偵探 柯南! | 名偵探 | 3 |
| +I | 名偵探 柯南! | 柯南! | 3 |
+----+--------------------------------+--------------------------------+-------------+
8 rows in set
- Usage with the Table API (inline)
@Test
public void test1(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "數碼 寶貝", "名偵探 柯南!");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(source,$("phrase"));
// Goal: explode each phrase into one row per word
table.joinLateral(call(MyTableFunctionByRowToColumn.class,$("phrase")))
.execute().print();
}
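joinLateral behaves like an inner join against the rows the function emits: an input row for which the function emits nothing disappears from the result, whereas leftOuterJoinLateral keeps such a row and fills the function columns with null. The plain-Java sketch below imitates both behaviours. It does not call Flink, and LateralJoinSketch, inner, leftOuter, and words are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only: inner vs. left outer lateral join semantics.
final class LateralJoinSketch {

    // Inner lateral join: one output row per (input, emitted value) pair.
    static List<String[]> inner(List<String> input, Function<String, List<String>> fn) {
        List<String[]> out = new ArrayList<>();
        for (String row : input) {
            for (String emitted : fn.apply(row)) {
                out.add(new String[] {row, emitted});
            }
        }
        return out;
    }

    // Left outer lateral join: like inner, but an input row with no emitted
    // values is kept once, with null in the function column.
    static List<String[]> leftOuter(List<String> input, Function<String, List<String>> fn) {
        List<String[]> out = new ArrayList<>();
        for (String row : input) {
            List<String> emitted = fn.apply(row);
            if (emitted.isEmpty()) {
                out.add(new String[] {row, null});
            }
            for (String value : emitted) {
                out.add(new String[] {row, value});
            }
        }
        return out;
    }

    // Example function: emits the words of a phrase, nothing for an empty phrase.
    static List<String> words(String phrase) {
        return phrase.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(phrase.split(" "));
    }
}
```

With the sample data in this post every phrase yields at least one word, so both join variants would return the same eight rows; the difference only shows up for inputs on which the function emits nothing.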
- Usage in SQL
@Test
public void test2(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "數碼 寶貝", "名偵探 柯南!");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(source,$("phrase"));
// Create a temporary view
tableEnv.createTemporaryView("t",table);
// Goal: explode each phrase into one row per word
// Register the function
tableEnv.createFunction("rowToColumn",MyTableFunctionByRowToColumn.class);
// Query
tableEnv.sqlQuery("select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true")
.execute()
.print();
}
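- Aliasing: in "as T(w1,len1)", T is the alias given to the lateral table, and the names in parentheses rename its columns
<!-- aliasing -->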
<sql id="tableFunction2">
select
phrase , w1 ,len1
from #{tableName}
join lateral table (rowToColumn(phrase))
as T(w1,len1)
on true
</sql>
Query result
+----+--------------------------------+--------------------------------+-------------+
| op | phrase | w1 | len1 |
+----+--------------------------------+--------------------------------+-------------+
| +I | hello world! | hello | 5 |
| +I | hello world! | world! | 6 |
| +I | 明天 你好! | 明天 | 2 |
| +I | 明天 你好! | 你好! | 3 |
| +I | 數碼 寶貝 | 數碼 | 2 |
| +I | 數碼 寶貝 | 寶貝 | 2 |
| +I | 名偵探 柯南! | 名偵探 | 3 |
| +I | 名偵探 柯南! | 柯南! | 3 |
+----+--------------------------------+--------------------------------+-------------+