Spark-SQL之DataFrame操作大全

原文鏈接
Spark SQL中的DataFrame類似于一張關(guān)系型數(shù)據(jù)表。在關(guān)系型數(shù)據(jù)庫(kù)中對(duì)單表或進(jìn)行的查詢操作,在DataFrame中都可以通過(guò)調(diào)用其API接口來(lái)實(shí)現(xiàn)??梢詤⒖?,Scala提供的DataFrame API。

本文中的代碼基于Spark-2.2.1的文檔實(shí)現(xiàn)。

一、DataFrame對(duì)象的生成

Spark-SQL可以以其他RDD對(duì)象、parquet文件、json文件、hive表,以及通過(guò)JDBC連接到其他關(guān)系型數(shù)據(jù)庫(kù)作為數(shù)據(jù)源來(lái)生成DataFrame對(duì)象。本文將以MySQL數(shù)據(jù)庫(kù)為數(shù)據(jù)源,生成DataFrame對(duì)象后進(jìn)行相關(guān)的DataFame之上的操作。
  文中生成DataFrame的代碼如下:

object DataFrameOperations {
  def main (args: Array[String ]) {
    val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" )
    val sparkContext = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sparkContext)
    val url = "jdbc:mysql://m000:3306/test"

    val jdbcDF = sqlContext.read.format( "jdbc" ).options(
      Map( "url" -> url,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_test" )).load()

    val joinDF1 = sqlContext.read.format( "jdbc" ).options(
      Map("url" -> url ,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_join1" )).load()

    val joinDF2 = sqlContext.read.format( "jdbc" ).options(
      Map ( "url" -> url ,
        "user" -> "root",
        "password" -> "root",
        "dbtable" -> "spark_sql_join2" )).load()

    ... ...
  }
}

后續(xù)代碼都在上面... ...處。

二、DataFrame對(duì)象上Action操作

1、show:展示數(shù)據(jù)

以表格的形式在輸出中展示jdbcDF中的數(shù)據(jù),類似于select * from spark_sql_test的功能。
  show方法有四種調(diào)用方式,分別為,
(1)show
  只顯示前20條記錄。
  示例:

jdbcDF.show

結(jié)果:
  [圖片上傳失敗...(image-74ce47-1531294704004)]

(2)show(numRows: Int)
  顯示numRows
  示例:

jdbcDF.show(3)

結(jié)果:
  [圖片上傳失敗...(image-97195f-1531294704004)]

(3)show(truncate: Boolean)
  是否最多只顯示20個(gè)字符,默認(rèn)為true
  示例:

jdbcDF.show(true)
jdbcDF.show(false)

結(jié)果:
  [圖片上傳失敗...(image-88e83a-1531294704004)]

(4)show(numRows: Int, truncate: Boolean)
  綜合前面的顯示記錄條數(shù),以及對(duì)過(guò)長(zhǎng)字符串的顯示格式。
  示例:

jdbcDF.show(3, false)

結(jié)果:
  [圖片上傳失敗...(image-9da1e7-1531294704003)]

2、collect:獲取所有數(shù)據(jù)到數(shù)組

不同于前面的show方法,這里的collect方法會(huì)將jdbcDF中的所有數(shù)據(jù)都獲取到,并返回一個(gè)Array對(duì)象。

jdbcDF.collect()

結(jié)果如下,結(jié)果數(shù)組包含了jdbcDF的每一條記錄,每一條記錄由一個(gè)GenericRowWithSchema對(duì)象來(lái)表示,可以存儲(chǔ)字段名及字段值。
  [圖片上傳失敗...(image-33b26f-1531294704003)]

3、collectAsList:獲取所有數(shù)據(jù)到List

功能和collect類似,只不過(guò)將返回結(jié)構(gòu)變成了List對(duì)象,使用方法如下

jdbcDF.collectAsList()

結(jié)果如下,
  [圖片上傳失敗...(image-3d76a7-1531294704003)]

4、describe(cols: String*):獲取指定字段的統(tǒng)計(jì)信息

這個(gè)方法可以動(dòng)態(tài)的傳入一個(gè)或多個(gè)String類型的字段名,結(jié)果仍然為DataFrame對(duì)象,用于統(tǒng)計(jì)數(shù)值類型字段的統(tǒng)計(jì)值,比如count, mean, stddev, min, max等。
  使用方法如下,其中c1字段為字符類型,c2字段為整型,c4字段為浮點(diǎn)型

jdbcDF .describe("c1" , "c2", "c4" ).show()

結(jié)果如下,
  [圖片上傳失敗...(image-6a3df1-1531294704003)]

5、first, head, take, takeAsList:獲取若干行記錄

這里列出的四個(gè)方法比較類似,其中
 ?。?)first獲取第一行記錄
 ?。?)head獲取第一行記錄,head(n: Int)獲取前n行記錄
 ?。?)take(n: Int)獲取前n行數(shù)據(jù)
 ?。?)takeAsList(n: Int)獲取前n行數(shù)據(jù),并以List的形式展現(xiàn)
  以Row或者Array[Row]的形式返回一行或多行數(shù)據(jù)。firsthead功能相同。
  taketakeAsList方法會(huì)將獲得到的數(shù)據(jù)返回到Driver端,所以,使用這兩個(gè)方法時(shí)需要注意數(shù)據(jù)量,以免Driver發(fā)生OutOfMemoryError

