詳解 Spark 中的 Bucketing

什么是 Bucketing

Bucketing 就是利用 buckets(按列進行分桶)來決定數(shù)據(jù)分區(qū)(partition)的一種優(yōu)化技術(shù),它可以幫助在計算中避免數(shù)據(jù)交換(avoid data shuffle)。并行計算的時候shuffle常常會耗費非常多的時間和資源.

Bucketing 的基本原理比較好理解,它會根據(jù)你指定的列(可以是一個也可以是多個)計算哈希值,然后具有相同哈希值的數(shù)據(jù)將會被分到相同的分區(qū)。

bucket

Bucket和Partition的區(qū)別

Bucket的最終目的也是實現(xiàn)分區(qū),但是和Partition的原理不同,當(dāng)我們根據(jù)指定列進行Partition的時候,Spark會根據(jù)列的名字對數(shù)據(jù)進行分區(qū)(如果沒有指定列名則會根據(jù)一個隨機信息對數(shù)據(jù)進行分區(qū))。Bucketing的最大不同在于它使用了指定列的哈希值,這樣可以保證具有相同列值的數(shù)據(jù)被分到相同的分區(qū)。

怎么用 Bucket

按Bucket保存

目前在使用 bucketBy 的時候,必須和 sortBy,saveAsTable 一起使用,如下。這個操作其實是將數(shù)據(jù)保存到了文件中(如果不指定path,也會保存到一個臨時目錄中)。

df.write
  .bucketBy(10, "name")
  .sortBy("name")
  .mode(SaveMode.Overwrite)
  .option("path","/path/to")
  .saveAsTable("bucketed")

數(shù)據(jù)分桶保存之后,我們才能使用它。

直接從table讀取

在一個SparkSession內(nèi),保存之后你可以通過如下命令通過表名獲取其對應(yīng)的DataFrame.

val df = spark.table("bucketed")

其中spark是一個SparkSession對象。獲取之后就可以使用DataFrame或者在SQL中使用表。

從已經(jīng)保存的Parquet文件讀取

如果你要使用歷史保存的數(shù)據(jù),那么就不能用上述方法了,也不能像讀取常規(guī)文件一樣使用 spark.read.parquet() ,這種方式讀進來的數(shù)據(jù)是不帶bucket信息的。正確的方法是利用CREATE TABLE 語句,詳情可用參考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
  USING data_source
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]
  [AS select_statement]

示例如下:

spark.sql(
  """
    |CREATE TABLE bucketed
    | (name string)
    |  USING PARQUET
    |  CLUSTERED BY (name) INTO 10 BUCKETS
    |  LOCATION '/path/to'
    |""".stripMargin)

用Buckets的好處

在我們join兩個表的時候,如果兩個表最好按照相同的列劃分成相同的buckets,就可以完全避免shuffle。根據(jù)前面所述的hash值計算方法,兩個表具有相同列值的數(shù)據(jù)會存放在相同的機器上,這樣在進行join操作時就不需要再去和其他機器通訊,直接在本地完成計算即可。假設(shè)你有左右兩個表,各有兩個分區(qū),那么join的時候?qū)嶋H計算就是下圖的樣子,兩個機器進行計算,并且計算后分區(qū)還是2.

with bucket

而當(dāng)需要shuffle的時候,會是這樣的,


without bucket

細(xì)心的你可能發(fā)現(xiàn)了,上面兩個分區(qū)對應(yīng)兩個Executor,下面shuffle之后對應(yīng)的怎么成了三個Executor了?沒錯,當(dāng)數(shù)據(jù)進行shuffle之后,分區(qū)數(shù)就不再保持和輸入的數(shù)據(jù)相同了,實際上也沒有必要保持相同。

本地測試

我們考慮的是大數(shù)據(jù)表的連接,本地測試的時候一般使用小的表,所以逆序需要將小表自動廣播的配置關(guān)掉。如果開啟小表廣播,那么兩個小表的join之后分區(qū)數(shù)是不會變的,例如:

左表分區(qū)數(shù) 右表分區(qū)數(shù)數(shù) Join之后的分區(qū)數(shù)
3 3 3

關(guān)閉配置的命令如下:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

正常情況下join之后分區(qū)數(shù)會發(fā)生變化:

左表分區(qū)數(shù) 右表分區(qū)數(shù)數(shù) Join之后的分區(qū)數(shù)
3 3 200

這個200其實就是 "spark.sql.shuffle.partitions" 配置的值,默認(rèn)就是200. 所以如果在Join過程中出現(xiàn)了shuffle,join之后的分區(qū)一定會變,并且變成spark.sql.shuffle.partitions的值。通常你需要根據(jù)自己的集群資源修改這個值,從而優(yōu)化并行度,但是shuffle是不可避免的。

左右兩個表Bucket數(shù)目不一致時

實際測試結(jié)果如下:

左表Bucket數(shù) 右表Bucekt數(shù) Join之后的分區(qū)數(shù)
8 4 8
4 4 4

Spark依然會利用一些Bucekt的信息,但具體怎么執(zhí)行目前還不太清楚,還是保持一致的好。

另外,如果你spark job的可用計算核心數(shù)小于Bucket值,那么從文件中讀取之后Bucekt值會變,就是說bucket的數(shù)目不會超過你能使用的最大計算核數(shù)。

不要使用的 <=> 符號?。?!

在處理null值的時候,我們可能會用到一些特殊的函數(shù)或者符號,如下表所示。但是在使用bucket的時候這里有個坑,一定要躲過。join的時候千萬不要使用 <=> 符號,使用之后spark就會忽略bucket信息,繼續(xù)shuffle數(shù)據(jù),原因可能和hash計算有關(guān)。

null

原文連接

如果你喜歡我的文章,可以在任一平臺搜索【黑客悟理】關(guān)注我,非常感謝!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容