基于 Table API 實(shí)現(xiàn)實(shí)時(shí)報(bào)表

Apache Flink 提供了一個(gè)統(tǒng)一的關(guān)系A(chǔ)PI —— Table API, 用于批處理和實(shí)時(shí)計(jì)算。也就是說(shuō),無(wú)論是在無(wú)界的實(shí)時(shí)數(shù)據(jù)流上還是在有界批量數(shù)據(jù)集上都可以使用相同的語(yǔ)義執(zhí)行查詢,并且得到相同的結(jié)果。Flink Table API 通常用于簡(jiǎn)化數(shù)據(jù)分析,數(shù)據(jù)管道和ETL應(yīng)用的定義。

What Will You Be Building?

在本教程中,你將會(huì)學(xué)習(xí)如何構(gòu)建實(shí)時(shí)儀表盤(pán)來(lái)按賬戶跟蹤財(cái)務(wù)交易。數(shù)據(jù)管道將從Kafka讀取數(shù)據(jù),把結(jié)果數(shù)據(jù)寫(xiě)入MySQL并通過(guò)Grafana進(jìn)行可視化。

Prerequisites

本次演練假設(shè)你對(duì)Java或者Scala有一定的了解,但是即使你熟悉和使用的是其他語(yǔ)言,也應(yīng)該可以理解。同時(shí)還假設(shè)你熟悉基本的關(guān)系概念,例如 selectgroup by

Help, I’m Stuck!

如果你陷入困境,可以查看 community support resources。值得一提的是,Apache Flink 的用戶郵件列表一直被評(píng)為所有Apache項(xiàng)目中最活躍的項(xiàng)目之一,因此通過(guò)郵件列表進(jìn)行求助嗎,也不失為一種很棒的快速解決問(wèn)題途徑。

How To Follow Along

如果你想繼續(xù)跟進(jìn)本教程,你將需要具有以下環(huán)境的一臺(tái)計(jì)算機(jī):

  • Java 8 或者 Java 11
  • Maven
  • Docker

本次演練需要的配置文件在 flink-playgrounds倉(cāng)庫(kù)中提供,下載之后,在你的IDE中打開(kāi) flink-palyground/table-walkthrough,并導(dǎo)航到文件 SpendReport。

 EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE transactions (\n" +
                "    account_id  BIGINT,\n" +
                "    amount      BIGINT,\n" +
                "    transaction_time TIMESTAMP(3),\n" +
                "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = 'transactions',\n" +
                "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
                "    'format'    = 'csv'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE spend_report (\n" +
                "    account_id BIGINT,\n" +
                "    log_ts     TIMESTAMP(3),\n" +
                "    amount     BIGINT\n," +
                "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
                "  'table-name' = 'spend_report',\n" +
                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
                "  'username'   = 'sql-demo',\n" +
                "  'password'   = 'demo-sql'\n" +
                ")");

        Table transactions = tEnv.from("transactions");
        report(transactions).executeInsert("spend_report");

代碼分析

執(zhí)行環(huán)境

代碼前兩行設(shè)置了 TableEnvironment. 它用于配置作業(yè)的屬性,指定是以批的方式還是以流的方式運(yùn)行程序以及數(shù)據(jù)源的創(chuàng)建。本次演練使用流式的模式創(chuàng)建了一個(gè)標(biāo)準(zhǔn)的 TableEnvironment.

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

注冊(cè)表

接下來(lái),在當(dāng)前的 Catalog中注冊(cè)表,你可以使用它連接外部系統(tǒng)以讀寫(xiě)批量數(shù)據(jù)集和流式數(shù)據(jù)。數(shù)據(jù)源表提供了對(duì)存儲(chǔ)在外部系統(tǒng)中數(shù)據(jù)的訪問(wèn),這些系統(tǒng)包括數(shù)據(jù)庫(kù)、鍵值存儲(chǔ)、消息隊(duì)列、文件系統(tǒng)等。表數(shù)據(jù)匯向外部系統(tǒng)按表的方式寫(xiě)入數(shù)據(jù)。不同類型的數(shù)據(jù)源和數(shù)據(jù)匯支持不同的存儲(chǔ)格式,例如CSV、JSON、Avro或者Parquet等。

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

需要注冊(cè)兩個(gè)表,一個(gè)是輸入表transaction,另一個(gè)是輸出表spend report. 表 transaction讀取信用卡交易數(shù)據(jù),其中包含賬戶ID(account_id),時(shí)間戳和美元金額。該表是kafka 主題 transactions上的一個(gè)邏輯表,該主題存儲(chǔ)的數(shù)據(jù)格式為CSV。

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "    'connector'  = 'jdbc',\n" +
    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "    'table-name' = 'spend_report',\n" +
    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "    'username'   = 'sql-demo',\n" +
    "    'password'   = 'demo-sql'\n" +
    ")");

第二個(gè)表spend_report存儲(chǔ)著最終的聚合結(jié)果,它是mysql數(shù)據(jù)庫(kù)中的一個(gè)表。

查詢

在配置完運(yùn)行環(huán)境并注冊(cè)了表之后,你就可以開(kāi)始著手構(gòu)建第一個(gè)應(yīng)用程序了。可以從TableEnvironment中獲取輸入表以讀取數(shù)據(jù),并且使用 executeInsert將結(jié)果數(shù)據(jù)寫(xiě)入到輸出表。report函數(shù)是你實(shí)現(xiàn)真正業(yè)務(wù)邏輯的地方,目前尚未實(shí)現(xiàn)。

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings); 

測(cè)試

