Spark引擎

Quick Start #

Preparation #

Paimon目前支持Spark 3.5、3.4、3.3、3.2和3.1。為了獲得更好的體驗(yàn),我們建議使用最新的Spark版本。

下載對(duì)應(yīng)版本的jar文件。

Version Jar
Spark 3.5 paimon-spark-3.5-0.9.0.jar
Spark 3.4 paimon-spark-3.4-0.9.0.jar
Spark 3.3 paimon-spark-3.3-0.9.0.jar
Spark 3.2 paimon-spark-3.2-0.9.0.jar
Spark 3.1 paimon-spark-3.1-0.9.0.jar

您也可以從源代碼手動(dòng)構(gòu)建綁定的jar。

要從源代碼構(gòu)建,請(qǐng)克隆git存儲(chǔ)庫(kù).

使用以下命令構(gòu)建綁定的jar。

mvn clean install -DskipTests

對(duì)于Spark 3.3,你可以在。/paimon-spark/paimon-spark-3.3/target/paimon-spark-3.3-0.9.0.jar中找到捆綁的jar包。

Setup #

如果您使用的是HDFS,請(qǐng)確保設(shè)置了環(huán)境變量HADOOP_HOME或HADOOP_CONF_DIR。

Step 1: Specify Paimon Jar File
在啟動(dòng)spark-sql時(shí),將paimon jar文件的路徑附加到——jars參數(shù)中。

spark-sql ... --jars /path/to/paimon-spark-3.3-0.9.0.jar

或者使用——packages選項(xiàng)。

spark-sql ... --packages org.apache.paimon:paimon-spark-3.3:0.9.0

或者,您可以復(fù)制paimon-spark-3.3-0.9.0.jar到spark安裝目錄下的spark/jars下。

Step 2: Specify Paimon Catalog

Catalog:
在啟動(dòng)Spark -sql時(shí),使用以下命令以Paimon的名稱注冊(cè)Paimon的Spark目錄。倉(cāng)庫(kù)的表文件存儲(chǔ)在/tmp/paimon下。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon \
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

使用spark.sql.catalog.(catalog_name)下的屬性配置目錄。在上面的例子中,‘ paimon ’是目錄名,您可以將其更改為您自己喜歡的目錄名。

在spark-sql命令行啟動(dòng)后,運(yùn)行以下SQL創(chuàng)建并切換到數(shù)據(jù)庫(kù)default。

USE paimon;
USE default;

切換到目錄('USE paimon')后,Spark的現(xiàn)有表將不能直接訪問(wèn),您可以使用spark_catalog.{database_name}。{table_name}訪問(wèn)Spark表。

Create Table #

Catalog:

create table my_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

Insert Table #

Paimon currently supports Spark 3.2+ for SQL write.

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');

Query Table #

SQL:

SELECT * FROM my_table;

/*
1   Hi
2   Hello
*/

Spark Type Conversion #

本節(jié)列出了Spark和Paimon之間所有支持的類型轉(zhuǎn)換。所有Spark的數(shù)據(jù)類型都可以在org.apache.spark.sql.types包中找到。

Spark Data Type Paimon Data Type Atomic Type
StructType RowType false
MapType MapType false
ArrayType ArrayType false
BooleanType BooleanType true
ByteType TinyIntType true
ShortType SmallIntType true
IntegerType IntType true
LongType BigIntType true
FloatType FloatType true
DoubleType DoubleType true
StringType VarCharType(Integer.MAX_VALUE) true
VarCharType (length) VarCharType(length) true
CharType (length) CharType(length) true
DateType DateType true
TimestampType LocalZonedTimestamp true
TimestampNTZType (Spark3.4+) TimestampType true
DecimalType (precision, scale) DecimalType(precision, scale) true
BinaryType VarBinaryType, BinaryType true
由于之前的設(shè)計(jì),在Spark3.3及以下版本中,Paimon會(huì)將Paimon的TimestampType和LocalZonedTimestamp映射到Spark的TimestampType,并且只正確處理TimestampType。 
 
因此,當(dāng)使用Spark3.3及以下版本時(shí),讀取其他引擎(如Flink)編寫的LocalZonedTimestamp類型的Paimon表時(shí),LocalZonedTimestamp類型的查詢結(jié)果會(huì)有時(shí)區(qū)偏移,需要手動(dòng)調(diào)整。

