Overview #
如果一個(gè)表沒(méi)有定義主鍵,它就是一個(gè)追加表。與主鍵表相比,它不具有直接接收更改日志的能力。它不能通過(guò)upsert直接更新數(shù)據(jù)。它只能接收來(lái)自追加數(shù)據(jù)的傳入數(shù)據(jù)。
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
-- 'target-file-size' = '256 MB',
-- 'file.format' = 'parquet',
-- 'file.compression' = 'zstd',
-- 'file.compression.zstd-level' = '3'
);
典型應(yīng)用場(chǎng)景下的批量寫(xiě)、批量讀,類(lèi)似于普通的Hive分區(qū)表,但與Hive表相比,可以帶來(lái):
- 對(duì)象存儲(chǔ)(S3、OSS)友好
- 時(shí)間旅行和回滾
- 低成本的DELETE / UPDATE
- 流式插入自動(dòng)合并小文件
- 像隊(duì)列一樣流式讀寫(xiě)
- 借助排序和索引高性能查詢(xún)
Streaming #
您可以通過(guò)Flink以一種非常靈活的方式流式寫(xiě)入Append表,或者通過(guò)Flink讀取Append表,將其像隊(duì)列一樣使用。唯一的區(qū)別是它的延遲是以分鐘為單位的。它的優(yōu)點(diǎn)是成本非常低,并且能夠向下推過(guò)濾器和投影。
Automatic small file merging #
在流寫(xiě)作業(yè)中,如果沒(méi)有桶定義,writer就沒(méi)有壓縮,而是使用Compact Coordinator掃描小文件,然后將壓縮任務(wù)傳遞給Compact Worker。在流模式下,如果你在flink中運(yùn)行insert sql,拓?fù)鋵⑹沁@樣的:

