實(shí)時(shí)數(shù)倉|以u(píng)psert的方式讀寫Kafka數(shù)據(jù)——以Flink1.12為例

在某些場(chǎng)景中,比如GROUP BY聚合之后的結(jié)果,需要去更新之前的結(jié)果值。這個(gè)時(shí)候,需要將 Kafka 消息記錄的 key 當(dāng)成主鍵處理,用來確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來處理。在Flink1.11中,可以通過 flink-cdc-connectors 項(xiàng)目提供的 changelog-json format來實(shí)現(xiàn)該功能。關(guān)于該功能的使用,見之前的分享Flink1.11中的CDC Connectors操作實(shí)踐

在Flink1.12版本中, 新增了一個(gè) upsert connector(upsert-kafka),該 connector 擴(kuò)展自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現(xiàn)有的 kafka connector 相同的基本功能和持久性保證,因?yàn)閮烧咧g復(fù)用了大部分代碼。本文將以Flink1.12為例,介紹該功能的基本使用步驟,以下是全文,希望對(duì)你有所幫助。

公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉』,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包

Upsert Kafka connector簡介

Upsert Kafka Connector允許用戶以u(píng)psert的方式從Kafka主題讀取數(shù)據(jù)或?qū)?shù)據(jù)寫入Kafka主題。

當(dāng)作為數(shù)據(jù)源時(shí),upsert-kafka Connector會(huì)生產(chǎn)一個(gè)changelog流,其中每條數(shù)據(jù)記錄都表示一個(gè)更新或刪除事件。更準(zhǔn)確地說,如果不存在對(duì)應(yīng)的key,則視為INSERT操作。如果已經(jīng)存在了相對(duì)應(yīng)的key,則該key對(duì)應(yīng)的value值為最后一次更新的值。

用表來類比,changelog 流中的數(shù)據(jù)記錄被解釋為 UPSERT,也稱為 INSERT/UPDATE,因?yàn)槿魏尉哂邢嗤?key 的現(xiàn)有行都被覆蓋。另外,value 為空的消息將會(huì)被視作為 DELETE 消息。

當(dāng)作為數(shù)據(jù)匯時(shí),upsert-kafka Connector會(huì)消費(fèi)一個(gè)changelog流。它將INSERT / UPDATE_AFTER數(shù)據(jù)作為正常的Kafka消息值寫入(即INSERT和UPDATE操作,都會(huì)進(jìn)行正常寫入,如果是更新,則同一個(gè)key會(huì)存儲(chǔ)多條數(shù)據(jù),但在讀取該表數(shù)據(jù)時(shí),只保留最后一次更新的值),并將 DELETE 數(shù)據(jù)以 value 為空的 Kafka 消息寫入(key被打上墓碑標(biāo)記,表示對(duì)應(yīng) key 的消息被刪除)。Flink 將根據(jù)主鍵列的值對(duì)數(shù)據(jù)進(jìn)行分區(qū),從而保證主鍵上的消息有序,因此同一主鍵上的更新/刪除消息將落在同一分區(qū)中

依賴

為了使用Upsert Kafka連接器,需要添加下面的依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

如果使用SQL Client,需要下載flink-sql-connector-kafka_2.11-1.12.0.jar,并將其放置在Flink安裝目錄的lib文件夾下。

使用方式

使用樣例

-- 創(chuàng)建一張kafka表,用戶存儲(chǔ)sink的數(shù)據(jù)
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

尖叫提示:

要使用 upsert-kafka connector,必須在創(chuàng)建表時(shí)使用PRIMARY KEY定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。

upsert-kafka connector參數(shù)

  • connector

必選。指定要使用的連接器,Upsert Kafka 連接器使用:'upsert-kafka'

  • topic

必選。用于讀取和寫入的 Kafka topic 名稱。

  • properties.bootstrap.servers

必選。以逗號(hào)分隔的 Kafka brokers 列表。

  • key.format