當(dāng)使用Spark3.4及以上版本時(shí),所有時(shí)間戳類型都可以正確解析。

SQL DDL #

Create Catalog #

Paimon catalog目前支持三種類型的元數(shù)據(jù)存儲(chǔ):

  • filesystem metastore (default), 它在文件系統(tǒng)中存儲(chǔ)元數(shù)據(jù)和表文件。
  • hive metastore, 它還將元數(shù)據(jù)存儲(chǔ)在Hive metastore中。用戶可以直接從Hive訪問(wèn)這些表。
  • jdbc metastore, 它額外地將元數(shù)據(jù)存儲(chǔ)在關(guān)系數(shù)據(jù)庫(kù)中,如MySQL, Postgres等。

有關(guān)創(chuàng)建編目時(shí)的詳細(xì)選項(xiàng),請(qǐng)參閱CatalogOptions。

Create Filesystem Catalog #

下面的Spark SQL注冊(cè)并使用一個(gè)名為my_catalog的Paimon編目。元數(shù)據(jù)和表文件存放在hdfs:///path/to/warehouse下。

下面的shell命令注冊(cè)一個(gè)名為paimon的paimon目錄。元數(shù)據(jù)和表文件存放在hdfs:///path/to/warehouse下。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse

對(duì)于在catalog中創(chuàng)建的表,您可以使用前綴spark.sql.catalog.paimon.table-default定義任何默認(rèn)的表選項(xiàng)。

啟動(dòng)spark-sql之后,可以使用下面的SQL切換到paimon編目的default數(shù)據(jù)庫(kù)。

USE paimon.default;

Creating Hive Catalog #

通過(guò)使用Paimon Hive catalog,對(duì)catalog的更改將直接影響到相應(yīng)的Hive metastore。在這樣的目錄中創(chuàng)建的表也可以直接從Hive訪問(wèn)。

要使用Hive catalog, Database name, Table name和Field name應(yīng)該是小寫的。

您的Spark安裝應(yīng)該能夠檢測(cè)或已經(jīng)包含Hive依賴項(xiàng)。更多信息請(qǐng)看這里。

下面的shell命令注冊(cè)一個(gè)名為Paimon的Paimon Hive目錄。元數(shù)據(jù)和表文件存放在hdfs:///path/to/warehouse下。此外,元數(shù)據(jù)也存儲(chǔ)在Hive metastore中。

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>

您可以使用前綴spark.sql.catalog.paimon.table-default定義任何默認(rèn)的表選項(xiàng)。對(duì)于在目錄中創(chuàng)建的表。

啟動(dòng)spark-sql之后,可以使用下面的SQL切換到paimon編目的默認(rèn)數(shù)據(jù)庫(kù)。

USE paimon.default;

另外,您可以創(chuàng)建SparkGenericCatalog.。

Synchronizing Partitions into Hive Metastore #

默認(rèn)情況下,Paimon不會(huì)將新創(chuàng)建的分區(qū)同步到Hive metastore。用戶將在Hive中看到一個(gè)未分區(qū)的表。分區(qū)下推將改為過(guò)濾器下推。

如果您想在Hive中看到一個(gè)分區(qū)表,并將新創(chuàng)建的分區(qū)同步到Hive metastore中,請(qǐng)?jiān)O(shè)置表屬性metastore.partitioned-table為true。參見CoreOptions

Creating JDBC Catalog #

通過(guò)使用Paimon JDBC編目,對(duì)編目的更改將直接存儲(chǔ)在SQLite、MySQL、postgres等關(guān)系數(shù)據(jù)庫(kù)中。

目前,鎖配置只支持MySQL和SQLite。如果您使用不同類型的數(shù)據(jù)庫(kù)進(jìn)行目錄存儲(chǔ),請(qǐng)不要配置lock.enabled。

Spark中的Paimon JDBC Catalog需要正確添加相應(yīng)的連接數(shù)據(jù)庫(kù)的jar包。您應(yīng)該首先下載JDBC連接器綁定的jar并將其添加到類路徑中。例如MySQL, postgres

