Pyspark Join操作

Spark Join 操作

[TOC]

官方文檔:https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

從文檔中可以看到關(guān)于join的介紹:join(other, on=None, how=None)

從函數(shù)中可以看到有三個參數(shù):

  • other:需要合并的DataFrame格式的數(shù)據(jù)。官方寫的是Right side of the join,翻譯過來就是放在右側(cè)的DataFrame數(shù)據(jù)。
  • on:用來執(zhí)行對等連接的列名,可以是字符串、字符串列表或者表達(dá)式。如果是字符串或者字符串列表,那么兩邊的數(shù)據(jù)都得存在該列。spark的橫向合并不向pandas那么簡單,直接橫向拼接。spark合并必須有對應(yīng)的列作為參照,列值形同的就合并,不存在的就填充空值。
  • how:合并方式。默認(rèn)的是inner,其他的還有cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti。

一.創(chuàng)建數(shù)據(jù)

首先創(chuàng)建如下兩組數(shù)據(jù):

score.png

我將在這兩組數(shù)據(jù)基礎(chǔ)上做些測試。

創(chuàng)建代碼:

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("create df") \
    .getOrCreate()

# 第一組數(shù)據(jù),包含年齡、體重、身高信息
body_info = [["Bom", 20, 97.6, 165],
             ["Alice", 23, 90.0, 160],
             ["kuke", 33, 190.0, 170],
             ["jike", 19, 120.0, 170],
             ["Joe", 24, 89.0, 162]]

body_df = spark.createDataFrame(body_info, ["name", "age", "weight", "height"])
body_df.show()

# 第二組數(shù)據(jù),包含一些成績信息,語數(shù)外
score_info = [["Bom", 88, 97, 90],
             ["Alice", 85, 99, 92],
             ["kuke", 77, 82, 80],
             ["jike", 65, 58, 30],
             ["Joe", 90, 100, 92]]
score_df = spark.createDataFrame(score_info, ["name", "Chinese", "Math", "English"])
score_df.show()


# 輸出
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
|Alice| 23|  90.0|   160|
| kuke| 33| 190.0|   170|
| jike| 19| 120.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+

+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
|Alice|     85|  99|     92|
| kuke|     77|  82|     80|
| jike|     65|  58|     30|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+

二.合并操作

這節(jié)的操作只針對第三個參數(shù)how做實(shí)驗(yàn),第二個參數(shù)on都設(shè)為name。

1.inner

情況一:第一組數(shù)據(jù)行數(shù) = 第二組數(shù)據(jù)行數(shù)

合并代碼:

merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()

輸出

+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|     65|  58|     30|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|     85|  99|     92|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+

結(jié)果顯而易見,是把第二組數(shù)據(jù)除name列拼到了第一組的右邊。

情況二:第一組數(shù)據(jù)行數(shù) > 第二組數(shù)據(jù)行數(shù)

刪掉第二組數(shù)據(jù)的2和4行,變?yōu)椋?/p>

+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+

合并代碼:

merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()

輸出:

+----+---+------+------+-------+----+-------+
|name|age|weight|height|Chinese|Math|English|
+----+---+------+------+-------+----+-------+
| Bom| 20|  97.6|   165|     88|  97|     90|
| Joe| 24|  89.0|   162|     90| 100|     92|
|kuke| 33| 190.0|   170|     77|  82|     80|
+----+---+------+------+-------+----+-------+

從輸出結(jié)果可以看到,inner操作先根據(jù)兩組的name列求交集,再合并數(shù)據(jù)。

情況三:第一組數(shù)據(jù)行數(shù) < 第二組數(shù)據(jù)行數(shù)

刪掉第一組數(shù)據(jù)的2和4行,變?yōu)椋?/p>

+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+

合并代碼:

merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()

輸出:

+----+---+------+------+-------+----+-------+
|name|age|weight|height|Chinese|Math|English|
+----+---+------+------+-------+----+-------+
| Bom| 20|  97.6|   165|     88|  97|     90|
| Joe| 24|  89.0|   162|     90| 100|     92|
|kuke| 33| 190.0|   170|     77|  82|     80|
+----+---+------+------+-------+----+-------+

結(jié)果和情況二一樣,先求交集,在合并。

2.cross

情況一:第一組數(shù)據(jù)行數(shù) = 第二組數(shù)據(jù)行數(shù)

合并代碼:

merge_df = body_df.join(score_df, on="name", how="cross")
merge_df.show()

在spark2.4.7中用上邊的代碼運(yùn)行會毫不客氣的報(bào)錯:

pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Unsupported using join type Cross'

之后通過Google,搜索到不是這么用:

改成如下:

merge_df = body_df.crossJoin(score_df)
merge_df.show()

輸出:

+-----+---+------+------+-----+-------+----+-------+
| name|age|weight|height| name|Chinese|Math|English|
+-----+---+------+------+-----+-------+----+-------+
|  Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
|  Bom| 20|  97.6|   165|Alice|     85|  99|     92|
|  Bom| 20|  97.6|   165| kuke|     77|  82|     80|
|  Bom| 20|  97.6|   165| jike|     65|  58|     30|
|  Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
|Alice| 23|  90.0|   160|  Bom|     88|  97|     90|
|Alice| 23|  90.0|   160|Alice|     85|  99|     92|
|Alice| 23|  90.0|   160| kuke|     77|  82|     80|
|Alice| 23|  90.0|   160| jike|     65|  58|     30|
|Alice| 23|  90.0|   160|  Joe|     90| 100|     92|
| kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
| kuke| 33| 190.0|   170|Alice|     85|  99|     92|
| kuke| 33| 190.0|   170| kuke|     77|  82|     80|
| kuke| 33| 190.0|   170| jike|     65|  58|     30|
| kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
| jike| 19| 120.0|   170|  Bom|     88|  97|     90|
| jike| 19| 120.0|   170|Alice|     85|  99|     92|
| jike| 19| 120.0|   170| kuke|     77|  82|     80|
| jike| 19| 120.0|   170| jike|     65|  58|     30|
| jike| 19| 120.0|   170|  Joe|     90| 100|     92|
+-----+---+------+------+-----+-------+----+-------+

