Flink 使用之 SQL UDF

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

簡(jiǎn)介

在使用純Flink SQL的場(chǎng)景下,對(duì)于復(fù)雜業(yè)務(wù)邏輯,F(xiàn)link提供的內(nèi)置fucntion是無(wú)法滿足要求的。我們需要實(shí)現(xiàn)自定義的function,來(lái)擴(kuò)充Flink的功能。用戶自己實(shí)現(xiàn)的function稱(chēng)為UDF(user defined function)。

Flink支持如下四種UDF:

  • ScalarFunction: 類(lèi)似于Flink算子的map,一對(duì)一轉(zhuǎn)換。
  • TableFunction: 類(lèi)似于flatmap,一對(duì)多。
  • AggregateFunction: 類(lèi)似于reduce,多對(duì)一。通過(guò)聚合操作把多行輸出為一個(gè)值。
  • TableAggregateFunction: 多對(duì)多。目前沒(méi)發(fā)現(xiàn)如何在SQL中使用(官網(wǎng)給出了在Table API中的使用方法),暫不介紹。

編寫(xiě)注意事項(xiàng)

  • 編寫(xiě)UDF需要在項(xiàng)目中引入如下依賴(lài)。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
  • UDF必須繼承自ScalarFunction等基類(lèi)。
  • UDF必須定義為public,不能為abstract。必須能被全局訪問(wèn)到。所以說(shuō)不能包含非靜態(tài)內(nèi)部類(lèi)或者匿名類(lèi)。
  • 必須擁有默認(rèn)構(gòu)造函數(shù)(無(wú)參數(shù)構(gòu)造函數(shù))。使用Table API的時(shí)候可以支持使用有參數(shù)構(gòu)造函數(shù)的UDF來(lái)構(gòu)造有狀態(tài)UDF。SQL模式建議使用無(wú)狀態(tài)UDF。
  • UDF必須無(wú)狀態(tài),只能包含static字段和transient字段。

注冊(cè)UDF

定義好的UDF在SQL使用之前,必須要注冊(cè)。注冊(cè)方法有如下兩種。

使用Java API:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 注冊(cè)UDF
// 創(chuàng)建UDF,無(wú)法覆蓋已經(jīng)存在的同名function。該function位于目前所在的catalog和database中(有命名空間)。全名為catalog_name.database_name.function_name
tEnv.createFunction("function_name", new MyFunction());
// 創(chuàng)建臨時(shí)function,可以覆蓋已存在的function,有命名空間
tEnv.createTemporaryFunction("function_name", new MyFunction());
// 創(chuàng)建臨時(shí)系統(tǒng)function,可以覆蓋已存在的function,位于全局,無(wú)命名空間概念
tEnv.createTemporarySystemFunction("function_name", new MyFunction());

使用SQL方式:

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

具體解釋和Java API相同,不再贅述。

例如:

CREATE TEMPORARY SYSTEM FUNCTION changecase AS 'com.paultech.ChangeCaseTool';

注意:必須把UDF的jar包添加到Flink框架的classpath下(例如放置到$FLINK_HOME/lib中)。或者通過(guò)ADD JAR動(dòng)態(tài)加載用戶jar到classpath。參見(jiàn)https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/jar/。

結(jié)果計(jì)算

UDF可以按照實(shí)際需要,重寫(xiě)基類(lèi)提供的open(),close()isDeterministic()方法。

UDF的結(jié)果計(jì)算方法例如eval(), accumulate(), 或者 retract()方法,在運(yùn)行階段被動(dòng)態(tài)生成的代碼調(diào)用。

結(jié)果計(jì)算方法可以定義一個(gè)或者多個(gè)參數(shù),可以使用重載方法,也可以使用變長(zhǎng)參數(shù)。

類(lèi)型推斷

Flink Table API是強(qiáng)類(lèi)型API,所有函數(shù)的參數(shù)類(lèi)型和返回類(lèi)型都必須映射為DataType。Flink支持自動(dòng)類(lèi)型推斷和通過(guò)注解(@DataTypeHint@FunctionHint)指定類(lèi)型。如果有更為復(fù)雜的類(lèi)型推斷邏輯,可以重寫(xiě)父類(lèi)的getTypeInference方法。