database type Bundle Name SQL Client JAR
mysql mysql-connector-java Download
postgres postgresql Download
spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=jdbc \
    --conf spark.sql.catalog.paimon.uri=jdbc:mysql://<host>:<port>/<databaseName> \
    --conf spark.sql.catalog.paimon.jdbc.user=... \
    --conf spark.sql.catalog.paimon.jdbc.password=...
USE paimon.default;

Create Table #

在使用Paimon catalog之后,您可以創(chuàng)建和刪除表。在Paimon Catalogs中創(chuàng)建的表由編目管理。當(dāng)表從目錄中刪除時(shí),它的表文件也將被刪除。

下面的SQL假設(shè)您已經(jīng)注冊(cè)并正在使用Paimon編目。它在目錄的默認(rèn)數(shù)據(jù)庫(kù)中創(chuàng)建一個(gè)名為my_table的托管表,其中有五列,其中dt、hh和user_id是主鍵。

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

您可以創(chuàng)建分區(qū)表:

CREATE TABLE my_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);

Create Table As Select #

表可以通過(guò)查詢的結(jié)果來(lái)創(chuàng)建和填充,例如,我們有這樣一個(gè)sql: CREATE Table table_b AS SELECT id, name FORM table_a,結(jié)果表table_b相當(dāng)于用下面的語(yǔ)句創(chuàng)建表并插入數(shù)據(jù):INSERT INTO table_b FROM table_a

當(dāng)使用CREATE TABLE作為SELECT時(shí),我們可以指定主鍵或分區(qū),語(yǔ)法請(qǐng)參考下面的sql。

CREATE TABLE my_table (
     user_id BIGINT,
     item_id BIGINT
);
CREATE TABLE my_table_as AS SELECT * FROM my_table;

/* partitioned table*/
CREATE TABLE my_table_partition (
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING,
      dt STRING,
      hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE my_table_partition_as PARTITIONED BY (dt) AS SELECT * FROM my_table_partition;

/* change TBLPROPERTIES */
CREATE TABLE my_table_options (
       user_id BIGINT,
       item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE my_table_options_as TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM my_table_options;


/* primary key */
CREATE TABLE my_table_pk (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     dt STRING,
     hh STRING
) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_pk_as TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM my_table_pk;

/* primary key + partition */
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
    'primary-key' = 'dt,hh,user_id'
);
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;

SQL Write #

Syntax #

INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };

有關(guān)更多信息,請(qǐng)查看語(yǔ)法文檔:
Spark INSERT Statement

INSERT INTO #

使用INSERT INTO將記錄和更改應(yīng)用到表中。

INSERT INTO my_table SELECT ...

Overwriting the Whole Table #

使用INSERT OVERWRITE覆蓋整個(gè)未分區(qū)表。

INSERT OVERWRITE my_table SELECT ...

Overwriting a Partition #

使用INSERT OVERWRITE覆蓋分區(qū)。

INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...

Dynamic Overwrite #

Spark默認(rèn)的覆蓋模式是靜態(tài)分區(qū)覆蓋。要啟用動(dòng)態(tài)覆蓋,需要將Spark會(huì)話配置spark.sql.sources. partitionoverwritemode設(shè)置為dynamic
舉例:

CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
|  2| p2|
|  3| p1|
+---+---+
*/

Truncate tables #

TRUNCATE TABLE my_table;

Updating tables #

spark支持更新PrimitiveType和StructType,例如:

-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;

CREATE TABLE t (
  id INT, 
  s STRUCT<c1: INT, c2: STRING>, 
  name STRING)
TBLPROPERTIES (
  'primary-key' = 'id', 
  'merge-engine' = 'deduplicate'
);

-- you can use
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;

Deleting from table #

DELETE FROM my_table WHERE currency = 'UNKNOWN';

Merging into table #

Paimon目前支持Spark 3+中的Merge Into語(yǔ)法,它允許在一次提交中基于源表進(jìn)行一系列更新、插入和刪除。

  1. 這只適用于主鍵表。
  2. 在update子句中不支持更新主鍵列。
  3. 不支持NOT MATCHED BY SOURCE語(yǔ)法。

Example: One
這是一個(gè)簡(jiǎn)單的演示,如果目標(biāo)表中存在一行,則更新它,否則插入它。


-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

Example: Two
這是一個(gè)包含多個(gè)條件子句的示例。

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
USING source
ON target.a = source.a
WHEN MATCHED AND target.a = 5 THEN
   UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
