非主鍵表

Overview #

如果一個(gè)表沒(méi)有定義主鍵,它就是一個(gè)追加表。與主鍵表相比,它不具有直接接收更改日志的能力。它不能通過(guò)upsert直接更新數(shù)據(jù)。它只能接收來(lái)自追加數(shù)據(jù)的傳入數(shù)據(jù)。

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    -- 'target-file-size' = '256 MB',
    -- 'file.format' = 'parquet',
    -- 'file.compression' = 'zstd',
    -- 'file.compression.zstd-level' = '3'
);

典型應(yīng)用場(chǎng)景下的批量寫(xiě)、批量讀,類(lèi)似于普通的Hive分區(qū)表,但與Hive表相比,可以帶來(lái):

  1. 對(duì)象存儲(chǔ)(S3、OSS)友好
  2. 時(shí)間旅行和回滾
  3. 低成本的DELETE / UPDATE
  4. 流式插入自動(dòng)合并小文件
  5. 像隊(duì)列一樣流式讀寫(xiě)
  6. 借助排序和索引高性能查詢(xún)

Streaming #

您可以通過(guò)Flink以一種非常靈活的方式流式寫(xiě)入Append表,或者通過(guò)Flink讀取Append表,將其像隊(duì)列一樣使用。唯一的區(qū)別是它的延遲是以分鐘為單位的。它的優(yōu)點(diǎn)是成本非常低,并且能夠向下推過(guò)濾器和投影。

Automatic small file merging #

在流寫(xiě)作業(yè)中,如果沒(méi)有桶定義,writer就沒(méi)有壓縮,而是使用Compact Coordinator掃描小文件,然后將壓縮任務(wù)傳遞給Compact Worker。在流模式下,如果你在flink中運(yùn)行insert sql,拓?fù)鋵⑹沁@樣的:


image.png

不用擔(dān)心背壓,壓縮從不背壓。

如果設(shè)置 write-only = true,則將在拓?fù)渲袆h除 Compact Coordinator 和 Compact Worker.

自動(dòng)壓縮僅在Flink引擎流模式下支持。您還可以在paimon中通過(guò)flink動(dòng)作啟動(dòng)一個(gè)壓縮作業(yè),并通過(guò)set write-only禁用所有其他壓縮作業(yè)。

Streaming Query #

您可以流式Append表,并像使用消息隊(duì)列一樣使用它。與主鍵表一樣,流式讀取有兩種選擇:

  1. 默認(rèn)情況下,流式讀取在第一次啟動(dòng)時(shí)生成表上的最新快照,并繼續(xù)讀取最新的增量記錄。
  2. 可以指定scan.mode or scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis ,只允許流式讀增量。

與flink-kafka類(lèi)似,默認(rèn)情況下順序是不保證的,如果你的數(shù)據(jù)有某種順序要求,你也需要考慮定義一個(gè)桶鍵,參見(jiàn) Bucketed Append


Query #

Data Skipping By Order #

默認(rèn)情況下,Paimon記錄清單文件中每個(gè)字段的最大值和最小值。

在查詢(xún)中,根據(jù)查詢(xún)的WHERE條件,根據(jù)manifest中的統(tǒng)計(jì)信息做文件過(guò)濾,如果過(guò)濾效果好,將查詢(xún)本來(lái)分鐘的查詢(xún)加速到毫秒級(jí)完成執(zhí)行。

通常數(shù)據(jù)分布并不總是有效的過(guò)濾,那么如果我們能在WHERE條件下按字段對(duì)數(shù)據(jù)進(jìn)行排序呢?您可以查看Flink COMPACT ActionFlink COMPACT ProcedureSpark COMPACT Procedure.

Data Skipping By File Index #

你也可以使用文件索引,它在讀取端通過(guò)索引過(guò)濾文件。

CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
    'file-index.bloom-filter.columns' = 'c1,c2',
    'file-index.bloom-filter.c1.items' = '200'
);

定義file-index.bloom-filter.columns,Paimon將為每個(gè)文件創(chuàng)建相應(yīng)的索引文件。如果索引文件太小,它將直接存儲(chǔ)在清單中,或者存儲(chǔ)在數(shù)據(jù)文件的目錄中。每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)索引文件,索引文件有一個(gè)單獨(dú)的文件定義,可以包含不同類(lèi)型的索引和多個(gè)列。

數(shù)據(jù)文件索引是某個(gè)數(shù)據(jù)文件對(duì)應(yīng)的外部索引文件。如果索引文件太小,它將直接存儲(chǔ)在清單中,否則存儲(chǔ)在數(shù)據(jù)文件的目錄中。每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)索引文件,索引文件有一個(gè)單獨(dú)的文件定義,可以包含不同類(lèi)型的索引和多個(gè)列。

不同的文件索引在不同的場(chǎng)景下可能是有效的。例如,在點(diǎn)查找場(chǎng)景中,bloom過(guò)濾器可以加快查詢(xún)速度。使用位圖可能會(huì)占用更多空間,但可以獲得更高的準(zhǔn)確性。

目前,文件索引僅支持在僅追加表中。

布隆過(guò)濾器:

  • file-index.bloom-filter.columns: 指定需要布隆過(guò)濾器索引的列。
  • file-index.bloom-filter.<column_name>.fpp 配置誤報(bào)概率。
  • file-index.bloom-filter.<column_name>.items 在一個(gè)數(shù)據(jù)文件中配置所期望的不同項(xiàng)。

位圖:
file-index.bitmap.columns: 指定需要位圖索引的列。

將支持更多的過(guò)濾器類(lèi)型…