必選。用于對(duì) Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 語法指定。支持的格式包括 'csv''json'、'avro'。

  • value.format

必選。用于對(duì) Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv''json''avro'。

  • *properties. **

可選。 該選項(xiàng)可以傳遞任意的 Kafka 參數(shù)。選項(xiàng)的后綴名必須匹配定義在 Kafka 參數(shù)文檔中的參數(shù)名。 Flink 會(huì)自動(dòng)移除 選項(xiàng)名中的 "properties." 前綴,并將轉(zhuǎn)換后的鍵名以及值傳入 KafkaClient。 例如,你可以通過 'properties.allow.auto.create.topics' = 'false' 來禁止自動(dòng)創(chuàng)建 topic。 但是,某些選項(xiàng),例如'key.deserializer''value.deserializer' 是不允許通過該方式傳遞參數(shù),因?yàn)?Flink 會(huì)重寫這些參數(shù)的值。

  • value.fields-include

可選,默認(rèn)為ALL??刂苉ey字段是否出現(xiàn)在 value 中。當(dāng)取ALL時(shí),表示消息的 value 部分將包含 schema 中所有的字段,包括定義為主鍵的字段。當(dāng)取EXCEPT_KEY時(shí),表示記錄的 value 部分包含 schema 的所有字段,定義為主鍵的字段除外。

  • key.fields-prefix

可選。為了避免與value字段命名沖突,為key字段添加一個(gè)自定義前綴。默認(rèn)前綴為空。一旦指定了key字段的前綴,必須在DDL中指明前綴的名稱,但是在構(gòu)建key的序列化數(shù)據(jù)類型時(shí),將移除該前綴。見下面的示例。在需要注意的是:使用該配置屬性,value.fields-include的值必須為EXCEPT_KEY。

-- 創(chuàng)建一張upsert表,當(dāng)指定了qwe前綴,涉及的key必須指定qwe前綴
CREATE TABLE result_total_pvuv_min_prefix (
    qwedo_date     STRING,     -- 統(tǒng)計(jì)日期,必須包含qwe前綴
    qwedo_min      STRING,      -- 統(tǒng)計(jì)分鐘,必須包含qwe前綴
    pv          BIGINT,     -- 點(diǎn)擊量
    uv          BIGINT,     -- 一天內(nèi)同個(gè)訪客多次訪問僅計(jì)算一個(gè)UV
    currenttime TIMESTAMP,  -- 當(dāng)前時(shí)間
    PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED -- 必須包含qwe前綴
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min_prefix',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix'='qwe', -- 指定前綴qwe
  'value.fields-include' = 'EXCEPT_KEY' -- key不出現(xiàn)kafka消息的value中
);
-- 向該表中寫入數(shù)據(jù)
INSERT INTO result_total_pvuv_min_prefix
SELECT
  do_date,    --  時(shí)間分區(qū)
  cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分鐘級(jí)別的時(shí)間
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime -- 當(dāng)前時(shí)間
from
  view_total_pvuv_min;

尖叫提示:

如果指定了key字段前綴,但在DDL中并沒有添加該前綴字符串,那么在向該表寫入數(shù)時(shí),會(huì)拋出下面異常:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: All fields in 'key.fields' must be prefixed with 'qwe' when option 'key.fields-prefix' is set but field 'do_date' is not prefixed.

  • sink.parallelism

可選。定義 upsert-kafka sink 算子的并行度。默認(rèn)情況下,由框架確定并行度,與上游鏈接算子的并行度保持一致。

其他注意事項(xiàng)

Key和Value的序列化格式

關(guān)于Key、value的序列化可以參考Kafka connector。值得注意的是,必須指定Key和Value的序列化格式,其中Key是通過PRIMARY KEY指定的。

Primary Key約束