WHEN MATCHED AND source.c > 'c2' THEN
   UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
WHEN MATCHED THEN
   DELETE      -- when matched, delete this row in target table;
WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT *      -- when not matched, insert this row without any transformation;

Streaming Write #

Paimon目前支持Spark 3+流寫入。
Paimon結(jié)構(gòu)化流只支持追加和完成兩種模式。

// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")

Schema Evolution #

模式演化是一種允許用戶輕松修改表的當(dāng)前模式以適應(yīng)現(xiàn)有數(shù)據(jù)或隨時(shí)間變化的新數(shù)據(jù)的特性,同時(shí)保持?jǐn)?shù)據(jù)的完整性和一致性。

Paimon支持在寫入數(shù)據(jù)時(shí)自動(dòng)合并源數(shù)據(jù)和當(dāng)前表數(shù)據(jù)的模式,并將合并后的模式用作表的最新模式,只需要配置write.merge-schema即可。

data.write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .save(location)

當(dāng)啟用write.merge-schema時(shí),Paimon默認(rèn)允許用戶對(duì)表模式執(zhí)行以下操作:

  1. 添加列
  2. 擴(kuò)大(提升)轉(zhuǎn)換類型(例如:Int ->Long)

Paimon還支持特定類型之間的顯式類型轉(zhuǎn)換(例如String -> Date, Long -> Int),它需要顯式配置write.merge-schema.explicit-cast。

模式演化可以同時(shí)用于流模式。

val inputData = MemoryStream[(Int, String)]
inputData
  .toDS()
  .toDF("col1", "col2")
  .writeStream
  .format("paimon")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .start(location)

下面列出了配置。
|Scan Mode | Description |
|write.merge-schema |如果為true,則在寫入數(shù)據(jù)前自動(dòng)合并數(shù)據(jù)模式和表模式。 |
|write.merge-schema.explicit-cast |如果為true,則允許合并數(shù)據(jù)類型,如果兩種類型滿足顯式強(qiáng)制轉(zhuǎn)換的規(guī)則。 |


SQL Query #

與所有其他表一樣,可以使用SELECT語(yǔ)句查詢Paimon表。

Batch Query #

Paimon的批處理讀取返回表快照中的所有數(shù)據(jù)。默認(rèn)情況下,批處理讀取返回最新的快照。

Batch Time Travel #

帶時(shí)間旅行的Paimon批讀可以指定一個(gè)快照或一個(gè)標(biāo)簽,并讀取相應(yīng)的數(shù)據(jù)。

需要Spark 3.3+。

你可以在查詢中使用VERSION AS OF和TIMESTAMP AS OF來(lái)進(jìn)行時(shí)間旅行:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

如果標(biāo)簽的名稱是一個(gè)數(shù)字,并且等于快照id,則VERSION AS OF語(yǔ)法將首先考慮標(biāo)簽。例如,如果基于快照2有一個(gè)名為“1”的標(biāo)簽,語(yǔ)句SELECT * FROM t VERSION AS OF ‘1’實(shí)際上查詢快照2而不是快照1。

Batch Incremental #

讀取開始快照(不包含)和結(jié)束快照之間的增量變化。
舉例:

  • “5,10”表示快照5和快照10之間的變化。
  • ‘ TAG1,TAG3 ’表示TAG1和TAG3之間的變化。

默認(rèn)情況下,將掃描changelog文件以查找生成changelog文件的表。否則,掃描新修改的文件。你也可以強(qiáng)制指定“incremental-between-scan-mode”。

需要Spark 3.2+。

Paimon支持使用Spark SQL進(jìn)行增量查詢,而增量查詢是由Spark表值函數(shù)實(shí)現(xiàn)的。

你可以使用query中的paimon_incremental_query來(lái)提取增量數(shù)據(jù):

-- read the incremental data between snapshot id 12 and snapshot id 20.
SELECT * FROM paimon_incremental_query('tableName', 12, 20);

在批處理SQL中,不允許返回DELETE記錄,因此將刪除-D的記錄。如果要查看DELETE記錄,可以查詢audit_log表。

Streaming Query #

Paimon目前支持Spark 3.3+流式讀取。
Paimon支持流式讀取的富掃描模式。這里有一個(gè)列表:

