Flink 使用介紹相關(guān)文檔目錄
簡(jiǎn)介
在使用純Flink SQL的場(chǎng)景下,對(duì)于復(fù)雜業(yè)務(wù)邏輯,F(xiàn)link提供的內(nèi)置fucntion是無(wú)法滿足要求的。我們需要實(shí)現(xiàn)自定義的function,來(lái)擴(kuò)充Flink的功能。用戶自己實(shí)現(xiàn)的function稱(chēng)為UDF(user defined function)。
Flink支持如下四種UDF:
- ScalarFunction: 類(lèi)似于Flink算子的map,一對(duì)一轉(zhuǎn)換。
- TableFunction: 類(lèi)似于flatmap,一對(duì)多。
- AggregateFunction: 類(lèi)似于reduce,多對(duì)一。通過(guò)聚合操作把多行輸出為一個(gè)值。
- TableAggregateFunction: 多對(duì)多。目前沒(méi)發(fā)現(xiàn)如何在SQL中使用(官網(wǎng)給出了在Table API中的使用方法),暫不介紹。
編寫(xiě)注意事項(xiàng)
- 編寫(xiě)UDF需要在項(xiàng)目中引入如下依賴(lài)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
- UDF必須繼承自
ScalarFunction等基類(lèi)。 - UDF必須定義為public,不能為abstract。必須能被全局訪問(wèn)到。所以說(shuō)不能包含非靜態(tài)內(nèi)部類(lèi)或者匿名類(lèi)。
- 必須擁有默認(rèn)構(gòu)造函數(shù)(無(wú)參數(shù)構(gòu)造函數(shù))。使用Table API的時(shí)候可以支持使用有參數(shù)構(gòu)造函數(shù)的UDF來(lái)構(gòu)造有狀態(tài)UDF。SQL模式建議使用無(wú)狀態(tài)UDF。
- UDF必須無(wú)狀態(tài),只能包含static字段和transient字段。
注冊(cè)UDF
定義好的UDF在SQL使用之前,必須要注冊(cè)。注冊(cè)方法有如下兩種。
使用Java API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注冊(cè)UDF
// 創(chuàng)建UDF,無(wú)法覆蓋已經(jīng)存在的同名function。該function位于目前所在的catalog和database中(有命名空間)。全名為catalog_name.database_name.function_name
tEnv.createFunction("function_name", new MyFunction());
// 創(chuàng)建臨時(shí)function,可以覆蓋已存在的function,有命名空間
tEnv.createTemporaryFunction("function_name", new MyFunction());
// 創(chuàng)建臨時(shí)系統(tǒng)function,可以覆蓋已存在的function,位于全局,無(wú)命名空間概念
tEnv.createTemporarySystemFunction("function_name", new MyFunction());
使用SQL方式:
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
具體解釋和Java API相同,不再贅述。
例如:
CREATE TEMPORARY SYSTEM FUNCTION changecase AS 'com.paultech.ChangeCaseTool';
注意:必須把UDF的jar包添加到Flink框架的classpath下(例如放置到
$FLINK_HOME/lib中)。或者通過(guò)ADD JAR動(dòng)態(tài)加載用戶jar到classpath。參見(jiàn)https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/jar/。
結(jié)果計(jì)算
UDF可以按照實(shí)際需要,重寫(xiě)基類(lèi)提供的open(),close()和isDeterministic()方法。
UDF的結(jié)果計(jì)算方法例如eval(), accumulate(), 或者 retract()方法,在運(yùn)行階段被動(dòng)態(tài)生成的代碼調(diào)用。
結(jié)果計(jì)算方法可以定義一個(gè)或者多個(gè)參數(shù),可以使用重載方法,也可以使用變長(zhǎng)參數(shù)。
類(lèi)型推斷
Flink Table API是強(qiáng)類(lèi)型API,所有函數(shù)的參數(shù)類(lèi)型和返回類(lèi)型都必須映射為DataType。Flink支持自動(dòng)類(lèi)型推斷和通過(guò)注解(@DataTypeHint和@FunctionHint)指定類(lèi)型。如果有更為復(fù)雜的類(lèi)型推斷邏輯,可以重寫(xiě)父類(lèi)的getTypeInference方法。
自動(dòng)類(lèi)型推斷
對(duì)于自動(dòng)類(lèi)型推斷,Java數(shù)據(jù)類(lèi)型和DataType類(lèi)型對(duì)應(yīng)關(guān)系參見(jiàn)https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#data-type-extraction。
注解顯式指定類(lèi)型
@DataTypeHint可用于返回值,方法體(作用于返回值)和方法參數(shù)上,從而修改返回值或者式參數(shù)的DataType。
@DataTypeHint支持復(fù)雜類(lèi)型,例如@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")。
@FunctionHint適用于一個(gè)eval等結(jié)果計(jì)算方法可以接收多組類(lèi)型不同的參數(shù),返回值類(lèi)型和接收參數(shù)類(lèi)型相關(guān)的這種場(chǎng)景。我們貼出官網(wǎng)的例子:
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// function with overloaded evaluation methods
// but globally defined output type
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {
public void eval(int a, int b) {
collect(Row.of("Sum", a + b));
}
// overloading of arguments is still possible
public void eval() {
collect(Row.of("Empty args", -1));
}
}
// decouples the type inference from evaluation methods,
// the type inference is entirely determined by the function hints
@FunctionHint(
input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
output = @DataTypeHint("INT")
)
@FunctionHint(
input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
output = @DataTypeHint("BIGINT")
)
@FunctionHint(
input = {},
output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {
// an implementer just needs to make sure that a method exists
// that can be called by the JVM
public void eval(Object... o) {
if (o.length == 0) {
collect(false);
}
collect(o[0]);
}
}
自定義類(lèi)型推斷
如果注解無(wú)法描述類(lèi)型推斷邏輯,可以重寫(xiě)getTypeInference方法,使用代碼實(shí)現(xiàn)復(fù)雜的類(lèi)型推斷邏輯。寫(xiě)法和參考官網(wǎng)的例子。
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
public static class LiteralFunction extends ScalarFunction {
public Object eval(String s, String type) {
switch (type) {
case "INT":
return Integer.valueOf(s);
case "DOUBLE":
return Double.valueOf(s);
case "STRING":
default:
return s;
}
}
// the automatic, reflection-based type inference is disabled and
// replaced by the following logic
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
// specify typed arguments
// parameters will be casted implicitly to those types if necessary
.typedArguments(DataTypes.STRING(), DataTypes.STRING())
// specify a strategy for the result data type of the function
.outputTypeStrategy(callContext -> {
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
// return a data type based on a literal
final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
switch (literal) {
case "INT":
return Optional.of(DataTypes.INT().notNull());
case "DOUBLE":
return Optional.of(DataTypes.DOUBLE().notNull());
case "STRING":
default:
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
確定性
如果UDF不能返回確定的結(jié)果(例如random(), date()或now()),必須重寫(xiě)isDeterministic()并返回false。這涉及到執(zhí)行計(jì)劃優(yōu)化過(guò)程。
如果UDF的isDeterministic()返回true,并且傳入的參數(shù)全都是常量,在planning階段該UDF的值會(huì)被預(yù)先計(jì)算出來(lái)。例如SELECT ABS(-1)會(huì)優(yōu)化為SELECT 1。但是SELECT ABS(field) FROM t不會(huì)優(yōu)化,因?yàn)?code>field不是常量。
如果UDF的isDeterministic()返回false,或者傳入的參數(shù)存在變量,UDF的值在執(zhí)行階段才會(huì)被計(jì)算出來(lái)。
open和close方法
open和close方法可用于編寫(xiě)自定義的初始化和清理邏輯。open方法的執(zhí)行時(shí)機(jī)早于eval等結(jié)果計(jì)算方法。
可參考官網(wǎng)的例子(https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/#runtime-integration)。這個(gè)例子在啟動(dòng)作業(yè)的時(shí)候加載job parameter。
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public static class HashCodeFunction extends ScalarFunction {
private int factor = 0;
@Override
public void open(FunctionContext context) throws Exception {
// access the global "hashcode_factor" parameter
// "12" would be the default value if the parameter does not exist
factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
TableEnvironment env = TableEnvironment.create(...);
// add job parameter
env.getConfig().addJobParameter("hashcode_factor", "31");
// register the function
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
// use the function
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");
ScalarFunction
直接用例子說(shuō)明用法。我們編寫(xiě)一個(gè)大小寫(xiě)轉(zhuǎn)換UDF??梢越邮斩鄠€(gè)參數(shù)。默認(rèn)將string轉(zhuǎn)換為大寫(xiě)?;蛘呤峭ㄟ^(guò)boolean指定轉(zhuǎn)換為大寫(xiě)還是小寫(xiě)。
public static class ChangeCaseTool extends ScalarFunction {
public String eval(String s) {
return s.toUpperCase(Locale.ROOT);
}
public String eval(String s, Boolean changeToUppercase) {
if (changeToUppercase) {
return s.toUpperCase(Locale.ROOT);
} else {
return s.toLowerCase(Locale.ROOT);
}
}
}
配合如下例子使用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 測(cè)試表數(shù)據(jù)
DataStreamSource<Row> streamSource = env.fromElements(
Row.of("hello#world", 100),
Row.of("hola#hastaLAvista", 50));
// 轉(zhuǎn)換DataStream為T(mén)able,并指定字段名
Table table = tEnv.fromDataStream(streamSource).as("name", "value");
// 將Table映射為demo表
tEnv.createTemporaryView("demo", table);
// 注冊(cè)UDF
tEnv.createTemporaryFunction("changecase", new ChangeCaseTool());
// 執(zhí)行SQL時(shí)候調(diào)用UDF
tEnv.executeSql("select changecase(`name`, true) as `name`, `value` from demo").print();
輸出如下:
+----+--------------------------------+-------------+
| op | name | value |
+----+--------------------------------+-------------+
| +I | HELLO#WORLD | 100 |
| +I | HOLA#HASTALAVISTA | 50 |
+----+--------------------------------+-------------+
TableFunction
TableFunction將一個(gè)字段拆分為多列。
同樣直接以例子說(shuō)明。例子中的方法將內(nèi)容按照指定的delimiter拆分,然后獲取拆分后第一個(gè)和第二個(gè)字符串,列明分別為word1和word2。
@FunctionHint(output = @DataTypeHint("ROW<word1 STRING, word2 STRING>"))
public static class StringSplitter extends TableFunction<Row> {
public void eval(String s, String delimiter) {
String[] split = s.split(delimiter);
if (split.length >= 2) {
collect(Row.of(split[0], split[1]));
} else if (split.length == 1) {
collect(Row.of(split[0], null));
}
}
}
由于TableFunction的計(jì)算結(jié)果是一個(gè)偽表,我們對(duì)它進(jìn)行操作的時(shí)候(例如join)需要使用LATERAL TABLE(function(field))或者LATERAL TABLE(function(field)) AS T(NEW_FIELD_NAME1, NEW_FIELD_NAME2)(修改字段名)把UDF計(jì)算結(jié)果作為表來(lái)使用。
例子如下:
tEnv.createTemporaryFunction("split", new StringSplitter());
tEnv.executeSql("select * from demo, lateral table(split(`name`, '#'))").print();
結(jié)果如下:
+----+--------------------------------+-------------+--------------------------------+--------------------------------+
| op | name | value | word1 | word2 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+
| +I | hello#world | 100 | hello | world |
| +I | hola#hastaLAvista | 50 | hola | hastaLAvista |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+
AggregateFunction
用一個(gè)例子說(shuō)明。編寫(xiě)一個(gè)自定義聚合函數(shù)MyAvg,根據(jù)物品單價(jià)和數(shù)量,求單價(jià)的平均值。UDF代碼如下:
// 自定義聚合器,持有總價(jià)和數(shù)量,以便于計(jì)算平均值
public static class MyAvgAggregator {
public double sum;
public int count;
}
// AggregateFunction需要聲明聚合結(jié)果數(shù)據(jù)類(lèi)型和自定義聚合器類(lèi)型
public static class MyAvg extends AggregateFunction<Double, MyAvgAggregator> {
// 獲取計(jì)算結(jié)果的方法
@Override
public Double getValue(MyAvgAggregator accumulator) {
return accumulator.sum / accumulator.count;
}
// 創(chuàng)建自定義聚合器
@Override
public MyAvgAggregator createAccumulator() {
return new MyAvgAggregator();
}
// 聚合方法(必須),將數(shù)據(jù)加入到聚合器
public void accumulate(MyAvgAggregator acc, Double unit, Integer count) {
acc.sum += unit * count;
acc.count += count;
}
// 撤回方法(可選),假設(shè)數(shù)據(jù)已經(jīng)添加進(jìn)自定義聚合器。該方法指定了將數(shù)據(jù)從自定義聚合器減去的邏輯。
// 對(duì)于unbounded tables進(jìn)行bounded OVER 聚合運(yùn)算,必須提供此方法(需要減去over window舊的聚合數(shù)據(jù),添加新的數(shù)據(jù)后重新計(jì)算聚合結(jié)果)
public void retract(MyAvgAggregator acc, Double unit, Integer count) {
acc.sum -= unit * count;
acc.count -= count;
}
// 合并方法(可選),包含合并多個(gè)自定義聚合器的邏輯
// 對(duì)于unbounded session window grouping聚合和bounded grouping聚合,必須提供此方法
public void merge(MyAvgAggregator acc, Iterable<MyAvgAggregator> it) {
for (MyAvgAggregator a : it) {
acc.count += a.count;
acc.sum += a.sum;
}
}
}
例子如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<Row> streamSource = env.fromElements(
Row.of("Apple", 4.0, 50),
Row.of("Banana", 7.5, 20),
Row.of("Peach", 8.0, 15)
);
Table table = tEnv.fromDataStream(streamSource).as("name", "unit_price", "count");
tEnv.createTemporaryView("demo", table);
tEnv.createTemporaryFunction("myavg", new MyAvg());
tEnv.executeSql("select myavg(`unit_price`, `count`) as avg_unit_price from demo").print();
執(zhí)行結(jié)果:
+----+--------------------------------+
| op | avg_unit_price |
+----+--------------------------------+
| +I | 5.529411764705882 |
+----+--------------------------------+
當(dāng)然,對(duì)于金額運(yùn)算結(jié)果,我們可以讓MyAvg返回BigDecimal類(lèi)型(其實(shí)可以使用ROUND函數(shù)解決。這里我們演示下上面介紹的@FunctionHint注解用法)。我們改寫(xiě)MyAvg如下:
// 增加注解,聲明輸入和輸出的數(shù)據(jù)類(lèi)型
@FunctionHint(input = {@DataTypeHint("DOUBLE"), @DataTypeHint("INT")}, output = @DataTypeHint("DECIMAL(12, 2)"))
public static class MyAvg extends AggregateFunction<BigDecimal, MyAvgAggregator> {
@Override
public BigDecimal getValue(MyAvgAggregator accumulator) {
return BigDecimal.valueOf(accumulator.sum).divide(BigDecimal.valueOf(accumulator.count), 2, RoundingMode.HALF_DOWN);
}
// 其余方法完全相同,此處省略
// ...
}
查詢(xún)SQL修改為:
tEnv.executeSql("select cast(myavg(`unit_price`, `count`) as DECIMAL(12, 2)) as avg_unit_price from demo").print();
結(jié)果如下:
+----+----------------+
| op | avg_unit_price |
+----+----------------+
| +I | 5.53 |
+----+----------------+
參考文檔
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/