[LakeHouse] Delta Lake全部開源,聊聊Delta的實現(xiàn)架構

歡迎關注公眾號“Tim在路上”
剛剛結束的Data + AI summit上,Databricks宣布將Delta Lake全部開源。

目前在LakeHouse的市場上國內有Hudi,國外有Iceberg, Delta Lake社區(qū)正被他們沖擊著,這次Delta Lake的全部開源不管是急病亂投醫(yī),還是絕地反擊我們暫不討論。今天我們主要來了解了Delta Lake是如何實現(xiàn)的。

Delta Lake的誕生

在2017年,Delta Lake 橫空出世,它主打的概念是湖倉一體,最初只開放給付費用戶使用。在2019年時,為提高其市場的占用份額和影響力,將其進行部分開源。

Delta Lake創(chuàng)建之初的定位主要是為解決云存儲中很難實現(xiàn) ACID 事務和高性能的問題。

  1. 更新不是原子操作,因此查詢不是隔離的,那么在多對象的更新中,reader將可以查詢到部分的更新,某個對象更新失敗后回滾需要整體回滾。
  2. 在大型表的云存儲中進行元數(shù)據操作成本很高。例如parquet文件的footer中包含min/max統(tǒng)計信息可以幫助reader進行選擇性的讀取,在HDFS上讀取這樣的頁腳可能需要幾毫秒,但是在云存儲上需要更高的讀取延時。
  3. 對象存儲上的list操作性能非常差。

為了解決上面的問題,設計并實現(xiàn)了基于云存儲的ACID表存儲層--Delta Lake。

Delta Lake的實現(xiàn)思想也很簡單:使用存儲在云對象存儲中的預寫日志,以ACID的方式來管理維護Delta表中的信息。

那么Delta Lake是如何解決上面的存儲層問題呢?我列舉了如下幾個重要的特性:

  1. 時間旅行,允許用戶查詢時間點的快照,也可以根據時間點進行回滾。
  2. Upsert、Delete和Merge操作,可以有效的重寫對象,支持流式更新操作。
  3. 高效的流式IO, 通過流式操作將小對象寫入表中,并以事務的方式進行合并更新,同時還支持增量消費。
  4. 自動的數(shù)據布局優(yōu)化,可以自動的優(yōu)化表中的對象大小,并將數(shù)據記錄進行聚類。
  5. 支持schema進化,支持表的schema更改但不用重寫他們。

Delta Lake的存儲架構

Delta Lake 的數(shù)據存儲原理其實很簡單。它通過 Partition Directories 存儲數(shù)據,數(shù)據格式推薦為 Parquet,然后通過預寫的 Transaction Log (事務日志)記錄的表版本(Table Version) 和變更歷史,以維護歷史版本數(shù)據。

Delta Lake中的一些表級的操作,例如更新元數(shù)據、更新表名、變更 Schema、增加或刪除Partition、添加或者移除文件,都會以日志的形式將所有的操作存儲在表中。

1ed.png

上圖展示了Delta中數(shù)據的組織形式。數(shù)據和事務日志都被存儲在表級的目錄下,其中數(shù)據以傳統(tǒng)的Hive分區(qū)目錄的方式存儲,事務日志被存儲在_delta_log的目錄下。

  1. Delta每次事務commit都會產生一個json的元數(shù)據文件,文件內容包括本次commit做的所有action,比如AddFile/RemoveFile,也包括對schema的修改等等;
  2. 每產生一個新的json文件就會產生一個新的Delta的snapshot,snapshot的版本即該json文件中的數(shù)字,該數(shù)字必須是連續(xù)自增,Delta的某個版本的snapshot是通過順序回放所有小于等于該snapshot版本號的所有json文件得到;
  3. Delta Lake會以一定的頻率做checkpoint,checkpoint以Parquet的格式存儲,目的是為了便于使用Spark并行進行向量化處理。
  4. delta_log子目錄下還包含一個last_checkpoint文件指向最新的checkpoint,從而在日志操作時可以快速找到最新的checkpoint。

從上面的元數(shù)據結構可以看出,Delta和Hudi和Iceberg其實是大同小異。

那么Delta基于事務日志實現(xiàn)的細節(jié)又是怎樣的呢?

Delta事務日志的實現(xiàn)細節(jié)

Delta事務日志的實現(xiàn)主要是基于MVCC多版本控制協(xié)議實現(xiàn)。Delta 的 MVCC 算法保留多個數(shù)據副本,而不是立即替換包含正在更新或刪除的記錄的文件。

表的讀?。褐饕峭ㄟ^使用事務日志有選擇地選擇要處理的數(shù)據文件,確保他們一次只能看到表的一致快照。

表的寫入與修改:首先,樂觀地寫出新數(shù)據文件或修改現(xiàn)有數(shù)據文件的拷貝副本。然后,進行事務提交,通過向日志中添加新條目來創(chuàng)建表的最新原子版本。在此日志條目中,他們記錄了要在邏輯上添加和刪除哪些數(shù)據文件,以及對有關表的其他元數(shù)據的更改。

在用戶指定的保留期(默認為 7 天)后,過期的數(shù)據文件將被刪除。

  • Delta files
./_delta_log/00000000000000000001.json