Scan Mode Description
latest 對(duì)于流源,連續(xù)地讀取最新的更改,而不是在開始時(shí)生成快照。
latest-full 對(duì)于流源,在第一次啟動(dòng)時(shí)在表上生成最新的快照,并繼續(xù)讀取最新的更改。
from-timestamp 對(duì)于流源,從指定為“scan.timestamp-millis”的時(shí)間戳開始連續(xù)讀取更改,而不會(huì)在開始時(shí)生成快照。
from-snapshot 對(duì)于流源,從“scan.snapshot-id”指定的快照開始連續(xù)讀取更改,不開始生成快照。
from-snapshot-full 對(duì)于流源,在第一次啟動(dòng)時(shí)從表上指定的“scan.snapshot-id”快照生成,并持續(xù)讀取更改。
default 如果指定了“scan.snapshot-id”,則相當(dāng)于from-snapshot。如果指定了timestamp-millis,則相當(dāng)于from-timestamp?;蛘?,它相當(dāng)于latest-full。

一個(gè)具有默認(rèn)掃描模式的簡(jiǎn)單示例:

// no any scan-related configs are provided, that will use latest-full scan mode.
val query = spark.readStream
  .format("paimon")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon結(jié)構(gòu)化流還支持多種流讀模式,它可以支持許多觸發(fā)器和許多讀限制。
支持以下讀限制:

Key Default Type Description
read.stream.maxFilesPerTrigger (none) Integer 在單個(gè)批處理中返回的最大文件數(shù)。
read.stream.maxBytesPerTrigger (none) Long 在單個(gè)批處理中返回的最大字節(jié)數(shù)。
read.stream.maxRowsPerTrigger (none) Long 在單個(gè)批處理中返回的最大行數(shù)。
read.stream.minRowsPerTrigger (none) Long 單個(gè)批處理中返回的最小行數(shù),用于與read.stream.maxTriggerDelayMs一起創(chuàng)建MinRowsReadLimit。
read.stream.maxTriggerDelayMs (none) Long 兩個(gè)相鄰批之間的最大延遲,用于與read.stream.minRowsPerTrigger一起創(chuàng)建MinRowsReadLimit。

舉例: 1
使用paimon定義的org.apache.spark.sql.streaming.Trigger.AvailableNow()和maxBytesPerTrigger

// Trigger.AvailableNow()) processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// That set read.stream.maxBytesPerTrigger to 128M means that each
// batch processes a maximum of 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

舉例: 2
使用 org.apache.spark.sql.connector.read.streaming.ReadMinRows.

// It will not trigger a batch until there are more than 5,000 pieces of data,
// unless the interval between the two batches is more than 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

Paimon結(jié)構(gòu)化流支持以更改日志的形式讀取行(在行中添加rowkind列來(lái)表示其更改類型),有兩種方式:

  • 直接流式讀取系統(tǒng)audit_log表
  • 將 read.changelog設(shè)置為true(默認(rèn)為false),然后流式讀取表位置
    舉例:
// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/

Query Optimization #

強(qiáng)烈建議與查詢一起指定分區(qū)和主鍵過(guò)濾器,這將加快查詢的數(shù)據(jù)跳過(guò)。
可以加速數(shù)據(jù)跳轉(zhuǎn)的過(guò)濾函數(shù)有:

  • =
  • <
  • <=
  • >
  • >=
  • IN (...)
  • LIKE 'abc%'
  • IS NULL

Paimon將按主鍵對(duì)數(shù)據(jù)進(jìn)行排序,這加快了點(diǎn)查詢和范圍查詢的速度。當(dāng)使用復(fù)合主鍵時(shí),查詢過(guò)濾器最好在主鍵的最左邊形成一個(gè)前綴,以獲得良好的加速。

假設(shè)一個(gè)表具有以下規(guī)格:

CREATE TABLE orders (
    catalog_id BIGINT,
    order_id BIGINT,
    .....,
) TBLPROPERTIES (
    'primary-key' = 'catalog_id,order_id'
);

通過(guò)為主鍵的最左邊的前綴指定一個(gè)范圍過(guò)濾器,查詢可以獲得很好的加速。

SELECT * FROM orders WHERE catalog_id=1025;

SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;

SELECT * FROM orders
  WHERE catalog_id=1025
  AND order_id>2035 AND order_id<6000;

但是,下面的過(guò)濾器不能很好地加速查詢。

SELECT * FROM orders WHERE order_id=29495;

SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;

Altering Tables #

Changing/Adding Table Properties #

下面的SQL將寫緩沖區(qū)大小的表屬性設(shè)置為256 MB。

ALTER TABLE my_table SET TBLPROPERTIES (
    'write-buffer-size' = '256 MB'
);

Removing Table Properties #

下面的SQL刪除了write-buffer-size的表屬性。

ALTER TABLE my_table UNSET TBLPROPERTIES ('write-buffer-size');

Changing/Adding Table Comment #

下面的SQL將表my_table的注釋更改為表注釋。

ALTER TABLE my_table SET TBLPROPERTIES (
    'comment' = 'table comment'
    );

Removing Table Comment #

下面的SQL刪除表注釋。

ALTER TABLE my_table UNSET TBLPROPERTIES ('comment');

Rename Table Name #

下面的SQL將表名重命名為新名稱。
最簡(jiǎn)單的sql調(diào)用是:

ALTER TABLE my_table RENAME TO my_table_new;

注意:我們可以在spark中這樣重命名paimon表:

ALTER TABLE [catalog.[database.]]test1 RENAME to [database.]test2;

但是我們不能把目錄名放在重命名表之前,如果我們這樣寫sql,它會(huì)拋出一個(gè)錯(cuò)誤:

ALTER TABLE catalog.database.test1 RENAME to catalog.database.test2;

如果您使用對(duì)象存儲(chǔ),如S3或OSS,請(qǐng)謹(jǐn)慎使用此語(yǔ)法,因?yàn)閷?duì)象存儲(chǔ)的重命名不是原子性的,在失敗的情況下可能只會(huì)移動(dòng)部分文件。

Adding New Columns #

下面的SQL將兩列c1和c2添加到表my_table中。

ALTER TABLE my_table ADD COLUMNS (
    c1 INT,
    c2 STRING
);

Renaming Column Name #

下面的SQL將表my_table中的列c0重命名為c1。

ALTER TABLE my_table RENAME COLUMN c0 TO c1;

Dropping Columns #

下面的SQL從表my_table中刪除兩列c1和c2。

ALTER TABLE my_table DROP COLUMNS (c1, c2);

Dropping Partitions #

下面的SQL刪除paimon表的分區(qū)。對(duì)于spark sql,您需要指定所有分區(qū)列。

ALTER TABLE my_table DROP PARTITION (`id` = 1, `name` = 'paimon');

Changing Column Comment #

下面的SQL將列buy_count的注釋更改為購(gòu)買計(jì)數(shù)。

ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';

Adding Column Position #

ALTER TABLE my_table ADD COLUMN c INT FIRST;
ALTER TABLE my_table ADD COLUMN c INT AFTER b;

Changing Column Position #

ALTER TABLE my_table ALTER COLUMN col_a FIRST;

ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;

Changing Column Type #

ALTER TABLE my_table ALTER COLUMN col_a TYPE DOUBLE;

Auxiliary Statements #

Set / Reset #

SET命令設(shè)置屬性,返回現(xiàn)有屬性的值或返回所有具有值和含義的SQLConf屬性。RESET命令將特定于當(dāng)前會(huì)話的運(yùn)行時(shí)配置重置為通過(guò)set命令設(shè)置的默認(rèn)值。要專門設(shè)置paimon配置,需要添加spark.paimon。前綴。

-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;
 
-- set paimon conf
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

Describe table #

description TABLE語(yǔ)句返回表的基本元數(shù)據(jù)信息。元數(shù)據(jù)信息包括列名、列類型和列注釋。

-- describe table
DESCRIBE TABLE my_table;

-- describe table with additional metadata
DESCRIBE TABLE EXTENDED my_table;

Show create table #

SHOW CREATE TABLE返回用于創(chuàng)建給定表的CREATE TABLE語(yǔ)句。

SHOW CREATE TABLE my_table;

Show columns #

返回表中列的列表。如果表不存在,則拋出異常。

SHOW COLUMNS FROM my_table;

Show partitions #