Upsert Kafka 工作在 upsert 模式(FLIP-149)下。當(dāng)我們創(chuàng)建表時(shí),需要在 DDL 中定義主鍵。具有相同key的數(shù)據(jù),會(huì)存在相同的分區(qū)中。在 changlog source 上定義主鍵意味著在物化后的 changelog 上主鍵具有唯一性。定義的主鍵將決定哪些字段出現(xiàn)在 Kafka 消息的 key 中。

一致性保障

默認(rèn)情況下,如果啟用 checkpoint,Upsert Kafka sink 會(huì)保證至少一次將數(shù)據(jù)插入 Kafka topic。

這意味著,F(xiàn)link 可以將具有相同 key 的重復(fù)記錄寫入 Kafka topic。但由于該連接器以 upsert 的模式工作,該連接器作為 source 讀入時(shí),可以確保具有相同主鍵值下僅最后一條消息會(huì)生效。因此,upsert-kafka 連接器可以像 HBase sink 一樣實(shí)現(xiàn)冪等寫入。

分區(qū)水位線

Flink 支持根據(jù) Upsert Kafka 的 每個(gè)分區(qū)的數(shù)據(jù)特性發(fā)送相應(yīng)的 watermark。當(dāng)使用這個(gè)特性的時(shí)候,watermark 是在 Kafka consumer 內(nèi)部生成的。 合并每個(gè)分區(qū)生成的 watermark 的方式和 streaming shuffle 的方式是一致的(單個(gè)分區(qū)的輸入取最大值,多個(gè)分區(qū)的輸入取最小值)。 數(shù)據(jù)源產(chǎn)生的 watermark 是取決于該 consumer 負(fù)責(zé)的所有分區(qū)中當(dāng)前最小的 watermark。如果該 consumer 負(fù)責(zé)的部分分區(qū)是空閑的,那么整體的 watermark 并不會(huì)前進(jìn)。在這種情況下,可以通過設(shè)置合適的 table.exec.source.idle-timeout 來緩解這個(gè)問題。

數(shù)據(jù)類型

Upsert Kafka 用字節(jié)bytes存儲(chǔ)消息的 key 和 value,因此沒有 schema 或數(shù)據(jù)類型。消息按格式進(jìn)行序列化和反序列化,例如:csv、json、avro。不同的序列化格式所提供的數(shù)據(jù)類型有所不同,因此需要根據(jù)使用的序列化格式進(jìn)行確定表字段的數(shù)據(jù)類型是否與該序列化類型提供的數(shù)據(jù)類型兼容。

使用案例

本文以實(shí)時(shí)地統(tǒng)計(jì)網(wǎng)頁P(yáng)V和UV的總量為例,介紹upsert-kafka基本使用方式:

  • Kafka 數(shù)據(jù)源

用戶的ippv信息,一個(gè)用戶在一天內(nèi)可以有很多次pv

CREATE TABLE source_ods_fact_user_ippv (
    user_id      STRING,       -- 用戶ID
    client_ip    STRING,       -- 客戶端IP
    client_info  STRING,       -- 設(shè)備機(jī)型信息
    pagecode     STRING,       -- 頁面代碼
    access_time  TIMESTAMP,    -- 請(qǐng)求時(shí)間
    dt           STRING,       -- 時(shí)間分區(qū)天
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- 定義watermark
) WITH (
   'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_ippv', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費(fèi)者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
  • Kafka Sink表

統(tǒng)計(jì)每分鐘的PV、UV,并將結(jié)果存儲(chǔ)在Kafka中

CREATE TABLE result_total_pvuv_min (
    do_date     STRING,     -- 統(tǒng)計(jì)日期
    do_min      STRING,      -- 統(tǒng)計(jì)分鐘
    pv          BIGINT,     -- 點(diǎn)擊量
    uv          BIGINT,     -- 一天內(nèi)同個(gè)訪客多次訪問僅計(jì)算一個(gè)UV
    currenttime TIMESTAMP,  -- 當(dāng)前時(shí)間
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY' -- key不出現(xiàn)kafka消息的value中
);
  • 計(jì)算邏輯
-- 創(chuàng)建視圖
CREATE VIEW view_total_pvuv_min AS
SELECT
     dt AS do_date,                    -- 時(shí)間分區(qū)
     count (client_ip) AS pv,          -- 客戶端的IP
     count (DISTINCT client_ip) AS uv, -- 客戶端去重
     max(access_time) AS access_time   -- 請(qǐng)求的時(shí)間
FROM
    source_ods_fact_user_ippv
GROUP BY dt;

-- 寫入數(shù)據(jù)
INSERT INTO result_total_pvuv_min
SELECT
  do_date,    --  時(shí)間分區(qū)
  cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分鐘級(jí)別的時(shí)間
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime -- 當(dāng)前時(shí)間
from
  view_total_pvuv_min;
  • 生產(chǎn)用戶訪問數(shù)據(jù)到kafka,向kafka中的user_ippv插入數(shù)據(jù):
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}