使用和結(jié)果略。

二、DataFrame對(duì)象上的條件查詢和join等操作

以下返回為DataFrame類型的方法,可以連續(xù)調(diào)用。

1、where條件相關(guān)

(1)where(conditionExpr: String):SQL語(yǔ)言中where關(guān)鍵字后的條件
  傳入篩選條件表達(dá)式,可以用andor。得到DataFrame類型的返回結(jié)果,
  示例:

jdbcDF .where("id = 1 or c1 = 'b'" ).show()

結(jié)果,
  [圖片上傳失敗...(image-6fe28f-1531294704001)]

(2)filter:根據(jù)字段進(jìn)行篩選
  傳入篩選條件表達(dá)式,得到DataFrame類型的返回結(jié)果。和where使用條件相同
  示例:

jdbcDF .filter("id = 1 or c1 = 'b'" ).show()

結(jié)果,
  [圖片上傳失敗...(image-10ab81-1531294704001)]

2、查詢指定字段

(1)select:獲取指定字段值
  根據(jù)傳入的String類型字段名,獲取指定字段的值,以DataFrame類型返回
  示例:

jdbcDF.select( "id" , "c3" ).show( false)

結(jié)果:
  [圖片上傳失敗...(image-90815d-1531294704001)]

還有一個(gè)重載的select方法,不是傳入String類型參數(shù),而是傳入Column類型參數(shù)??梢詫?shí)現(xiàn)select id, id+1 from test這種邏輯。

jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)

結(jié)果:
  [圖片上傳失敗...(image-11829f-1531294704001)]

能得到Column類型的方法是apply以及col方法,一般用apply方法更簡(jiǎn)便。

(2)selectExpr:可以對(duì)指定字段進(jìn)行特殊處理
  可以直接對(duì)指定字段調(diào)用UDF函數(shù),或者指定別名等。傳入String類型參數(shù),得到DataFrame對(duì)象。
  示例,查詢id字段,c3字段取別名timec4字段四舍五入:

jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)

結(jié)果,
  [圖片上傳失敗...(image-f6977c-1531294704000)]

(3)col:獲取指定字段
  只能獲取一個(gè)字段,返回對(duì)象為Column類型。
  val idCol = jdbcDF.col(“id”)果略。

(4)apply:獲取指定字段
  只能獲取一個(gè)字段,返回對(duì)象為Column類型
  示例:

val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")

結(jié)果略。

(5)drop:去除指定字段,保留其他字段
  返回一個(gè)新的DataFrame對(duì)象,其中不包含去除的字段,一次只能去除一個(gè)字段。
  示例:

jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))

結(jié)果:
  [圖片上傳失敗...(image-6f7b58-1531294704000)]

3、limit

limit方法獲取指定DataFrame的前n行記錄,得到一個(gè)新的DataFrame對(duì)象。和takehead不同的是,limit方法不是Action操作。

jdbcDF.limit(3).show( false)

結(jié)果,
  [圖片上傳失敗...(image-2249e4-1531294704000)]

4、order by

(1)orderBysort:按指定字段排序,默認(rèn)為升序
  示例1,按指定字段排序。加個(gè)-表示降序排序。sortorderBy使用方法相同

jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)

結(jié)果,
  [圖片上傳失敗...(image-edc65-1531294704000)]

示例2,按字段字符串升序排序

jdbcDF.orderBy("c4").show(false)

結(jié)果,
  [圖片上傳失敗...(image-24e536-1531294704000)]

(2)sortWithinPartitions
  和上面的sort方法功能類似,區(qū)別在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame對(duì)象。

5、group by

(1)groupBy:根據(jù)字段進(jìn)行group by操作
  groupBy方法有兩種調(diào)用方式,可以傳入String類型的字段名,也可傳入Column類型的對(duì)象。
  使用方法如下,

jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))

(2)cuberollup:group by的擴(kuò)展
  功能類似于SQL中的group by cube/rollup,略。

