自定義函數(UDF)可以用 JVM 語言(例如 Java 或 Scala)或 Python 實現,實現者可以在 UDF 中使用任意第三方庫,本文聚焦于使用 JVM 語言開發(fā)自定義函數。
原文檔:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/udfs/
1、概述
以下示例展示了如何創(chuàng)建一個基本的標量函數。
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
// 定義函數邏輯
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}
當前 Flink 有如下幾種函數:
- 標量函數 將標量值轉換成一個新標量值;
- 表值函數 將標量值轉換成新的行數據;
- 聚合函數 將多行數據里的標量值轉換成一個新標量值;
- 表值聚合函數 將多行數據里的標量值轉換成新的行數據;
- 異步表值函數 是異步查詢外部數據系統(tǒng)的特殊函數。
2、發(fā)開指南
1)函數類
實現類必須繼承自合適的基類之一(例如 org.apache.flink.table.functions.ScalarFunction )。
該類必須聲明為 public ,而不是 abstract ,并且可以被全局訪問。不允許使用非靜態(tài)內部類或匿名類。
2)求值方法
基類提供了一組可以被重寫的方法,例如 open()、 close() 或 isDeterministic() 。
但是,除了上述方法之外,作用于每條傳入記錄的主要邏輯還必須通過專門的求值方法來實現。
根據函數的種類,后臺生成的運算符會在運行時調用諸如 eval()、accumulate() 或 retract() 之類的求值方法。
一個重載函數的示例:
import org.apache.flink.table.functions.ScalarFunction;
// 有多個重載求值方法的函數
public static class SumFunction extends ScalarFunction {
public Integer eval(Integer a, Integer b) {
return a + b;
}
public Integer eval(String a, String b) {
return Integer.valueOf(a) + Integer.valueOf(b);
}
public Integer eval(Double... d) {
double result = 0;
for (double value : d)
result += value;
return (int) result;
}
}
3)類型推導
Flink 自定義函數實現了自動的類型推導提取,通過反射從函數的類及其求值方法中派生數據類型。如果這種隱式的反射提取方法不成功,則可以通過使用 @DataTypeHint 和 @FunctionHint 注解相關參數、類或方法來支持提取過程,下面展示了有關如何注解函數的例子。
自動類型推導之 @DataTypeHint
需要支持以內聯方式自動提取出函數參數、返回值的類型。
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
// 有多個重載求值方法的函數
public static class OverloadedFunction extends ScalarFunction {
// no hint required
public Long eval(long a, long b) {
return a + b;
}
// 定義 decimal 的精度和小數位
public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
return BigDecimal.valueOf(a + b);
}
// 定義嵌套數據類型
@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
public Row eval(int i) {
return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
}
// 允許任意類型的符入,并輸出序列化定制后的值
@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return MyUtils.serializeToByteBuffer(o);
}
}
自動類型推導之 @FunctionHint
有時我們希望一種求值方法可以同時處理多種數據類型,有時又要求對重載的多個求值方法僅聲明一次通用的結果類型。
@FunctionHint 注解可以提供從入參數據類型到結果數據類型的映射,它可以在整個函數類或求值方法上注解輸入、累加器和結果的數據類型??梢栽陬愴敳柯暶饕粋€或多個注解,也可以為類的所有求值方法分別聲明一個或多個注解。
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// 為函數類的所有求值方法指定同一個輸出類型
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {
public void eval(int a, int b) {
collect(Row.of("Sum", a + b));
}
// overloading of arguments is still possible
public void eval() {
collect(Row.of("Empty args", -1));
}
}
// 解耦類型推導與求值方法,類型推導完全取決于 FunctionHint
@FunctionHint(
input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
output = @DataTypeHint("INT")
)
@FunctionHint(
input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
output = @DataTypeHint("BIGINT")
)
@FunctionHint(
input = {},
output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {
// an implementer just needs to make sure that a method exists
// that can be called by the JVM
public void eval(Object... o) {
if (o.length == 0) {
collect(false);
}
collect(o[0]);
}
}
定制類型推導
通過重寫 getTypeInference() 定制自動類型推導邏輯,實現者可以創(chuàng)建任意像系統(tǒng)內置函數那樣有用的函數。
需要包含
import org.apache.flink.table.types.inference.TypeInference;
4)運行時集成
有時候自定義函數需要獲取一些全局信息,或者在真正被調用之前做一些配置(setup)/清理(clean-up)的工作。
open() 方法在求值方法被調用之前先調用。close() 方法在求值方法調用完之后被調用。
open() 方法提供了一個 FunctionContext,它包含了一些自定義函數被執(zhí)行時的上下文信息,比如 metric group、分布式文件緩存,或者是全局的作業(yè)參數等。
下面的信息可以通過調用 FunctionContext 的對應的方法來獲得:
| 方法 | 描述 |
|---|---|
| getMetricGroup() | 執(zhí)行該函數的 subtask 的 Metric Group。 |
| getCachedFile(name) | 分布式文件緩存的本地臨時文件副本。 |
| getJobParameter(name, defaultValue) | 跟對應的 key 關聯的全局參數值。 |
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public static class HashCodeFunction extends ScalarFunction {
private int factor = 0;
@Override
public void open(FunctionContext context) throws Exception {
// 獲取參數 "hashcode_factor"
// 如果不存在,則使用默認值 "12"
factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
TableEnvironment env = TableEnvironment.create(...);
// 設置任務參數
env.getConfig().addJobParameter("hashcode_factor", "31");
// 注冊函數
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
// 調用函數
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");
3、標量函數
自定義標量函數可以把 0 到多個標量值映射成 1 個標量值,數據類型里列出的任何數據類型都可作為求值方法的參數和返回值類型。
想要實現自定義標量函數,你需要擴展 org.apache.flink.table.functions 里面的 ScalarFunction 并且實現一個或者多個求值方法。標量函數的行為取決于你寫的求值方法。求值方法必須是 public 的,而且名字必須是 eval。
4、表值函數
跟自定義標量函數一樣,自定義表值函數的輸入參數也可以是 0 到多個標量。但是跟標量函數只能返回一個值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果輸出行只包含 1 列,會省略結構化信息并生成標量值,這個標量值在運行階段會隱式地包裝進行里。
要定義一個表值函數,你需要擴展 org.apache.flink.table.functions 下的 TableFunction,可以通過實現多個名為 eval 的方法對求值方法進行重載。
像其他函數一樣,輸入和輸出類型也可以通過反射自動提取出來。表值函數返回的表的類型取決于 TableFunction 類的泛型參數 T,不同于標量函數,表值函數的求值方法本身不包含返回類型,而是通過 collect(T) 方法來發(fā)送要輸出的行。