如果您想在現(xiàn)有表中添加文件索引,而不需要重寫(xiě),您可以使用rewrite_file_index過(guò)程。在使用該過(guò)程之前,您應(yīng)該在目標(biāo)表中配置適當(dāng)?shù)呐渲?。您可以使用ALTER子句配置file-index.<filter-type>.columns到表。
如何調(diào)用:參見(jiàn) flink procedures


Update #

現(xiàn)在,只有Spark SQL支持DELETE和UPDATE,你可以看看Spark Write.
舉例:

DELETE FROM my_table WHERE currency = 'UNKNOWN';

更新追加表有兩種模式:

  1. COW (Copy on Write):搜索命中的文件,然后重寫(xiě)每個(gè)文件,從文件中刪除需要?jiǎng)h除的數(shù)據(jù)。這個(gè)操作代價(jià)很大。
  2. MOW(寫(xiě)合并):通過(guò)指定 'deletion-vectors.enabled' = 'true',則可以啟用刪除向量模式。只標(biāo)記相應(yīng)文件的某些記錄進(jìn)行刪除,并寫(xiě)入刪除文件,不重寫(xiě)整個(gè)文件。

Bucketed Append #

您可以定義bucket和bucket-key以獲得bucket附加表。
創(chuàng)建分桶追加表的示例:

CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

Streaming #

一個(gè)普通的追加表對(duì)它的流寫(xiě)和讀沒(méi)有嚴(yán)格的順序保證,但在某些情況下,你需要定義一個(gè)類(lèi)似于Kafka的鍵。

同一bucket中的每條記錄都是嚴(yán)格排序的,流式讀取將按照寫(xiě)入的順序?qū)⒂涗泜鬏數(shù)较掠?。要使用這種模式,您不需要配置特殊配置,所有數(shù)據(jù)都將作為隊(duì)列放入一個(gè)bucket中。

image.png

Compaction in Bucket #

默認(rèn)情況下,匯聚節(jié)點(diǎn)將自動(dòng)執(zhí)行壓縮以控制文件數(shù)量。以下選項(xiàng)控制壓縮策略:

Key Default Type Description
write-only false Boolean 如果設(shè)置為true,將跳過(guò)壓縮和快照過(guò)期。此選項(xiàng)與專(zhuān)用緊湊作業(yè)一起使用。
compaction.min.file-num 5 Integer 對(duì)于文件集[f_0,…],f_N],滿(mǎn)足sum(size(f_i)) >= targetFileSize觸發(fā)追加表壓縮的最小文件數(shù)。這個(gè)值避免了幾乎整個(gè)文件被壓縮,因?yàn)閴嚎s是不劃算的。
compaction.max.file-num 5 Integer 對(duì)于文件集[f_0,…],f_N],觸發(fā)追加表壓縮的最大文件數(shù),即使sum(size(f_i)) < targetFileSize。此值可避免掛起過(guò)多的小文件,以免降低性能。
full-compaction.delta-commits (none) Integer 在增量提交之后,將不斷觸發(fā)完全壓縮。

Streaming Read Order #

對(duì)于流式讀取,記錄按以下順序生成:

  • 對(duì)于來(lái)自?xún)蓚€(gè)不同分區(qū)的任意兩條記錄
    • 如果scan.plan-sort-partition設(shè)置為true,將首先生成分區(qū)值較小的記錄。
    • 否則,將首先生成分區(qū)創(chuàng)建時(shí)間較早的記錄。
  • 對(duì)于來(lái)自同一分區(qū)和同一桶的任意兩條記錄,先寫(xiě)的記錄將先被生產(chǎn)。
  • 對(duì)于來(lái)自同一分區(qū)但不同桶的任意兩條記錄,不同桶由不同的任務(wù)處理,它們之間沒(méi)有順序保證。

Watermark Definition #

您可以為讀取Paimon表定義水印:

CREATE TABLE t (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
 TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

你也可以啟用Flink Watermark alignment,這將確保沒(méi)有源/分割/碎片/分區(qū)將其水印增加得太遠(yuǎn)。

Key Default Type Description
scan.watermark.alignment.group (none) String 一組用于對(duì)齊水印的源。
scan.watermark.alignment.max-drift (none) Duration 在我們暫停從源/任務(wù)/分區(qū)消費(fèi)之前,對(duì)齊水印的最大漂移。

Bounded Stream #

流源也可以有界,您可以指定‘ scan.bounded.watermark ’來(lái)定義有界流模式的結(jié)束條件,流讀取將結(jié)束,直到遇到更大的水印快照。

快照中的水印是由writer生成的,例如可以指定一個(gè)kafka源,并聲明水印的定義。當(dāng)使用這個(gè)kafka源對(duì)Paimon表進(jìn)行寫(xiě)操作時(shí),Paimon表的快照會(huì)生成相應(yīng)的水印,這樣流式讀取這個(gè)Paimon表時(shí)就可以使用有界水印的特性。

CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

Batch #

在批量查詢(xún)中,如果有必要,可以使用桶狀表來(lái)避免shuffle,例如,您可以使用以下Spark SQL來(lái)讀取Paimon表:

SET spark.sql.sources.v2.bucketing.enabled = true;

CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');

CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');

SELECT * FROM FACT_TABLE JOIN DIM_TABLE on t1.order_id = t4.order_id;

‘spark.sql.sources.v2.bucketing.enabled’配置用于啟用V2數(shù)據(jù)源。打開(kāi)后,Spark將通過(guò)V2數(shù)據(jù)源的SupportsReportPartitioning識(shí)別特定的分布式報(bào)告,并在必要時(shí)嘗試避免shuffle。

如果兩個(gè)表具有相同的存儲(chǔ)桶策略和相同數(shù)量的存儲(chǔ)桶,則可以避免代價(jià)高昂的聯(lián)接shuffle。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容