(3)GroupedData對(duì)象
  該方法得到的是GroupedData類型對(duì)象,在GroupedData的API中提供了group by之后的操作,比如,

  • max(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的最大值,只能作用于數(shù)字型字段

  • min(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的最小值,只能作用于數(shù)字型字段

  • mean(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的平均值,只能作用于數(shù)字型字段

  • sum(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的和值,只能作用于數(shù)字型字段

  • count()方法,獲取分組中的元素個(gè)數(shù)

    運(yùn)行結(jié)果示例:
      count
      [圖片上傳失敗...(image-116886-1531294703991)]

    max
      [圖片上傳失敗...(image-2203f6-1531294703991)]

    這里面比較復(fù)雜的是以下兩個(gè)方法,
    agg,該方法和下面介紹的類似,可以用于對(duì)指定字段進(jìn)行聚合操作。

pivot

6、distinct

(1)distinct:返回一個(gè)不包含重復(fù)記錄的DataFrame
  返回當(dāng)前DataFrame中不重復(fù)的Row記錄。該方法和接下來(lái)的dropDuplicates()方法不傳入指定字段時(shí)的結(jié)果相同。
  示例:

jdbcDF.distinct()

結(jié)果,
  [圖片上傳失敗...(image-b4c9e1-1531294703999)]

(2)dropDuplicates:根據(jù)指定字段去重
  根據(jù)指定字段去重。類似于select distinct a, b操作
  示例:

jdbcDF.dropDuplicates(Seq("c1"))

結(jié)果:
  [圖片上傳失敗...(image-3ca46-1531294703999)]

7、聚合

聚合操作調(diào)用的是agg方法,該方法有多種調(diào)用方式。一般與groupBy方法配合使用。
  以下示例其中最簡(jiǎn)單直觀的一種用法,對(duì)id字段求最大值,對(duì)c4字段求和。

jdbcDF.agg("id" -> "max", "c4" -> "sum")

結(jié)果:
  [圖片上傳失敗...(image-7afa72-1531294703999)]

8、union

unionAll方法:對(duì)兩個(gè)DataFrame進(jìn)行組合
  類似于SQL中的UNION ALL操作。
  示例:

jdbcDF.unionALL(jdbcDF.limit(1))

結(jié)果:
  [圖片上傳失敗...(image-f861cb-1531294703999)]

9、join

重點(diǎn)來(lái)了。在SQL語(yǔ)言中用得很多的就是join操作,DataFrame中同樣也提供了join的功能。
  接下來(lái)隆重介紹join方法。在DataFrame中提供了六個(gè)重載的join方法。
(1)、笛卡爾積

joinDF1.join(joinDF2)

(2)、using一個(gè)字段形式
  下面這種join類似于a join b using column1的形式,需要兩個(gè)DataFrame中有相同的一個(gè)列名,

joinDF1.join(joinDF2, "id")

joinDF1joinDF2根據(jù)字段id進(jìn)行join操作,結(jié)果如下,using字段只顯示一次。
  [圖片上傳失敗...(image-22eb0a-1531294703998)]

(3)、using多個(gè)字段形式
  除了上面這種using一個(gè)字段的情況外,還可以using多個(gè)字段,如下

joinDF1.join(joinDF2, Seq("id", "name"))

(4)、指定join類型
  兩個(gè)DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi類型。在上面的using多個(gè)字段的join情況下,可以寫(xiě)第三個(gè)String類型參數(shù),指定join的類型,如下所示

joinDF1.join(joinDF2, Seq("id", "name"), "inner")

(5)、使用Column類型來(lái)join
  如果不用using模式,靈活指定join字段的話,可以使用如下形式

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))

結(jié)果如下,
  [圖片上傳失敗...(image-773330-1531294703997)]

(6)、在指定join字段同時(shí)指定join類型
  如下所示

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")

10、獲取指定字段統(tǒng)計(jì)信息

stat方法可以用于計(jì)算指定字段或指定字段之間的統(tǒng)計(jì)信息,比如方差,協(xié)方差等。這個(gè)方法返回一個(gè)DataFramesStatFunctions類型對(duì)象。
  下面代碼演示根據(jù)c4字段,統(tǒng)計(jì)該字段值出現(xiàn)頻率在30%以上的內(nèi)容。在jdbcDF中字段c1的內(nèi)容為"a, b, a, c, d, b"。其中ab出現(xiàn)的頻率為2 / 6,大于0.3

jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()

結(jié)果如下:
  [圖片上傳失敗...(image-9dfa31-1531294703997)]

11、獲取兩個(gè)DataFrame中共有的記錄

intersect方法可以計(jì)算出兩個(gè)DataFrame中相同的記錄,

jdbcDF.intersect(jdbcDF.limit(1)).show(false)

結(jié)果如下:
  [圖片上傳失敗...(image-5fc1b9-1531294703997)]

12、獲取一個(gè)DataFrame中有另一個(gè)DataFrame中沒(méi)有的記錄

示例:

jdbcDF.except(jdbcDF.limit(1)).show(false)

結(jié)果如下,
  [圖片上傳失敗...(image-a2f264-1531294703997)]

13、操作字段名

(1)withColumnRenamed:重命名DataFrame中的指定字段名
  如果指定的字段名不存在,不進(jìn)行任何操作。下面示例中將jdbcDF中的id字段重命名為idx。

jdbcDF.withColumnRenamed( "id" , "idx" )

結(jié)果如下:
  [圖片上傳失敗...(image-73edbb-1531294703997)]

(2)withColumn:往當(dāng)前DataFrame中新增一列
  whtiColumn(colName: String , col: Column)方法根據(jù)指定colName往DataFrame中新增一列,如果colName已存在,則會(huì)覆蓋當(dāng)前列。
  以下代碼往jdbcDF中新增一個(gè)名為id2的列,

jdbcDF.withColumn("id2", jdbcDF("id")).show( false)

結(jié)果如下,
  [圖片上傳失敗...(image-725494-1531294703997)]

14、行轉(zhuǎn)列

有時(shí)候需要根據(jù)某個(gè)字段內(nèi)容進(jìn)行分割,然后生成多行,這時(shí)可以使用explode方法
  下面代碼中,根據(jù)c3字段中的空格將字段內(nèi)容進(jìn)行分割,分割的內(nèi)容存儲(chǔ)在新的字段c3_中,如下所示

jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

結(jié)果如下,
  [圖片上傳失敗...(image-578204-1531294703997)]

15、其他操作

API中還有na, randomSplit, repartition, alias, as方法,待后續(xù)補(bǔ)充。

原文鏈接

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容