不用擔(dān)心背壓,壓縮從不背壓。
如果設(shè)置 write-only = true,則將在拓?fù)渲袆h除 Compact Coordinator 和 Compact Worker.
自動(dòng)壓縮僅在Flink引擎流模式下支持。您還可以在paimon中通過(guò)flink動(dòng)作啟動(dòng)一個(gè)壓縮作業(yè),并通過(guò)set write-only禁用所有其他壓縮作業(yè)。
Streaming Query #
您可以流式Append表,并像使用消息隊(duì)列一樣使用它。與主鍵表一樣,流式讀取有兩種選擇:
- 默認(rèn)情況下,流式讀取在第一次啟動(dòng)時(shí)生成表上的最新快照,并繼續(xù)讀取最新的增量記錄。
- 可以指定scan.mode or scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis ,只允許流式讀增量。
與flink-kafka類(lèi)似,默認(rèn)情況下順序是不保證的,如果你的數(shù)據(jù)有某種順序要求,你也需要考慮定義一個(gè)桶鍵,參見(jiàn) Bucketed Append
Query #
Data Skipping By Order #
默認(rèn)情況下,Paimon記錄清單文件中每個(gè)字段的最大值和最小值。
在查詢(xún)中,根據(jù)查詢(xún)的WHERE條件,根據(jù)manifest中的統(tǒng)計(jì)信息做文件過(guò)濾,如果過(guò)濾效果好,將查詢(xún)本來(lái)分鐘的查詢(xún)加速到毫秒級(jí)完成執(zhí)行。
通常數(shù)據(jù)分布并不總是有效的過(guò)濾,那么如果我們能在WHERE條件下按字段對(duì)數(shù)據(jù)進(jìn)行排序呢?您可以查看Flink COMPACT Action 或 Flink COMPACT Procedure 或 Spark COMPACT Procedure.
Data Skipping By File Index #
你也可以使用文件索引,它在讀取端通過(guò)索引過(guò)濾文件。
CREATE TABLE <PAIMON_TABLE> (<COLUMN> <COLUMN_TYPE> , ...) WITH (
'file-index.bloom-filter.columns' = 'c1,c2',
'file-index.bloom-filter.c1.items' = '200'
);
定義file-index.bloom-filter.columns,Paimon將為每個(gè)文件創(chuàng)建相應(yīng)的索引文件。如果索引文件太小,它將直接存儲(chǔ)在清單中,或者存儲(chǔ)在數(shù)據(jù)文件的目錄中。每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)索引文件,索引文件有一個(gè)單獨(dú)的文件定義,可以包含不同類(lèi)型的索引和多個(gè)列。
數(shù)據(jù)文件索引是某個(gè)數(shù)據(jù)文件對(duì)應(yīng)的外部索引文件。如果索引文件太小,它將直接存儲(chǔ)在清單中,否則存儲(chǔ)在數(shù)據(jù)文件的目錄中。每個(gè)數(shù)據(jù)文件對(duì)應(yīng)一個(gè)索引文件,索引文件有一個(gè)單獨(dú)的文件定義,可以包含不同類(lèi)型的索引和多個(gè)列。
不同的文件索引在不同的場(chǎng)景下可能是有效的。例如,在點(diǎn)查找場(chǎng)景中,bloom過(guò)濾器可以加快查詢(xún)速度。使用位圖可能會(huì)占用更多空間,但可以獲得更高的準(zhǔn)確性。
目前,文件索引僅支持在僅追加表中。
布隆過(guò)濾器:
- file-index.bloom-filter.columns: 指定需要布隆過(guò)濾器索引的列。
- file-index.bloom-filter.<column_name>.fpp 配置誤報(bào)概率。
- file-index.bloom-filter.<column_name>.items 在一個(gè)數(shù)據(jù)文件中配置所期望的不同項(xiàng)。
位圖:
file-index.bitmap.columns: 指定需要位圖索引的列。
將支持更多的過(guò)濾器類(lèi)型…
如果您想在現(xiàn)有表中添加文件索引,而不需要重寫(xiě),您可以使用rewrite_file_index過(guò)程。在使用該過(guò)程之前,您應(yīng)該在目標(biāo)表中配置適當(dāng)?shù)呐渲?。您可以使用ALTER子句配置file-index.<filter-type>.columns到表。
如何調(diào)用:參見(jiàn) flink procedures
Update #
現(xiàn)在,只有Spark SQL支持DELETE和UPDATE,你可以看看Spark Write.
舉例:
DELETE FROM my_table WHERE currency = 'UNKNOWN';
更新追加表有兩種模式:
- COW (Copy on Write):搜索命中的文件,然后重寫(xiě)每個(gè)文件,從文件中刪除需要?jiǎng)h除的數(shù)據(jù)。這個(gè)操作代價(jià)很大。
- MOW(寫(xiě)合并):通過(guò)指定 'deletion-vectors.enabled' = 'true',則可以啟用刪除向量模式。只標(biāo)記相應(yīng)文件的某些記錄進(jìn)行刪除,并寫(xiě)入刪除文件,不重寫(xiě)整個(gè)文件。
Bucketed Append #
您可以定義bucket和bucket-key以獲得bucket附加表。
創(chuàng)建分桶追加表的示例:
CREATE TABLE my_table (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
'bucket' = '8',
'bucket-key' = 'product_id'
);
Streaming #
一個(gè)普通的追加表對(duì)它的流寫(xiě)和讀沒(méi)有嚴(yán)格的順序保證,但在某些情況下,你需要定義一個(gè)類(lèi)似于Kafka的鍵。
同一bucket中的每條記錄都是嚴(yán)格排序的,流式讀取將按照寫(xiě)入的順序?qū)⒂涗泜鬏數(shù)较掠?。要使用這種模式,您不需要配置特殊配置,所有數(shù)據(jù)都將作為隊(duì)列放入一個(gè)bucket中。

