原文鏈接
Spark SQL中的DataFrame類似于一張關(guān)系型數(shù)據(jù)表。在關(guān)系型數(shù)據(jù)庫(kù)中對(duì)單表或進(jìn)行的查詢操作,在DataFrame中都可以通過(guò)調(diào)用其API接口來(lái)實(shí)現(xiàn)??梢詤⒖?,Scala提供的DataFrame API。
本文中的代碼基于Spark-2.2.1的文檔實(shí)現(xiàn)。
一、DataFrame對(duì)象的生成
Spark-SQL可以以其他RDD對(duì)象、parquet文件、json文件、hive表,以及通過(guò)JDBC連接到其他關(guān)系型數(shù)據(jù)庫(kù)作為數(shù)據(jù)源來(lái)生成DataFrame對(duì)象。本文將以MySQL數(shù)據(jù)庫(kù)為數(shù)據(jù)源,生成DataFrame對(duì)象后進(jìn)行相關(guān)的DataFame之上的操作。
文中生成DataFrame的代碼如下:
object DataFrameOperations {
def main (args: Array[String ]) {
val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" )
val sparkContext = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sparkContext)
val url = "jdbc:mysql://m000:3306/test"
val jdbcDF = sqlContext.read.format( "jdbc" ).options(
Map( "url" -> url,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_test" )).load()
val joinDF1 = sqlContext.read.format( "jdbc" ).options(
Map("url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join1" )).load()
val joinDF2 = sqlContext.read.format( "jdbc" ).options(
Map ( "url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join2" )).load()
... ...
}
}
后續(xù)代碼都在上面... ...處。
二、DataFrame對(duì)象上Action操作
1、show:展示數(shù)據(jù)
以表格的形式在輸出中展示jdbcDF中的數(shù)據(jù),類似于select * from spark_sql_test的功能。
show方法有四種調(diào)用方式,分別為,
(1)show
只顯示前20條記錄。
示例:
jdbcDF.show
結(jié)果:
[圖片上傳失敗...(image-74ce47-1531294704004)]
(2)show(numRows: Int)
顯示numRows條
示例:
jdbcDF.show(3)
結(jié)果:
[圖片上傳失敗...(image-97195f-1531294704004)]
(3)show(truncate: Boolean)
是否最多只顯示20個(gè)字符,默認(rèn)為true。
示例:
jdbcDF.show(true)
jdbcDF.show(false)
結(jié)果:
[圖片上傳失敗...(image-88e83a-1531294704004)]
(4)show(numRows: Int, truncate: Boolean)
綜合前面的顯示記錄條數(shù),以及對(duì)過(guò)長(zhǎng)字符串的顯示格式。
示例:
jdbcDF.show(3, false)
結(jié)果:
[圖片上傳失敗...(image-9da1e7-1531294704003)]
2、collect:獲取所有數(shù)據(jù)到數(shù)組
不同于前面的show方法,這里的collect方法會(huì)將jdbcDF中的所有數(shù)據(jù)都獲取到,并返回一個(gè)Array對(duì)象。
jdbcDF.collect()
結(jié)果如下,結(jié)果數(shù)組包含了jdbcDF的每一條記錄,每一條記錄由一個(gè)GenericRowWithSchema對(duì)象來(lái)表示,可以存儲(chǔ)字段名及字段值。
[圖片上傳失敗...(image-33b26f-1531294704003)]
3、collectAsList:獲取所有數(shù)據(jù)到List
功能和collect類似,只不過(guò)將返回結(jié)構(gòu)變成了List對(duì)象,使用方法如下
jdbcDF.collectAsList()
結(jié)果如下,
[圖片上傳失敗...(image-3d76a7-1531294704003)]
4、describe(cols: String*):獲取指定字段的統(tǒng)計(jì)信息
這個(gè)方法可以動(dòng)態(tài)的傳入一個(gè)或多個(gè)String類型的字段名,結(jié)果仍然為DataFrame對(duì)象,用于統(tǒng)計(jì)數(shù)值類型字段的統(tǒng)計(jì)值,比如count, mean, stddev, min, max等。
使用方法如下,其中c1字段為字符類型,c2字段為整型,c4字段為浮點(diǎn)型
jdbcDF .describe("c1" , "c2", "c4" ).show()
結(jié)果如下,
[圖片上傳失敗...(image-6a3df1-1531294704003)]
5、first, head, take, takeAsList:獲取若干行記錄
這里列出的四個(gè)方法比較類似,其中
?。?)first獲取第一行記錄
?。?)head獲取第一行記錄,head(n: Int)獲取前n行記錄
?。?)take(n: Int)獲取前n行數(shù)據(jù)
?。?)takeAsList(n: Int)獲取前n行數(shù)據(jù),并以List的形式展現(xiàn)
以Row或者Array[Row]的形式返回一行或多行數(shù)據(jù)。first和head功能相同。
take和takeAsList方法會(huì)將獲得到的數(shù)據(jù)返回到Driver端,所以,使用這兩個(gè)方法時(shí)需要注意數(shù)據(jù)量,以免Driver發(fā)生OutOfMemoryError
使用和結(jié)果略。
二、DataFrame對(duì)象上的條件查詢和join等操作
以下返回為DataFrame類型的方法,可以連續(xù)調(diào)用。
1、where條件相關(guān)
(1)where(conditionExpr: String):SQL語(yǔ)言中where關(guān)鍵字后的條件
傳入篩選條件表達(dá)式,可以用and和or。得到DataFrame類型的返回結(jié)果,
示例:
jdbcDF .where("id = 1 or c1 = 'b'" ).show()
結(jié)果,
[圖片上傳失敗...(image-6fe28f-1531294704001)]
(2)filter:根據(jù)字段進(jìn)行篩選
傳入篩選條件表達(dá)式,得到DataFrame類型的返回結(jié)果。和where使用條件相同
示例:
jdbcDF .filter("id = 1 or c1 = 'b'" ).show()
結(jié)果,
[圖片上傳失敗...(image-10ab81-1531294704001)]
2、查詢指定字段
(1)select:獲取指定字段值
根據(jù)傳入的String類型字段名,獲取指定字段的值,以DataFrame類型返回
示例:
jdbcDF.select( "id" , "c3" ).show( false)
結(jié)果:
[圖片上傳失敗...(image-90815d-1531294704001)]
還有一個(gè)重載的select方法,不是傳入String類型參數(shù),而是傳入Column類型參數(shù)??梢詫?shí)現(xiàn)select id, id+1 from test這種邏輯。
jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)
結(jié)果:
[圖片上傳失敗...(image-11829f-1531294704001)]
能得到Column類型的方法是apply以及col方法,一般用apply方法更簡(jiǎn)便。
(2)selectExpr:可以對(duì)指定字段進(jìn)行特殊處理
可以直接對(duì)指定字段調(diào)用UDF函數(shù),或者指定別名等。傳入String類型參數(shù),得到DataFrame對(duì)象。
示例,查詢id字段,c3字段取別名time,c4字段四舍五入:
jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)
結(jié)果,
[圖片上傳失敗...(image-f6977c-1531294704000)]
(3)col:獲取指定字段
只能獲取一個(gè)字段,返回對(duì)象為Column類型。
val idCol = jdbcDF.col(“id”)果略。
(4)apply:獲取指定字段
只能獲取一個(gè)字段,返回對(duì)象為Column類型
示例:
val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")
結(jié)果略。
(5)drop:去除指定字段,保留其他字段
返回一個(gè)新的DataFrame對(duì)象,其中不包含去除的字段,一次只能去除一個(gè)字段。
示例:
jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))
結(jié)果:
[圖片上傳失敗...(image-6f7b58-1531294704000)]
3、limit
limit方法獲取指定DataFrame的前n行記錄,得到一個(gè)新的DataFrame對(duì)象。和take與head不同的是,limit方法不是Action操作。
jdbcDF.limit(3).show( false)
結(jié)果,
[圖片上傳失敗...(image-2249e4-1531294704000)]
4、order by
(1)orderBy和sort:按指定字段排序,默認(rèn)為升序
示例1,按指定字段排序。加個(gè)-表示降序排序。sort和orderBy使用方法相同
jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
結(jié)果,
[圖片上傳失敗...(image-edc65-1531294704000)]
示例2,按字段字符串升序排序
jdbcDF.orderBy("c4").show(false)
結(jié)果,
[圖片上傳失敗...(image-24e536-1531294704000)]
(2)sortWithinPartitions
和上面的sort方法功能類似,區(qū)別在于sortWithinPartitions方法返回的是按Partition排好序的DataFrame對(duì)象。
5、group by
(1)groupBy:根據(jù)字段進(jìn)行group by操作
groupBy方法有兩種調(diào)用方式,可以傳入String類型的字段名,也可傳入Column類型的對(duì)象。
使用方法如下,
jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))
(2)cube和rollup:group by的擴(kuò)展
功能類似于SQL中的group by cube/rollup,略。
(3)GroupedData對(duì)象
該方法得到的是GroupedData類型對(duì)象,在GroupedData的API中提供了group by之后的操作,比如,
max(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的最大值,只能作用于數(shù)字型字段min(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的最小值,只能作用于數(shù)字型字段mean(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的平均值,只能作用于數(shù)字型字段sum(colNames: String*)方法,獲取分組中指定字段或者所有的數(shù)字類型字段的和值,只能作用于數(shù)字型字段-
count()方法,獲取分組中的元素個(gè)數(shù)運(yùn)行結(jié)果示例:
count
[圖片上傳失敗...(image-116886-1531294703991)]max
[圖片上傳失敗...(image-2203f6-1531294703991)]這里面比較復(fù)雜的是以下兩個(gè)方法,
agg,該方法和下面介紹的類似,可以用于對(duì)指定字段進(jìn)行聚合操作。
pivot
6、distinct
(1)distinct:返回一個(gè)不包含重復(fù)記錄的DataFrame
返回當(dāng)前DataFrame中不重復(fù)的Row記錄。該方法和接下來(lái)的dropDuplicates()方法不傳入指定字段時(shí)的結(jié)果相同。
示例:
jdbcDF.distinct()
結(jié)果,
[圖片上傳失敗...(image-b4c9e1-1531294703999)]
(2)dropDuplicates:根據(jù)指定字段去重
根據(jù)指定字段去重。類似于select distinct a, b操作
示例:
jdbcDF.dropDuplicates(Seq("c1"))
結(jié)果:
[圖片上傳失敗...(image-3ca46-1531294703999)]
7、聚合
聚合操作調(diào)用的是agg方法,該方法有多種調(diào)用方式。一般與groupBy方法配合使用。
以下示例其中最簡(jiǎn)單直觀的一種用法,對(duì)id字段求最大值,對(duì)c4字段求和。
jdbcDF.agg("id" -> "max", "c4" -> "sum")
結(jié)果:
[圖片上傳失敗...(image-7afa72-1531294703999)]
8、union
unionAll方法:對(duì)兩個(gè)DataFrame進(jìn)行組合
類似于SQL中的UNION ALL操作。
示例:
jdbcDF.unionALL(jdbcDF.limit(1))
結(jié)果:
[圖片上傳失敗...(image-f861cb-1531294703999)]
9、join
重點(diǎn)來(lái)了。在SQL語(yǔ)言中用得很多的就是join操作,DataFrame中同樣也提供了join的功能。
接下來(lái)隆重介紹join方法。在DataFrame中提供了六個(gè)重載的join方法。
(1)、笛卡爾積
joinDF1.join(joinDF2)
(2)、using一個(gè)字段形式
下面這種join類似于a join b using column1的形式,需要兩個(gè)DataFrame中有相同的一個(gè)列名,
joinDF1.join(joinDF2, "id")
joinDF1和joinDF2根據(jù)字段id進(jìn)行join操作,結(jié)果如下,using字段只顯示一次。
[圖片上傳失敗...(image-22eb0a-1531294703998)]
(3)、using多個(gè)字段形式
除了上面這種using一個(gè)字段的情況外,還可以using多個(gè)字段,如下
joinDF1.join(joinDF2, Seq("id", "name"))
(4)、指定join類型
兩個(gè)DataFrame的join操作有inner, outer, left_outer, right_outer, leftsemi類型。在上面的using多個(gè)字段的join情況下,可以寫(xiě)第三個(gè)String類型參數(shù),指定join的類型,如下所示
joinDF1.join(joinDF2, Seq("id", "name"), "inner")
(5)、使用Column類型來(lái)join
如果不用using模式,靈活指定join字段的話,可以使用如下形式
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
結(jié)果如下,
[圖片上傳失敗...(image-773330-1531294703997)]
(6)、在指定join字段同時(shí)指定join類型
如下所示
joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")
10、獲取指定字段統(tǒng)計(jì)信息
stat方法可以用于計(jì)算指定字段或指定字段之間的統(tǒng)計(jì)信息,比如方差,協(xié)方差等。這個(gè)方法返回一個(gè)DataFramesStatFunctions類型對(duì)象。
下面代碼演示根據(jù)c4字段,統(tǒng)計(jì)該字段值出現(xiàn)頻率在30%以上的內(nèi)容。在jdbcDF中字段c1的內(nèi)容為"a, b, a, c, d, b"。其中a和b出現(xiàn)的頻率為2 / 6,大于0.3
jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()
結(jié)果如下:
[圖片上傳失敗...(image-9dfa31-1531294703997)]
11、獲取兩個(gè)DataFrame中共有的記錄
intersect方法可以計(jì)算出兩個(gè)DataFrame中相同的記錄,
jdbcDF.intersect(jdbcDF.limit(1)).show(false)
結(jié)果如下:
[圖片上傳失敗...(image-5fc1b9-1531294703997)]
12、獲取一個(gè)DataFrame中有另一個(gè)DataFrame中沒(méi)有的記錄
示例:
jdbcDF.except(jdbcDF.limit(1)).show(false)
結(jié)果如下,
[圖片上傳失敗...(image-a2f264-1531294703997)]
13、操作字段名
(1)withColumnRenamed:重命名DataFrame中的指定字段名
如果指定的字段名不存在,不進(jìn)行任何操作。下面示例中將jdbcDF中的id字段重命名為idx。
jdbcDF.withColumnRenamed( "id" , "idx" )
結(jié)果如下:
[圖片上傳失敗...(image-73edbb-1531294703997)]
(2)withColumn:往當(dāng)前DataFrame中新增一列
whtiColumn(colName: String , col: Column)方法根據(jù)指定colName往DataFrame中新增一列,如果colName已存在,則會(huì)覆蓋當(dāng)前列。
以下代碼往jdbcDF中新增一個(gè)名為id2的列,
jdbcDF.withColumn("id2", jdbcDF("id")).show( false)
結(jié)果如下,
[圖片上傳失敗...(image-725494-1531294703997)]
14、行轉(zhuǎn)列
有時(shí)候需要根據(jù)某個(gè)字段內(nèi)容進(jìn)行分割,然后生成多行,這時(shí)可以使用explode方法
下面代碼中,根據(jù)c3字段中的空格將字段內(nèi)容進(jìn)行分割,分割的內(nèi)容存儲(chǔ)在新的字段c3_中,如下所示
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
結(jié)果如下,
[圖片上傳失敗...(image-578204-1531294703997)]
15、其他操作
API中還有na, randomSplit, repartition, alias, as方法,待后續(xù)補(bǔ)充。