看到這結(jié)果開始有點(diǎn)摸不著頭腦,后來看懂了是第一組數(shù)據(jù)的每一行都會和第二組的每一行生成新的一行。

情況二:第一組數(shù)據(jù)行數(shù) > 第二組數(shù)據(jù)行數(shù)

刪掉第二組數(shù)據(jù)的2和4行,變?yōu)椋?/p>

+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+

合并代碼:

merge_df = body_df.crossJoin(score_df)
merge_df.show()

輸出:

+-----+---+------+------+----+-------+----+-------+
| name|age|weight|height|name|Chinese|Math|English|
+-----+---+------+------+----+-------+----+-------+
|  Bom| 20|  97.6|   165| Bom|     88|  97|     90|
|  Bom| 20|  97.6|   165|kuke|     77|  82|     80|
|  Bom| 20|  97.6|   165| Joe|     90| 100|     92|
|Alice| 23|  90.0|   160| Bom|     88|  97|     90|
|Alice| 23|  90.0|   160|kuke|     77|  82|     80|
|Alice| 23|  90.0|   160| Joe|     90| 100|     92|
| kuke| 33| 190.0|   170| Bom|     88|  97|     90|
| kuke| 33| 190.0|   170|kuke|     77|  82|     80|
| kuke| 33| 190.0|   170| Joe|     90| 100|     92|
| jike| 19| 120.0|   170| Bom|     88|  97|     90|
| jike| 19| 120.0|   170|kuke|     77|  82|     80|
| jike| 19| 120.0|   170| Joe|     90| 100|     92|
|  Joe| 24|  89.0|   162| Bom|     88|  97|     90|
|  Joe| 24|  89.0|   162|kuke|     77|  82|     80|
|  Joe| 24|  89.0|   162| Joe|     90| 100|     92|
+-----+---+------+------+----+-------+----+-------+

雖然少了兩行,但不影響情況一對結(jié)論。

情況三:第一組數(shù)據(jù)行數(shù) < 第二組數(shù)據(jù)行數(shù)

刪掉第一組數(shù)據(jù)的2和4行,變?yōu)椋?/p>

+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+

合并代碼:

merge_df = body_df.crossJoin(score_df)
merge_df.show()

輸出:

+----+---+------+------+-----+-------+----+-------+
|name|age|weight|height| name|Chinese|Math|English|
+----+---+------+------+-----+-------+----+-------+
| Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
| Bom| 20|  97.6|   165|Alice|     85|  99|     92|
| Bom| 20|  97.6|   165| kuke|     77|  82|     80|
| Bom| 20|  97.6|   165| jike|     65|  58|     30|
| Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
|kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
|kuke| 33| 190.0|   170|Alice|     85|  99|     92|
|kuke| 33| 190.0|   170| kuke|     77|  82|     80|
|kuke| 33| 190.0|   170| jike|     65|  58|     30|
|kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
| Joe| 24|  89.0|   162|  Bom|     88|  97|     90|
| Joe| 24|  89.0|   162|Alice|     85|  99|     92|
| Joe| 24|  89.0|   162| kuke|     77|  82|     80|
| Joe| 24|  89.0|   162| jike|     65|  58|     30|
| Joe| 24|  89.0|   162|  Joe|     90| 100|     92|
+----+---+------+------+-----+-------+----+-------+

結(jié)論和情況一一樣。

3.outer

情況一:第一組數(shù)據(jù)行數(shù) = 第二組數(shù)據(jù)行數(shù)

合并代碼:

merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()

輸出:

+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|     65|  58|     30|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|     85|  99|     92|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+

結(jié)果和inner合并的情況一一樣,結(jié)論暫時(shí)未知。

情況二:第一組數(shù)據(jù)行數(shù) > 第二組數(shù)據(jù)行數(shù)

刪掉第二組數(shù)據(jù)的2和4行,變?yōu)椋?/p>

+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+

合并代碼:

merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()

輸出:

+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|   null|null|   null|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|   null|null|   null|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+

從輸出結(jié)果可以看到,沒有的數(shù)據(jù)用null填充。

情況三:第一組數(shù)據(jù)行數(shù) < 第二組數(shù)據(jù)行數(shù)

刪掉第一組數(shù)據(jù)2和4行,變?yōu)椋?/p>

+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+

合并代碼:

merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()

輸出:

+-----+----+------+------+-------+----+-------+
| name| age|weight|height|Chinese|Math|English|
+-----+----+------+------+-------+----+-------+
| jike|null|  null|  null|     65|  58|     30|
|  Bom|  20|  97.6|   165|     88|  97|     90|
|  Joe|  24|  89.0|   162|     90| 100|     92|
|Alice|null|  null|  null|     85|  99|     92|
| kuke|  33| 190.0|   170|     77|  82|     80|
+-----+----+------+------+-------+----+-------+

結(jié)合前面兩種情況,先根據(jù)name列的值求并集,有數(shù)據(jù)的直接合并,沒有的填充null。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容