詳解flink中Look up維表的使用

背景

在流式計(jì)算中,維表是一個(gè)很常見(jiàn)的概念,一般用于sql的join中,對(duì)流式數(shù)據(jù)進(jìn)行數(shù)據(jù)補(bǔ)全,比如我們的source stream是來(lái)自日志的訂單數(shù)據(jù),但是日志中我們只是記錄了訂單商品的id,并沒(méi)有其他的信息,但是我們把數(shù)據(jù)存入數(shù)倉(cāng)進(jìn)行數(shù)據(jù)分析的時(shí)候,卻需要商品名稱、價(jià)格等等其他的信息,這種問(wèn)題我們可以在進(jìn)行流處理的時(shí)候通過(guò)查詢維表的方式對(duì)數(shù)據(jù)進(jìn)行數(shù)據(jù)補(bǔ)全。

維表一般存儲(chǔ)在外部存儲(chǔ)中,比如mysql、hbase、redis等等,今天我們以mysql為例,講講flink中維表的使用。

LookupableTableSource

在flink中提供了一個(gè)LookupableTableSource,可以用于實(shí)現(xiàn)維表,也就是我們可以通過(guò)某幾個(gè)key列去查詢外部存儲(chǔ)來(lái)獲取相關(guān)的信息用于補(bǔ)全stream的數(shù)據(jù)。

public interface LookupableTableSource<T> extends TableSource<T> {

    TableFunction<T> getLookupFunction(String[] lookupKeys);

    AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

    boolean isAsyncEnabled();
}

我們看到,LookupableTableSource有三個(gè)方法

  • getLookupFunction:用于同步查詢維表的數(shù)據(jù),返回一個(gè)TableFunction,所以本質(zhì)上還是通過(guò)用戶自定義 UDTF來(lái)實(shí)現(xiàn)的。
  • getAsyncLookupFunction:用于異步查詢維表的數(shù)據(jù),該方法返回一個(gè)對(duì)象
  • isAsyncEnabled:默認(rèn)情況下是同步查詢,如果要開(kāi)啟異步查詢,這個(gè)方法需要返回true

在flink里,我們看到實(shí)現(xiàn)了這個(gè)接口的主要有四個(gè)類,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我們主要以jdbc為例講講如何進(jìn)行維表查詢。

實(shí)例講解

接下來(lái)我們講一個(gè)小例子,首先定義一下stream source,我們使用flink 1.11提供的datagen來(lái)生成數(shù)據(jù)。

我們來(lái)模擬生成用戶的數(shù)據(jù),這里只生成的用戶的id,范圍在1-100之間。

CREATE TABLE datagen (
 userid int,
 proctime as PROCTIME()
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
)

datagen具體的使用方法可以參考:

聊聊flink 1.11 中的隨機(jī)數(shù)據(jù)生成器-DataGen connector

然后再創(chuàng)建一個(gè)mysql維表信息:

CREATE TABLE dim_mysql (
  id int,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/test',
   'table-name' = 'userinfo',
   'username' = 'root',
   'password' = 'root'
)

我們這個(gè)mysql表中樣例數(shù)據(jù)如下:

image

最后執(zhí)行sql查詢,流表關(guān)聯(lián)維表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

結(jié)果示例如下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

我們看到對(duì)于維表中存在的數(shù)據(jù),已經(jīng)關(guān)聯(lián)出來(lái)了,對(duì)于維表中沒(méi)有的數(shù)據(jù),顯示為null

完整代碼請(qǐng)參考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/dimension/JdbcDim.java

源碼解析

JdbcTableSource

以jdbc為例,我們來(lái)看看flink底層是怎么做的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持異步的查詢,所以進(jìn)入JdbcTableSource#getLookupFunction方法。

    @Override
    public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
        final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
        return JdbcLookupFunction.builder()
                .setOptions(options)
                .setLookupOptions(lookupOptions)
                .setFieldTypes(rowTypeInfo.getFieldTypes())
                .setFieldNames(rowTypeInfo.getFieldNames())
                .setKeyNames(lookupKeys)
                .build();
    }

最終是構(gòu)造了一個(gè)JdbcLookupFunction對(duì)象,

  • options是連接jdbc的一些參數(shù),比如user、pass、url等。
  • lookupOptions是一些有關(guān)維表的參數(shù),主要是緩存的大小、超時(shí)時(shí)間等。
  • lookupKeys也就是要去關(guān)聯(lián)查詢維表的字段。

JdbcLookupFunction

所以我們來(lái)看看JdbcLookupFunction類,這個(gè)JdbcLookupFunction是一個(gè)TableFunction的子類,具體的TableFunction的使用可以參考這個(gè)文章:

Flink實(shí)戰(zhàn)教程-自定義函數(shù)之TableFunction

一個(gè)TableFunction最核心的就是eval方法,在這個(gè)方法里,做的主要的工作就是通過(guò)傳進(jìn)來(lái)的多個(gè)keys拼接成sql去來(lái)查詢數(shù)據(jù),首先查詢的是緩存,緩存有數(shù)據(jù)就直接返回,緩存沒(méi)有的話再去查詢數(shù)據(jù)庫(kù),然后再將查詢的結(jié)果返回并放入緩存,下次查詢的時(shí)候直接查詢緩存。

為什么要加一個(gè)緩存呢?默認(rèn)情況下是不開(kāi)啟緩存的,每來(lái)一個(gè)查詢,都會(huì)給維表發(fā)送一個(gè)請(qǐng)求去查詢,如果數(shù)據(jù)量比較大的話,勢(shì)必會(huì)給存儲(chǔ)維表的系統(tǒng)造成一定的壓力,所以flink提供了一個(gè)LRU緩存,查詢維表的時(shí)候,先查詢緩存,緩存沒(méi)有再去查詢外部系統(tǒng),但是如果有一個(gè)數(shù)據(jù)查詢頻率比較高,一直被命中,就無(wú)法獲取新數(shù)據(jù)了。所以緩存還要加一個(gè)超時(shí)時(shí)間,過(guò)了這個(gè)時(shí)間,把這個(gè)數(shù)據(jù)強(qiáng)制刪除,去外部系統(tǒng)查詢新的數(shù)據(jù)。

具體的怎么開(kāi)啟緩存呢?我們看下JdbcLookupFunction#open方法

    @Override
    public void open(FunctionContext context) throws Exception {
        try {
            establishConnectionAndStatement();
            this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                    .maximumSize(cacheMaxSize)
                    .build();
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

也就是說(shuō)cacheMaxSize和cacheExpireMs需要同時(shí)設(shè)置,就會(huì)構(gòu)造一個(gè)緩存對(duì)象cache來(lái)緩存數(shù)據(jù).這兩個(gè)參數(shù)對(duì)應(yīng)的DDL的屬性就是lookup.cache.max-rows和lookup.cache.ttl

對(duì)于具體的緩存的大小和超時(shí)時(shí)間的設(shè)置,用戶需要根據(jù)自身的情況來(lái)自己定義,在數(shù)據(jù)的準(zhǔn)確性和系統(tǒng)的吞吐量之間做一個(gè)權(quán)衡。

更多干貨信息,歡迎關(guān)注我的公眾號(hào)【大數(shù)據(jù)技術(shù)與應(yīng)用實(shí)戰(zhàn)】

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容