Compaction in Bucket #
默認(rèn)情況下,匯聚節(jié)點(diǎn)將自動(dòng)執(zhí)行壓縮以控制文件數(shù)量。以下選項(xiàng)控制壓縮策略:
| Key | Default | Type | Description |
|---|---|---|---|
| write-only | false | Boolean | 如果設(shè)置為true,將跳過(guò)壓縮和快照過(guò)期。此選項(xiàng)與專(zhuān)用緊湊作業(yè)一起使用。 |
| compaction.min.file-num | 5 | Integer | 對(duì)于文件集[f_0,…],f_N],滿(mǎn)足sum(size(f_i)) >= targetFileSize觸發(fā)追加表壓縮的最小文件數(shù)。這個(gè)值避免了幾乎整個(gè)文件被壓縮,因?yàn)閴嚎s是不劃算的。 |
| compaction.max.file-num | 5 | Integer | 對(duì)于文件集[f_0,…],f_N],觸發(fā)追加表壓縮的最大文件數(shù),即使sum(size(f_i)) < targetFileSize。此值可避免掛起過(guò)多的小文件,以免降低性能。 |
| full-compaction.delta-commits | (none) | Integer | 在增量提交之后,將不斷觸發(fā)完全壓縮。 |
Streaming Read Order #
對(duì)于流式讀取,記錄按以下順序生成:
- 對(duì)于來(lái)自?xún)蓚€(gè)不同分區(qū)的任意兩條記錄
- 如果scan.plan-sort-partition設(shè)置為true,將首先生成分區(qū)值較小的記錄。
- 否則,將首先生成分區(qū)創(chuàng)建時(shí)間較早的記錄。
- 對(duì)于來(lái)自同一分區(qū)和同一桶的任意兩條記錄,先寫(xiě)的記錄將先被生產(chǎn)。
- 對(duì)于來(lái)自同一分區(qū)但不同桶的任意兩條記錄,不同桶由不同的任務(wù)處理,它們之間沒(méi)有順序保證。
Watermark Definition #
您可以為讀取Paimon表定義水印:
CREATE TABLE t (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;
你也可以啟用Flink Watermark alignment,這將確保沒(méi)有源/分割/碎片/分區(qū)將其水印增加得太遠(yuǎn)。
| Key | Default | Type | Description |
|---|---|---|---|
| scan.watermark.alignment.group | (none) | String | 一組用于對(duì)齊水印的源。 |
| scan.watermark.alignment.max-drift | (none) | Duration | 在我們暫停從源/任務(wù)/分區(qū)消費(fèi)之前,對(duì)齊水印的最大漂移。 |
Bounded Stream #
流源也可以有界,您可以指定‘ scan.bounded.watermark ’來(lái)定義有界流模式的結(jié)束條件,流讀取將結(jié)束,直到遇到更大的水印快照。
快照中的水印是由writer生成的,例如可以指定一個(gè)kafka源,并聲明水印的定義。當(dāng)使用這個(gè)kafka源對(duì)Paimon表進(jìn)行寫(xiě)操作時(shí),Paimon表的快照會(huì)生成相應(yīng)的水印,這樣流式讀取這個(gè)Paimon表時(shí)就可以使用有界水印的特性。
CREATE TABLE kafka_table (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);
-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;
-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
Batch #
在批量查詢(xún)中,如果有必要,可以使用桶狀表來(lái)避免shuffle,例如,您可以使用以下Spark SQL來(lái)讀取Paimon表:
SET spark.sql.sources.v2.bucketing.enabled = true;
CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');
CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');
SELECT * FROM FACT_TABLE JOIN DIM_TABLE on t1.order_id = t4.order_id;
‘spark.sql.sources.v2.bucketing.enabled’配置用于啟用V2數(shù)據(jù)源。打開(kāi)后,Spark將通過(guò)V2數(shù)據(jù)源的SupportsReportPartitioning識(shí)別特定的分布式報(bào)告,并在必要時(shí)嘗試避免shuffle。
如果兩個(gè)表具有相同的存儲(chǔ)桶策略和相同數(shù)量的存儲(chǔ)桶,則可以避免代價(jià)高昂的聯(lián)接shuffle。