自動(dòng)類(lèi)型推斷

對(duì)于自動(dòng)類(lèi)型推斷,Java數(shù)據(jù)類(lèi)型和DataType類(lèi)型對(duì)應(yīng)關(guān)系參見(jiàn)https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/#data-type-extraction

注解顯式指定類(lèi)型

@DataTypeHint可用于返回值,方法體(作用于返回值)和方法參數(shù)上,從而修改返回值或者式參數(shù)的DataType。

@DataTypeHint支持復(fù)雜類(lèi)型,例如@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")

@FunctionHint適用于一個(gè)eval等結(jié)果計(jì)算方法可以接收多組類(lèi)型不同的參數(shù),返回值類(lèi)型和接收參數(shù)類(lèi)型相關(guān)的這種場(chǎng)景。我們貼出官網(wǎng)的例子:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// function with overloaded evaluation methods
// but globally defined output type
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a + b));
  }

  // overloading of arguments is still possible
  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}

// decouples the type inference from evaluation methods,
// the type inference is entirely determined by the function hints
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
@FunctionHint(
  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  output = @DataTypeHint("BIGINT")
)
@FunctionHint(
  input = {},
  output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {

  // an implementer just needs to make sure that a method exists
  // that can be called by the JVM
  public void eval(Object... o) {
    if (o.length == 0) {
      collect(false);
    }
    collect(o[0]);
  }
}

自定義類(lèi)型推斷

如果注解無(wú)法描述類(lèi)型推斷邏輯,可以重寫(xiě)getTypeInference方法,使用代碼實(shí)現(xiàn)復(fù)雜的類(lèi)型推斷邏輯。寫(xiě)法和參考官網(wǎng)的例子。

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LiteralFunction extends ScalarFunction {
  public Object eval(String s, String type) {
    switch (type) {
      case "INT":
        return Integer.valueOf(s);
      case "DOUBLE":
        return Double.valueOf(s);
      case "STRING":
      default:
        return s;
    }
  }

  // the automatic, reflection-based type inference is disabled and
  // replaced by the following logic
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      // specify typed arguments
      // parameters will be casted implicitly to those types if necessary
      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
      // specify a strategy for the result data type of the function
      .outputTypeStrategy(callContext -> {
        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
          throw callContext.newValidationError("Literal expected for second argument.");
        }
        // return a data type based on a literal
        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
        switch (literal) {
          case "INT":
            return Optional.of(DataTypes.INT().notNull());
          case "DOUBLE":
            return Optional.of(DataTypes.DOUBLE().notNull());
          case "STRING":
          default:
            return Optional.of(DataTypes.STRING());
        }
      })
      .build();
  }
}

確定性

如果UDF不能返回確定的結(jié)果(例如random(), date()now()),必須重寫(xiě)isDeterministic()并返回false。這涉及到執(zhí)行計(jì)劃優(yōu)化過(guò)程。

如果UDF的isDeterministic()返回true,并且傳入的參數(shù)全都是常量,在planning階段該UDF的值會(huì)被預(yù)先計(jì)算出來(lái)。例如SELECT ABS(-1)會(huì)優(yōu)化為SELECT 1。但是SELECT ABS(field) FROM t不會(huì)優(yōu)化,因?yàn)?code>field不是常量。

如果UDF的isDeterministic()返回false,或者傳入的參數(shù)存在變量,UDF的值在執(zhí)行階段才會(huì)被計(jì)算出來(lái)。

open和close方法

openclose方法可用于編寫(xiě)自定義的初始化和清理邏輯。open方法的執(zhí)行時(shí)機(jī)早于eval等結(jié)果計(jì)算方法。