本項(xiàng)目包含了一個(gè)輔助測(cè)試類SpendReportTest用于驗(yàn)證報(bào)表邏輯,它已批處理模式創(chuàng)建TableEnvironment

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings); 

Flink的特性之一是在提供了跨越批處理和流式處理的一致性語(yǔ)義。也就是說(shuō)你可以在靜態(tài)數(shù)據(jù)集上以批處理的模式開(kāi)發(fā)和測(cè)試應(yīng)用程序,并將其作為流應(yīng)用程序部署到生產(chǎn)環(huán)境中。

實(shí)現(xiàn)業(yè)務(wù)邏輯

現(xiàn)在作業(yè)的框架已經(jīng)設(shè)置完成,你可以添加一些業(yè)務(wù)邏輯了。我們的目標(biāo)是構(gòu)建一個(gè)展示每個(gè)賬戶一天中每個(gè)小時(shí)總支出的報(bào)告。這就意味這時(shí)間戳字段的粒度需要從毫秒換算到小時(shí)。

Flink支持使用純SQL或者Table API開(kāi)發(fā)關(guān)系應(yīng)用程序。Table API 是和SQL具備相同功能的流暢的DSL,可以用Python,Java,或者Scala編寫(xiě),并且支持強(qiáng)大的IDE集成功能。可以完全像SQL一樣進(jìn)行選擇需要的字段進(jìn)行查詢并且按鍵值分組聚合,再使用一些f內(nèi)置的函數(shù)例如floor和sum,就可以來(lái)實(shí)現(xiàn)report了。

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

用戶自定義函數(shù)

Flink包含了數(shù)據(jù)有限的內(nèi)置函數(shù),有時(shí)候你需要通過(guò)自定義函數(shù)來(lái)實(shí)現(xiàn)某些需求。如果floor沒(méi)有在內(nèi)置函數(shù)中實(shí)現(xiàn),你可以選擇自己去實(shí)現(xiàn)。

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFloor extends ScalarFunction {

    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {

        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}

之后可以快速將其應(yīng)用在你的程序當(dāng)中,

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            call(MyFloor.class, $("transaction_time")).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

此查詢,讀取transactions表中的所有記錄,計(jì)算報(bào)表,并且以高效和可擴(kuò)展的方式輸出結(jié)果。使用此實(shí)現(xiàn)運(yùn)行測(cè)試將通過(guò)。

添加窗口函數(shù)

基于時(shí)間對(duì)數(shù)據(jù)分組在數(shù)據(jù)處理中是一種典型的操作,特別是在處理無(wú)界的流式數(shù)據(jù)時(shí)。基于時(shí)間的分組被稱為窗口,F(xiàn)link提供了靈活的窗口語(yǔ)義。最基本的窗口是滾動(dòng)窗口,它具有固定的大小并且不同窗口之間不存在重疊。

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts").start().as("log_ts"),
            $("amount").sum().as("amount"));
}

你的應(yīng)用程序基于時(shí)間戳這一列定義了一小時(shí)的滾動(dòng)窗口。因此,時(shí)間戳為 2019-06-01 01:23:47 的行被放入 2019-06-01 01:00:00窗口。

基于時(shí)間的聚合是唯一的,因?yàn)榕c其他屬性不同的是,時(shí)間在連續(xù)的流應(yīng)用程序中通常是向前移動(dòng)的。與floor和其他UDF不同的是, 窗口函數(shù)是可以在運(yùn)行時(shí)應(yīng)用額外優(yōu)化的內(nèi)部函數(shù)。在批處理上下文中,窗口函數(shù)提供了一個(gè)便利的API可以根據(jù)時(shí)間戳屬性對(duì)記錄進(jìn)行分組。

使用此實(shí)現(xiàn)運(yùn)行測(cè)試也將通過(guò)。

流處理模式

現(xiàn)在,這就是一個(gè)具備完整功能的有狀態(tài)的分布式流應(yīng)用程序。該查詢不斷地消費(fèi)來(lái)自kafka的transactions數(shù)據(jù)流,并計(jì)算每小時(shí)的總開(kāi)支,并在結(jié)果準(zhǔn)備就緒后立即發(fā)出。由于輸入的數(shù)據(jù)流式無(wú)界的,所以該查詢將一直運(yùn)行除非手動(dòng)停止。因?yàn)樽鳂I(yè)使用的實(shí)際基于時(shí)間的窗口聚合,所以當(dāng)框架判定不會(huì)再有更多的記錄到達(dá)某個(gè)特定窗口的時(shí)候,F(xiàn)link會(huì)執(zhí)行特定的優(yōu)化,例如狀態(tài)清理。

本次演練 的流應(yīng)用程序是完全基于的Docker的,并且可以本地運(yùn)行。該環(huán)境中包括 Kafka ,一個(gè)連續(xù)的數(shù)據(jù)生成器、MySQL和Grafana。

table-walkthrough文件夾中啟動(dòng)docker-compose腳本:

$ docker-compose build
$ docker-compose up -d

你可以通過(guò)Flink控制臺(tái)查看正在運(yùn)行的作業(yè)的信息:
[圖片上傳失敗...(image-cceed-1619139096330)]
在內(nèi)置的MySQL數(shù)據(jù)庫(kù)中查詢結(jié)果數(shù)據(jù):

$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql

mysql> use sql-demo;
Database changed

mysql> select count(*) from spend_report;
+----------+
| count(*) |
+----------+
|      110 |
+----------+

最終,可以使用Grafana對(duì)結(jié)果進(jìn)行可視化展示:
[圖片上傳失敗...(image-c85cce-1619139096331)]

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容