{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}

{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}
  • 查詢結(jié)果表:
select * from result_total_pvuv_min;

可以看出:每分鐘的pv、uv只顯示一條數(shù)據(jù),即代表著截止到當(dāng)前時(shí)間點(diǎn)的pv和uv

查看Kafka中result_total_pvuv_min主題的數(shù)據(jù),如下:

可以看出:針對(duì)每一條訪問數(shù)據(jù),觸發(fā)計(jì)算了一次PV、UV,每一條數(shù)據(jù)都是截止到當(dāng)前時(shí)間的累計(jì)PV和UV。

尖叫提示:

默認(rèn)情況下,如果在啟用檢查點(diǎn)的情況下執(zhí)行查詢,Upsert Kafka接收器會(huì)將具有至少一次保證的數(shù)據(jù)提取到Kafka主題中。

這意味著,F(xiàn)link可能會(huì)將具有相同鍵的重復(fù)記錄寫入Kafka主題。但是,由于連接器在upsert模式下工作,因此作為源讀回時(shí),同一鍵上的最后一條記錄將生效。因此,upsert-kafka連接器就像HBase接收器一樣實(shí)現(xiàn)冪等寫入。

總結(jié)

本文以Flink1.12為例,介紹了upsert connector(upsert-kafka)的基本使用,該方式允許用戶以u(píng)psert 的方式讀寫Kafka中的表,使用起來非常方便。另外本文也給出了一個(gè)具體的使用案例,可以進(jìn)一步加深對(duì)該功能的使用。

公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉』,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包---

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 維表是數(shù)倉中的一個(gè)概念,維表中的維度屬性是觀察數(shù)據(jù)的角度,在建設(shè)離線數(shù)倉的時(shí)候,通常是將維表與事實(shí)表進(jìn)行關(guān)聯(lián)構(gòu)建星...
    大數(shù)據(jù)技術(shù)與數(shù)倉閱讀 1,573評(píng)論 0 0
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)、焦點(diǎn)、注意力、語言聯(lián)想、情景聯(lián)想 觀點(diǎn): 1.統(tǒng)計(jì)學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會(huì)...
    Jenaral閱讀 5,982評(píng)論 0 5
  • 昨天,在回家的路上,坐在車?yán)镉圃沼圃盏乜粗摹度龉衬墓适隆?,我被里面的?nèi)容深深吸引住了,盡管上學(xué)時(shí)...
    夜闌曉語閱讀 3,943評(píng)論 2 9
  • 一月四號(hào)的大沙有個(gè)想法。從昨晚到現(xiàn)在就一直圍繞在腦子里。或許深受那些小說的影響,或許真的就是我自己腦子或者精神么有...
    一個(gè)人的大沙閱讀 4,360評(píng)論 3 4
  • 記夢(mèng) 前記 他回國了,而事實(shí)上他其實(shí)從未來過。我不知道我們是如何交流的,但在夢(mèng)里沒有語言障礙。我時(shí)而是第三視角看著...
    江挽心閱讀 821評(píng)論 0 0

友情鏈接更多精彩內(nèi)容