Docker、Spark 和 Iceberg:體驗 Iceberg 的最快方式

81d3aca5f060a75709a80bb2c68a4bc8.png

來源 | docker-spark-and-iceberg
作者 | Sam Redai & Kyle Bendickson
翻譯 | liliwei

如果您因為聽說Iceberg解決了若干問題,例如模式演變或行級更新,而對Iceberg感興趣,并且你想要一種簡單的方法來體驗它,那么您來對地方了!這篇文章將讓您在本地幾分鐘內(nèi)啟動并運行 Spark 和 Iceberg。同時將展示出許多令人驚嘆的 Iceberg 特性,這些特性可以解決您以前使用數(shù)據(jù)倉庫時遇到的問題。

Iceberg 提供了用于直接與表進行交互的庫,但對于大多數(shù)人來說,這些庫的層級太低了,一般情況下會通過計算引擎(如 Spark、Trino 或 Flink)與 Iceberg 進行交互。

讓我們從集成了 Iceberg 的本地 Spark Docker 實例開始。在此環(huán)境中,您將能夠體驗 Iceberg 的諸多特性,例如時間旅行、安全的模式演化和隱藏分區(qū)。

啟動 notebook

首先,如果您還沒有 DockerDocker Compose ,請先安裝它們。接下來,創(chuàng)建一個包含以下內(nèi)容的 docker-compose.yaml 文件。

docker-compose.yaml

version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    depends_on:
      - postgres
    container_name: spark-iceberg
    environment:
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    ports:
      - 8888:8888
      - 8080:8080
      - 18080:18080
  postgres:
    image: postgres:13.4-bullseye
    container_name: postgres
    environment:
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=demo_catalog
    volumes:
      - ./postgres/data:/var/lib/postgresql/data

在 docker-compose.yaml 同級目錄中,運行以下命令以啟動并打開一個支持 Iceberg 的 Spark notebook 服務(wù)。

docker-compose up -d
docker exec -it spark-iceberg pyspark-notebook

就是這么簡單!

您現(xiàn)在應(yīng)該有了一個運行在 http://localhost:8888 的功能齊全的 notebook 服務(wù),一個運行在 http://localhost:8080 的 Spark driver,以及一個運行在 http://localhost:18080 的 Spark history。

一個最小化的運行環(huán)境

docker compose 文件提供的運行環(huán)境遠不是一個大規(guī)模的生產(chǎn)級倉庫,但它確實可以讓你演示 Iceberg 的諸多特性。讓我們快速介紹一下這個最小化的運行環(huán)境。

  • 本地模式下的 Spark 3.1.2(引擎)
  • 由 Postgres 容器支撐的 JDBC 目錄(Catalog)
  • 將所有內(nèi)容連接在一起的 docker-compose
  • 可以讓您在 notebook 中輕松運行 SQL的%%sql 命令

那么,Iceberg 能做什么呢?

本文的 docker 環(huán)境附帶一個名為 “ Iceberg - Getting Started ” 的 notebook,其演示了本文將會深入介紹的大量特性。您在此處看到的示例同樣包含在 notebook 中,因此它也是開始學(xué)習(xí)的好地方!

模式演變

任何使用過 Hive 表或其他大數(shù)據(jù)表格式的人都知道改變表模式會是多么棘手。添加先前已刪除的列可能會導(dǎo)致舊的列數(shù)據(jù)死而復(fù)生。即便您幸運地快速捕獲了它,撤消操作通常會非常耗時,并且常常會導(dǎo)致您回到表的原始狀態(tài)。而即使只修改列名也是一個潛在的危險操作。

在 Iceberg 中,您不必擔心哪些更改生效,哪些更改會破壞您的表。在 Iceberg 中,重命名或添加列等表模式操作是安全的,沒有令人始料未及的副作用。

ALTER TABLE taxis
RENAME COLUMN fare_amount TO fare
ALTER TABLE taxis
ADD COLUMN fare_per_distance_unit float AFTER distance;

其他模式更改(例如更改列的類型、添加注釋或移動其位置)也同樣易如反掌。

ALTER TABLE taxis RENAME COLUMN trip_distance TO distance;
ALTER TABLE taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'
ALTER TABLE taxis ALTER COLUMN distance TYPE double;
ALTER TABLE taxis ALTER COLUMN distance AFTER fare;

此外,因為模式演變僅包括元數(shù)據(jù)操作,所以它們執(zhí)行得非常迅速。僅僅為了更改單個列的名稱而重寫整個表的日子已經(jīng)一去不復(fù)返了!

分區(qū)

