Spark SQL和DataFrames重要的類有:
- pyspark.sql.SQLContext: DataFrame和SQL方法的主入口
- pyspark.sql.DataFrame: 將分布式數(shù)據(jù)集分組到指定列名的數(shù)據(jù)框中
- pyspark.sql.Column :DataFrame中的列
- pyspark.sql.Row: DataFrame數(shù)據(jù)的行
- pyspark.sql.HiveContext: 訪問Hive數(shù)據(jù)的主入口
- pyspark.sql.GroupedData: 由DataFrame.groupBy()創(chuàng)建的聚合方法集
- pyspark.sql.DataFrameNaFunctions: 處理丟失數(shù)據(jù)(空數(shù)據(jù))的方法
- pyspark.sql.DataFrameStatFunctions: 統(tǒng)計(jì)功能的方法
-pyspark.sql.functions DataFrame:可用的內(nèi)置函數(shù)
- pyspark.sql.types: 可用的數(shù)據(jù)類型列表
- pyspark.sql.Window: 用于處理窗口函數(shù)
3.class pyspark.sql.GroupedData(jdf,sql_ctx):由DataFrame.groupBy()創(chuàng)建的DataFrame上的一組聚合方法。
3.1.agg(*exprs):計(jì)算聚合并將結(jié)果作為DataFrame返回,可用的聚合函數(shù)有avg,min,max,sum,count.如果exprs是從字符串到字符串的單個(gè)字典映射,那么鍵是要執(zhí)行聚合的列,值是聚合函數(shù)。另外,exprs也可以是聚合列表達(dá)式的列表
gdf = df.GroupBy(df.name)
gdf.agg({'*':count'}).collect()
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
from pyspark.sql import function as F
gdf.agg(G.min(df.age).collect())
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
3.2.avg(*exprs):計(jì)算每個(gè)組的每個(gè)數(shù)字的列的平均值,mean()是avg()的別名。
>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.groupBy().avg('age').collect()
[Row(avg(age)=3.5)]
>>> l3=[('Alice',2,85),('Bob',5,80)]
>>> df3 = sqlContext.createDataFrame(l3,['name','age','height'])
>>> df3.groupBy().avg('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]
3.3.count(*exprs):統(tǒng)計(jì)每個(gè)組的記錄數(shù)。
>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)]
3.4.max(*exprs):計(jì)算每個(gè)組的每個(gè)數(shù)字列的最大值。
>>> df.groupBy().max('age').collect()
[Row(max(age)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(max(age)=5, max(height)=85)]
3.5.mean(*exprs):計(jì)算每個(gè)組的每個(gè)數(shù)字列的平均值。mean()是avg()的別名。
3.6.min(*exprs):計(jì)算每個(gè)組的每個(gè)數(shù)字列的最小值。
>>> df.groupBy().min('age').collect()
[Row(min(age)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(min(age)=2, min(height)=80)]
3.7.pivot(pivot_col,value=None): 旋轉(zhuǎn)當(dāng)前DataFrame的列并執(zhí)行指定的聚合。有兩個(gè)版本的透視函數(shù):一個(gè)需要調(diào)用者指定不同值的列表以進(jìn)行透視,另一個(gè)不需要。后者更簡潔但效率低,因?yàn)閟park需要首先在內(nèi)部計(jì)算不同值的列表。
1.pivot_col:要旋轉(zhuǎn)的列的名稱
2.values:將被旋轉(zhuǎn)為輸出DataFrame中的列表值的列表
// 計(jì)算每個(gè)課程每年的收入總和作為一個(gè)單獨(dú)的列
>>> l4=[(2012,'dotNET',10000),(2012,'dotNET',5000),(2012,'Java',20000),(2013,'dotNET',48000),(2013,'Java',30000)]
>>> df4 = sqlContext.createDataFrame(l4,['year','course','earnings'])
>>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
// 或者不指定列值(效率較低)
>>> df4.groupBy("year").pivot("course").sum("earnings").collect()
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
3.6.sum(*exprs):計(jì)算每個(gè)組的每個(gè)數(shù)字列的總和。
>>> df.groupBy().sum('age').collect()
[Row(sum(age)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(sum(age)=7, sum(height)=165)]