可參考官網(wǎng)的例子(https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/#runtime-integration)。這個(gè)例子在啟動(dòng)作業(yè)的時(shí)候加載job parameter。

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public static class HashCodeFunction extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access the global "hashcode_factor" parameter
        // "12" would be the default value if the parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

TableEnvironment env = TableEnvironment.create(...);

// add job parameter
env.getConfig().addJobParameter("hashcode_factor", "31");

// register the function
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);

// use the function
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");

ScalarFunction

直接用例子說(shuō)明用法。我們編寫(xiě)一個(gè)大小寫(xiě)轉(zhuǎn)換UDF??梢越邮斩鄠€(gè)參數(shù)。默認(rèn)將string轉(zhuǎn)換為大寫(xiě)?;蛘呤峭ㄟ^(guò)boolean指定轉(zhuǎn)換為大寫(xiě)還是小寫(xiě)。

public static class ChangeCaseTool extends ScalarFunction {
    public String eval(String s) {
        return s.toUpperCase(Locale.ROOT);
    }
    public String eval(String s, Boolean changeToUppercase) {
        if (changeToUppercase) {
            return s.toUpperCase(Locale.ROOT);
        } else {
            return s.toLowerCase(Locale.ROOT);
        }
    }
}

配合如下例子使用:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 測(cè)試表數(shù)據(jù)
DataStreamSource<Row> streamSource = env.fromElements(
        Row.of("hello#world", 100),
        Row.of("hola#hastaLAvista", 50));

// 轉(zhuǎn)換DataStream為T(mén)able,并指定字段名
Table table = tEnv.fromDataStream(streamSource).as("name", "value");

// 將Table映射為demo表
tEnv.createTemporaryView("demo", table);

// 注冊(cè)UDF
tEnv.createTemporaryFunction("changecase", new ChangeCaseTool());

// 執(zhí)行SQL時(shí)候調(diào)用UDF
tEnv.executeSql("select changecase(`name`, true) as `name`, `value` from demo").print();

輸出如下:

+----+--------------------------------+-------------+
| op |                           name |       value |
+----+--------------------------------+-------------+
| +I |                    HELLO#WORLD |         100 |
| +I |              HOLA#HASTALAVISTA |          50 |
+----+--------------------------------+-------------+

TableFunction

TableFunction將一個(gè)字段拆分為多列。

同樣直接以例子說(shuō)明。例子中的方法將內(nèi)容按照指定的delimiter拆分,然后獲取拆分后第一個(gè)和第二個(gè)字符串,列明分別為word1和word2。

@FunctionHint(output = @DataTypeHint("ROW<word1 STRING, word2 STRING>"))
public static class StringSplitter extends TableFunction<Row> {
    public void eval(String s, String delimiter) {
        String[] split = s.split(delimiter);
        if (split.length >= 2) {
            collect(Row.of(split[0], split[1]));
        } else if (split.length == 1) {
            collect(Row.of(split[0], null));
        }
    }
}

由于TableFunction的計(jì)算結(jié)果是一個(gè)偽表,我們對(duì)它進(jìn)行操作的時(shí)候(例如join)需要使用LATERAL TABLE(function(field))或者LATERAL TABLE(function(field)) AS T(NEW_FIELD_NAME1, NEW_FIELD_NAME2)(修改字段名)把UDF計(jì)算結(jié)果作為表來(lái)使用。

例子如下:

tEnv.createTemporaryFunction("split", new StringSplitter());
tEnv.executeSql("select * from demo, lateral table(split(`name`, '#'))").print();

結(jié)果如下:

+----+--------------------------------+-------------+--------------------------------+--------------------------------+
| op |                           name |       value |                          word1 |                          word2 |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+
| +I |                    hello#world |         100 |                          hello |                          world |
| +I |              hola#hastaLAvista |          50 |                           hola |                   hastaLAvista |
+----+--------------------------------+-------------+--------------------------------+--------------------------------+

AggregateFunction

用一個(gè)例子說(shuō)明。編寫(xiě)一個(gè)自定義聚合函數(shù)MyAvg,根據(jù)物品單價(jià)和數(shù)量,求單價(jià)的平均值。UDF代碼如下:

// 自定義聚合器,持有總價(jià)和數(shù)量,以便于計(jì)算平均值
public static class MyAvgAggregator {
    public double sum;
    public int count;
}

// AggregateFunction需要聲明聚合結(jié)果數(shù)據(jù)類(lèi)型和自定義聚合器類(lèi)型
public static class MyAvg extends AggregateFunction<Double, MyAvgAggregator> {

    // 獲取計(jì)算結(jié)果的方法
    @Override
    public Double getValue(MyAvgAggregator accumulator) {
        return accumulator.sum / accumulator.count;
    }

    // 創(chuàng)建自定義聚合器
    @Override
    public MyAvgAggregator createAccumulator() {
        return new MyAvgAggregator();
    }

    // 聚合方法(必須),將數(shù)據(jù)加入到聚合器
    public void accumulate(MyAvgAggregator acc, Double unit, Integer count) {
        acc.sum += unit * count;
        acc.count += count;
    }

    // 撤回方法(可選),假設(shè)數(shù)據(jù)已經(jīng)添加進(jìn)自定義聚合器。該方法指定了將數(shù)據(jù)從自定義聚合器減去的邏輯。
    // 對(duì)于unbounded tables進(jìn)行bounded OVER 聚合運(yùn)算,必須提供此方法(需要減去over window舊的聚合數(shù)據(jù),添加新的數(shù)據(jù)后重新計(jì)算聚合結(jié)果)
    public void retract(MyAvgAggregator acc, Double unit, Integer count) {
        acc.sum -= unit * count;
        acc.count -= count;
    }

    // 合并方法(可選),包含合并多個(gè)自定義聚合器的邏輯
    // 對(duì)于unbounded session window grouping聚合和bounded grouping聚合,必須提供此方法
    public void merge(MyAvgAggregator acc, Iterable<MyAvgAggregator> it) {
        for (MyAvgAggregator a : it) {
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
}

例子如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<Row> streamSource = env.fromElements(
        Row.of("Apple", 4.0, 50),
        Row.of("Banana", 7.5, 20),
        Row.of("Peach", 8.0, 15)
);

Table table = tEnv.fromDataStream(streamSource).as("name", "unit_price", "count");

tEnv.createTemporaryView("demo", table);

tEnv.createTemporaryFunction("myavg", new MyAvg());

tEnv.executeSql("select myavg(`unit_price`, `count`) as avg_unit_price from demo").print();

執(zhí)行結(jié)果:

+----+--------------------------------+
| op |                 avg_unit_price |
+----+--------------------------------+
| +I |              5.529411764705882 |
+----+--------------------------------+

當(dāng)然,對(duì)于金額運(yùn)算結(jié)果,我們可以讓MyAvg返回BigDecimal類(lèi)型(其實(shí)可以使用ROUND函數(shù)解決。這里我們演示下上面介紹的@FunctionHint注解用法)。我們改寫(xiě)MyAvg如下:

// 增加注解,聲明輸入和輸出的數(shù)據(jù)類(lèi)型
@FunctionHint(input = {@DataTypeHint("DOUBLE"), @DataTypeHint("INT")}, output = @DataTypeHint("DECIMAL(12, 2)"))
public static class MyAvg extends AggregateFunction<BigDecimal, MyAvgAggregator> {
    @Override
    public BigDecimal getValue(MyAvgAggregator accumulator) {
        return BigDecimal.valueOf(accumulator.sum).divide(BigDecimal.valueOf(accumulator.count), 2, RoundingMode.HALF_DOWN);
    }

    // 其余方法完全相同,此處省略
    // ...
}

查詢(xún)SQL修改為:

tEnv.executeSql("select cast(myavg(`unit_price`, `count`) as DECIMAL(12, 2)) as avg_unit_price from demo").print();

結(jié)果如下:

+----+----------------+
| op | avg_unit_price |
+----+----------------+
| +I |           5.53 |
+----+----------------+

參考文檔

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/#user-defined-functions

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/types/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容