當您考慮更改生產(chǎn)表的分區(qū)方式時,這總是一件大事。這個過程很艱巨,需要遷移數(shù)據(jù)到具有不同分區(qū)方式的全新表——除非您使用的是 Iceberg。歷史數(shù)據(jù)必須變更為新的分區(qū)方式,即使您只想對新的數(shù)據(jù)變更分區(qū)方式。可以說,最困難的是追查表的使用者,以提醒他們在所有查詢中更新分區(qū)子句!Iceberg 可以無縫地處理這個問題。表的分區(qū)可以實時更新并僅應(yīng)用于新寫入的數(shù)據(jù)。

ALTER TABLE taxis
ADD PARTITION FIELD VendorID

而查詢計劃( query plans )將會被拆分開來,分區(qū)變更之前寫入的數(shù)據(jù)使用舊的分區(qū)方式,變更之后寫入的數(shù)據(jù)使用新的分區(qū)方式。查詢表的人甚至無需感知拆分動作。WHERE 子句中的簡單謂詞會自動轉(zhuǎn)換為分區(qū)過濾器,以裁剪不匹配的文件。這就是 Iceberg 中所說的 隱藏分區(qū)。

時間旅行和回滾

當您更改表時,Iceberg 會及時地將每個版本作為“快照”進行跟蹤。您可以時間旅行到任何快照或時間點。當您想要重現(xiàn)先前查詢的結(jié)果時,這一特性會非常有用,比如重新生成一個下游報表產(chǎn)品。

spark.read.table("taxis").count()  # 2,853,020

val ONE_DAY_MS = 86400000;
val NOW = System.currentTimeMillis()

(spark
.read
.option("as-of-timestamp", NOW_MS - ONE_DAY_MS)
.table("taxis")
.count())  # 2,798,371

在一個依賴于許多會頻繁更新的上游表的復(fù)雜邏輯中,引起查詢結(jié)果變動的原因,是正在測試的新代碼還是上游表的更改,往往不好精準確認。而使用時間旅行,您可以確保查詢到指定時間下的所有表(譯者注:指定時間下的表數(shù)據(jù)不會因新的更改而變動),這使創(chuàng)建受控的環(huán)境變得更加容易。

如果您不想在運行時對單個查詢進行時間旅行,而是希望將表實際回滾到特定時間點或特定快照 ID,您可以使用回滾過程語句來輕松實現(xiàn)!

CALL catalog_name.system.rollback_to_timestamp('taxis', TIMESTAMP '2021-12-31 00:00:00.000')
CALL demo.system.rollback_to_snapshot('taxis', <SNAPSHOT>)

極富表現(xiàn)力的行級變更 SQL

Iceberg 的擴展 SQL 可以執(zhí)行應(yīng)用于行級操作的查詢,它非常有表現(xiàn)力。例如,您可以刪除表中與特定謂詞匹配的所有記錄。

DELETE FROM taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0

此外,MERGE INTO 語句使合并兩個表的任務(wù)非常直觀。

MERGE INTO prod.nyctaxis pt
USING (SELECT * FROM staging.nyc.taxis) st
ON pt.id = st.id
WHEN NOT MATCHED THEN INSERT *;

原子性的 CTAS 和 RTAS 語句

Iceberg 確保 CTAS(“CREATE TABLE AS SELECT”的縮寫)或 RTAS(“REPLACE TABLE AS SELECT”的縮寫)語句是原子性的。如果 SELECT 查詢失敗,您不會留下一張必須在他人查詢之前快速刪除的不完整的表!

[CREATE|REPLACE] TABLE prod.nyc.vendor2 AS
SELECT * FROM taxis
WHERE vendor_id = '2'

Spark 過程語句

上文中,我們介紹了一個名為rollback_to_snapshot的回滾過程語句,而 Iceberg 包含更多允許您執(zhí)行各種表維護動作的其它過程語句。您可以使用這些命名直觀的 Spark 過程語句來完成快照過期、重寫清單文件或刪除孤立文件等操作。要閱讀所有可用過程語句的更多信息,請查看 Iceberg 文檔中的Spark 過程語句部分。

結(jié)束語

對 Iceberg 的支持正在持續(xù)快速增長,亞馬遜最近宣布在 EMR 中內(nèi)置對 Iceberg 的支持,Snowflake 最近宣布支持連接到 Iceberg 表以響應(yīng)重要的客戶需求。同樣,社區(qū)每天都在變得愈發(fā)強大,來自許多不同行業(yè)的貢獻者帶來了令人興奮的獨特用例、觀點和解決方案。如果您對社區(qū)所討論的任何特性有想法或只是想打個招呼,請查看我們的社區(qū)頁面,了解可以加入討論的所有方式。我希望能看到你!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容