Delta files是以原子性單位產生的文件(即沒提交一次commit事務),文件的命名是遞增的版本id, 它與checkpoints文件一起構成表中所有更改的日志。

Delta files的json文件中會包含一組應用應用于前一個表版本的actions操作,每一個actions是以一個json組存儲與Delta files中。

其action行為主要有以下幾種:SetTransaction、AddFile、RemoveFile、Metadata、Protocol、CommitInfo和Column Mapping。

  • Checkpoints
./_delta_log/00000000000000000010.checkpoint.parquet

checkpoints文件也存儲在 _delta_log 目錄中,可以為任何版本的表創(chuàng)建。檢查點包含在此版本之前的所有操作的完整回放,并刪除了無效操作。無效操作是那些已被后續(xù)操作取消的操作(例如刪除已添加的文件)。

默認情況下,參考實現(xiàn)每 10 次提交創(chuàng)建一個checkpoint。checkpoints文件名基于檢查點包含的表的版本。

  • 最后一個Checkpoint

Delta 事務日志通常包含許多(例如 10,000+)文件。列出如此大的目錄可能會非常昂貴。最后一個checkpoint文件可以通過提供指向日志末尾附近的指針來幫助降低構建表的最新快照的成本。

讀者可以通過查看_delta_log/_last_checkpoint文件來定位最近的檢查點,而不是列出整個目錄。

U2ntitled.png

那么接下來我們來看看json文件中的內容是什么?下面我們撿幾個重要的展開看看。

Actions

  1. Metadata

元數(shù)據操作更改表的當前元數(shù)據。表的第一個版本必須包含元數(shù)據操作。隨后的元數(shù)據操作完全覆蓋表的當前元數(shù)據。

{
  "metaData":{
    "id":"af23c9d7-fff1-4a5a-a2c8-55c59bd782aa",
    "format":{"provider":"parquet","options":{}},
    "schemaString":"...",
    "partitionColumns":[],
    "configuration":{
      "appendOnly": "true"
    }
  }
}

Metadata中記錄當前表id, 表級別的file format, 分區(qū)列信息等。在schemaString存儲這columnMapping信息。使用列映射來避免任何列命名限制,并支持重命名和刪除列,而無需重寫所有數(shù)據。列映射有三種模式,按名稱和按id和none。

{
    "name" : "e",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "d",
          "type" : "integer",
          "nullable" : false,
          "metadata" : {
            "delta.columnMapping.id": 5,
            "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
          }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : {
      "delta.columnMapping.id": 4,
      "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
    }
  }

  1. ADD / Delete File

添加和刪除操作分別用于通過添加或刪除單個數(shù)據文件來修改表中的數(shù)據。

{
  "add": {
    "path":"date=2017-12-10/part-000...c000.gz.parquet",
    "partitionValues":{"date":"2017-12-10"},
    "size":841454,
    "modificationTime":1512909768000,
    "dataChange":true
    "stats":"{\\"numRecords\\":1,\\"minValues\\":{\\"val..."
  }
}

add中記錄添加文件的相對路徑,以及當前file所屬的分區(qū)信息,通過還包含了file的統(tǒng)計信息,包括min/max。

{
  "remove":{
    "path":"part-00001-9…..snappy.parquet",
    "deletionTimestamp":1515488792485,
    "dataChange":true
  }
}

刪除操作只包括一個時間戳,指示刪除發(fā)生的時間。文件的物理刪除可能會延遲進行在用戶指定的過期時間之后。刪除操作應該作為邏輯刪除保持在表的狀態(tài)中,直到過期。當增量文件的創(chuàng)建時間戳超過添加到刪除操作時間戳的過期閾值時,邏輯刪除將過期。

  1. Transaction Identifiers
{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

事務標識符以appId版本對的形式存儲,其中appId是修改表的進程的唯一標識符,版本表示該應用程序取得了多大進展。該信息的原子記錄以及對表的修改使這些外部系統(tǒng)能夠將其寫入到Delta表冪等中。

下面我們來總結對比下:

  1. Delta的實現(xiàn)和Spark深度綁定,雖然delta2.0官宣支持多種引擎,但Iceberg和Hudi早已可以支持多種引擎,另外iceberg不和任何引擎綁定。
  2. 目前Delta只支持COW形式,Iceberg和Hudi都支持部分MOR。
  3. 在實現(xiàn)方式上與Hudi, Iceberg大同小異,但是其事務日志文件中只記錄了上一版本與當前版本的差分Action。如果要獲取某個commit的完整文件列表就需要把之前的差分Action進行重放。不過Delta引入checkpoint機制,當commit積累到一定數(shù)量,會生成一個checkpoint, 此時會刪除簡化無效的Action, 后續(xù)的讀取可以基于這個checkpoint開始重放。
  4. Delta可以生成較少的元數(shù)據文件,基于checkpoint機制和過期文件的刪除,減少了大量小文件的產生,但是并不能很好獲取某個commit的數(shù)據。Iceberg可能會產生大量的元數(shù)據文件,影響了查詢性能,但也相應的增加文件組跳過的能力。

后續(xù)會再繼續(xù)解密下開源的付費功能Z-order的實現(xiàn)源碼。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容