SHOW PARTITIONS語(yǔ)句用于列出表的分區(qū)。可以指定一個(gè)可選的分區(qū)規(guī)格來(lái)返回與提供的分區(qū)規(guī)格匹配的分區(qū)。

-- Lists all partitions for my_table
SHOW PARTITIONS my_table;

-- Lists partitions matching the supplied partition spec for my_table
SHOW PARTITIONS my_table PARTITION (dt=20230817);

Analyze table #

ANALYZE TABLE語(yǔ)句收集關(guān)于表的統(tǒng)計(jì)信息,查詢優(yōu)化器將使用這些信息來(lái)找到更好的查詢執(zhí)行計(jì)劃。Paimon支持通過(guò)analyze收集表級(jí)統(tǒng)計(jì)信息和列級(jí)統(tǒng)計(jì)信息。

-- collect table-level statistics
ANALYZE TABLE my_table COMPUTE STATISTICS;

-- collect table-level statistics and column statistics for col1
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1;

-- collect table-level statistics and column statistics for all columns
ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS;

Procedures #

本節(jié)介紹關(guān)于paimon的所有可用的spark過(guò)程。

Procedure Name Explanation Example
compact 壓縮文件。參數(shù):
- Table:目標(biāo)表標(biāo)識(shí)符。不能是空的。
- 分區(qū):分區(qū)過(guò)濾器?!埃北硎尽癆ND”。
“;”表示“或”。如果要壓縮一個(gè)date=01和day=01的分區(qū),則需要寫入‘date=01,day=01’。所有分區(qū)都為空。(不能與where連用)
where:分區(qū)謂詞。所有分區(qū)都為空。(不能和“partitions”一起使用)
Order_strategy: ‘order’或‘zorder’或‘hilbert’或‘none’??諡椤皀one”。
Order_columns:需要排序的列。如果‘order_strategy’為‘none’,則為空。
Partition_idle_time:這用于對(duì)沒有接收到‘ Partition_idle_time ’新數(shù)據(jù)的分區(qū)進(jìn)行完全壓縮。只有這些分區(qū)會(huì)被壓縮。這個(gè)論證不能用于緊序。
SET spark.sql.shuffle.partitions=10; --set the compact parallelism

CALL sys.compact(table => 'T', partitions => 'p=0;p=1', order_strategy => 'zorder', order_by => 'a,b')

CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b')

CALL sys.compact(table => 'T', partition_idle_time => '60s')
expire_snapshots 使快照過(guò)期。參數(shù):
- Table:目標(biāo)表標(biāo)識(shí)符。不能是空的。
- Retain_max:保留已完成快照的最大數(shù)量。
- Retain_min:保留已完成快照的最小數(shù)量。
- Older_than:快照被刪除的時(shí)間戳。
- Max_deletes:一次可以刪除的最大快照數(shù)量。
CALL sys.expire_snapshots(table => 'default.T', retain_max => 10)
expire_partitions 使分區(qū)過(guò)期。參數(shù):
- Table:目標(biāo)表標(biāo)識(shí)符。不能是空的。
- Expiration_time:分區(qū)的過(guò)期時(shí)間。如果分區(qū)的生存期超過(guò)此值,則分區(qū)將過(guò)期。分區(qū)時(shí)間從分區(qū)值中提取。
- Timestamp_formatter:從字符串格式化時(shí)間戳的格式化程序。
- Timestamp_pattern:從分區(qū)獲取時(shí)間戳的模式。
- Expire_strategy:分區(qū)過(guò)期策略,取值范圍:‘values-time’或‘update-time’,默認(rèn)為‘values-time’。
CALL sys.expire_partitions(table => 'default.T', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', timestamp_pattern => '$dt', expire_strategy => 'values-time')
create_tag 創(chuàng)建基于給定快照的標(biāo)記。參數(shù):
- Table:目標(biāo)表標(biāo)識(shí)符。不能是空的。
- 標(biāo)簽:新標(biāo)簽的名稱。不能是空的。
- snapshot(長(zhǎng)):新標(biāo)簽所基于的快照id。
- time_retained:新創(chuàng)建標(biāo)簽的最大保留時(shí)間。
-- based on snapshot 10 with 1d
CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot => 10, time_retained => '1 d')

-- based on the latest snapshot
CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
---- ---- ----
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容