PySpark筆記(三):DataFrame

DataFrame是在Spark 1.3中正式引入的一種以RDD為基礎(chǔ)的不可變的分布式數(shù)據(jù)集,類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)的二維表格,數(shù)據(jù)在其中以列的形式被組織存儲(chǔ)。如果熟悉Pandas,其與Pandas DataFrame是非常類(lèi)似的東西。

DataFrame API受到R和Python(Pandas)中的數(shù)據(jù)框架的啟發(fā),但是從底層開(kāi)始設(shè)計(jì)以支持現(xiàn)代大數(shù)據(jù)和數(shù)據(jù)科學(xué)應(yīng)用程序。作為現(xiàn)有RDD API的擴(kuò)展,DataFrame具有以下功能:

  • 能夠從單臺(tái)筆記本電腦上的千字節(jié)數(shù)據(jù)擴(kuò)展到大型群集上的PB級(jí)數(shù)據(jù)
  • 支持各種數(shù)據(jù)格式和存儲(chǔ)系統(tǒng)
  • 通過(guò)Spark SQL Catalyst優(yōu)化器實(shí)現(xiàn)最先進(jìn)的優(yōu)化和代碼生成
  • 通過(guò)Spark無(wú)縫集成所有大數(shù)據(jù)工具和基礎(chǔ)架構(gòu)
  • Python,Java,Scala和R的API(通過(guò)SparkR開(kāi)發(fā))
  • 對(duì)于熟悉其他編程語(yǔ)言數(shù)據(jù)框架的新用戶(hù),此API應(yīng)該讓他們感到賓至如歸。對(duì)于現(xiàn)有的Spark用戶(hù),此擴(kuò)展API將使Spark更易于編程,同時(shí)通過(guò)智能優(yōu)化和代碼生成來(lái)提高性能。

通過(guò)DataFrame與Catalyst優(yōu)化器,現(xiàn)有的Spark程序遷移到DataFrame時(shí)性能得到改善。由于優(yōu)化器生成用于執(zhí)行的JVM字節(jié)碼,因此Python用戶(hù)將體驗(yàn)到與Scala和Java用戶(hù)相同的高性能。


performance.png

創(chuàng)建DataFrame

Spark中有兩種方式可以將數(shù)據(jù)從RDD轉(zhuǎn)化為DataFrame:反射推斷或者編程指定。反射推斷是Spark應(yīng)用程序自動(dòng)識(shí)列的類(lèi)型,然后通過(guò)Spark SQL將行對(duì)象的RDD轉(zhuǎn)換為DataFrame。編程指定則是在運(yùn)行之前,人工從Spark SQL中引入數(shù)據(jù)類(lèi)型分配給不同的列。

使用數(shù)據(jù)結(jié)構(gòu):

data

普通讀取csv為DataFrames數(shù)據(jù)。

# 讀取csv為DataFrame
traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true')
# 創(chuàng)建臨時(shí)表
traffic.createOrReplaceTempView("traffic")
# 顯示前10行
traffic.show(10)
show

打印表結(jié)構(gòu),可以看出Spark自動(dòng)將所有列推斷為string,這不是我們想要的類(lèi)型。

traffic.printSchema()
schema

通過(guò)pandas輔助讀取csv。

import pandas as pd 

df = pd.read_csv('E:\Documents\Desktop\data.csv') 
traffic = spark.createDataFrame(df)
traffic.createOrReplaceTempView("traffic")
traffic.printSchema()
schema

反射推斷

traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true', inferSchema='true')
traffic.createOrReplaceTempView("traffic")
traffic.show(10)
traffic.printSchema()

inferSchema屬性用來(lái)指示是否使用自動(dòng)推斷,默認(rèn)為False。

schema

編程指定

盡管自動(dòng)推斷比較方便,如果啟用了inferSchema,則函數(shù)將數(shù)據(jù)全部讀入以確定輸入模式。要避免遍歷整個(gè)數(shù)據(jù)一次,應(yīng)該使用模式明確指定模式。

StructField(field, data_type=None, nullable=True, metadata=None)

  • field – Either the name of the field or a StructField object
  • data_type – If present, the DataType of the StructField to create
  • nullable – Whether the field to add should be nullable (default True)
  • metadata – Any additional metadata (default None)
from pyspark.sql.types import *

# 指定DataFrame每個(gè)列的模式
schema = StructType([
... StructField("detectorid", IntegerType()),
... StructField("starttime",StringType()),
... StructField("volume", IntegerType()),
... StructField("speed", FloatType()),
... StructField("occupancy", FloatType())])

# 使用指定模式讀入
traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true', schema=schema)
traffic.createOrReplaceTempView("traffic")
traffic.show(10)
traffic.printSchema()
schema

DataFrame查詢(xún)

常用API

select()
投影一組表達(dá)式并返回一個(gè)新的DataFrame。
參數(shù):cols - 列名稱(chēng)(字符串)或表達(dá)式(列)的列表。 如果其中一個(gè)列名是'*',則該列將展開(kāi)以包含當(dāng)前DataFrame中的所有列。

>>> traffic.select("speed").show(5)
+-----+
|speed|
+-----+
|56.52|
|53.54|
|54.64|
|54.94|
|51.65|
+-----+
only showing top 5 rows

filter()
使用給定的條件過(guò)濾行。where()是filter()的別名。
參數(shù):condition - 類(lèi)型的一列.BooleanType或一個(gè)SQL表達(dá)式的字符串。

>>> traffic.filter(traffic.speed > 50).show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 0:00|    48|56.52|     1.29|
|    100625|2015/12/1 0:15|    50|53.54|     1.48|
|    100625|2015/12/1 0:30|    25|54.64|     0.62|
|    100625|2015/12/1 0:45|    34|54.94|     0.85|
|    100625|2015/12/1 1:00|    23|51.65|      0.6|
+----------+--------------+------+-----+---------+
only showing top 5 rows
>>> traffic.where(traffic.volume > 50).show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 3:45|    61|57.62|     1.65|
|    100625|2015/12/1 4:00|    69| 56.7|     1.89|
|    100625|2015/12/1 4:15|    94|56.53|     2.69|
|    100625|2015/12/1 4:30|    87|55.53|     2.58|
|    100625|2015/12/1 4:45|   161|55.51|     4.62|
+----------+--------------+------+-----+---------+
only showing top 5 rows

drop()
返回刪除指定列的新DataFrame。
參數(shù):cols - 要?jiǎng)h除的列的字符串名稱(chēng),要?jiǎng)h除的列或要?jiǎng)h除的列的字符串名稱(chēng)的列表。

>>> traffic.drop("speed").show(5)
+----------+--------------+------+---------+
|detectorid|     starttime|volume|occupancy|
+----------+--------------+------+---------+
|    100625|2015/12/1 0:00|    48|     1.29|
|    100625|2015/12/1 0:15|    50|     1.48|
|    100625|2015/12/1 0:30|    25|     0.62|
|    100625|2015/12/1 0:45|    34|     0.85|
|    100625|2015/12/1 1:00|    23|      0.6|
+----------+--------------+------+---------+
only showing top 5 rows

cache()
使用默認(rèn)存儲(chǔ)級(jí)別(MEMORY_AND_DISK)持久保存DataFrame。

traffic.cache()

collect()
以Row列表形式返回所有記錄。

traffic.collect()

show()
將前n行打印到控制臺(tái)。
參數(shù):
n - 要顯示的行數(shù)。
truncate - 如果設(shè)置為T(mén)rue,則默認(rèn)截?cái)喑^(guò)20個(gè)字符的字符串。 如果設(shè)置為大于1的數(shù)字,則截?cái)嚅L(zhǎng)字符串以截?cái)嚅L(zhǎng)度并將其右對(duì)齊。

>>> traffic.show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 0:00|    48|56.52|     1.29|
|    100625|2015/12/1 0:15|    50|53.54|     1.48|
|    100625|2015/12/1 0:30|    25|54.64|     0.62|
|    100625|2015/12/1 0:45|    34|54.94|     0.85|
|    100625|2015/12/1 1:00|    23|51.65|      0.6|
+----------+--------------+------+-----+---------+
only showing top 5 rows

count()
返回此DataFrame中的行數(shù)。

>>> traffic.count()
17814

columns
以列表形式返回所有列名稱(chēng)。

>>> traffic.columns
['detectorid', 'starttime', 'volume', 'speed', 'occupancy']

dtypes
將所有列名稱(chēng)及其數(shù)據(jù)類(lèi)型作為列表返回。

>>> traffic.dtypes
[('detectorid', 'int'), ('starttime', 'string'), ('volume', 'int'), ('speed', 'double'), ('occupancy', 'double')]

fillna()
替換的空值,別名na.fill()。

參數(shù):
value - int,long,float,string或dict。 用來(lái)替換空值的值。 如果值是字典,則子集將被忽略,并且值必須是從列名(字符串)到替換值的映射。 替換值必須是int,long,float,boolean或string。
子集 - 要考慮的列名稱(chēng)的可選列表。 子集中指定的不具有匹配數(shù)據(jù)類(lèi)型的列將被忽略。 例如,如果value是一個(gè)字符串,并且子集包含一個(gè)非字符串列,則非字符串列將被忽略。

>>> traffic.na.fill(10)
>>> traffic.na.fill({'volume': 0, 'speed': '0'})

corr()
以雙精度值計(jì)算DataFrame的兩列的相關(guān)性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的別名。

參數(shù):
col1 - 第一列的名稱(chēng)
col2 - 第二列的名稱(chēng)
方法 - 相關(guān)方法。 目前只支持“Pearson”

>>> traffic.corr("volume", "speed")
-0.588695158526705

cov()
計(jì)算給定列的樣本協(xié)方差(由它們的名稱(chēng)指定)作為雙精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是別名。

參數(shù):
col1 - 第一列的名稱(chēng)
col2 - 第二列的名稱(chēng)

>>> traffic.cov("volume", "speed")
-1166.285227777989

describe()
計(jì)算數(shù)字和字符串列的統(tǒng)計(jì)信息。
這包括count,mean,stddev,min和max。 如果未給出列,則此函數(shù)將計(jì)算所有數(shù)字或字符串列的統(tǒng)計(jì)信息。

>>> df.describe().show()
+-------+--------------+--------------+------------------+------------------+------------------+
|summary|    detectorid|     starttime|            volume|             speed|         occupancy|
+-------+--------------+--------------+------------------+------------------+------------------+
|  count|         17814|         17814|             17814|             17737|             17814|
|   mean|      100627.5|          null|208.72779836083978| 45.94760105993146|13.775621421354007|
| stddev|1.707873064514|          null|  129.673023730382|15.010086497913619|13.391984211880049|
|    min|        100625|2015/12/1 0:00|                 0|              1.14|               0.0|
|    max|        100630|2015/12/9 9:45|               528|             69.33|             73.25|
+-------+--------------+--------------+------------------+------------------+------------------+
>>> traffic.describe(['speed']).show()
+-------+------------------+
|summary|             speed|
+-------+------------------+
|  count|             17737|
|   mean| 45.94760105993146|
| stddev|15.010086497913619|
|    min|              1.14|
|    max|             69.33|
+-------+------------------+

distinct()
返回包含此DataFrame中不同行的新DataFrame。

>>> traffic.distinct().count()
17814

createOrReplaceGlobalTempView()
使用給定名稱(chēng)創(chuàng)建或替換全局臨時(shí)視圖。
此臨時(shí)視圖的生命周期與此Spark應(yīng)用程序相關(guān)聯(lián)。

>>> traffic.createOrReplaceGlobalTempView("traffic")
>>> df = spark.sql("select * from traffic")
>>> df.count()
17814

createOrReplaceTempView()
使用此DataFrame創(chuàng)建或替換本地臨時(shí)視圖。
此臨時(shí)表的生命周期與用于創(chuàng)建此DataFrame的SparkSession相關(guān)聯(lián)。

>>> traffic.createOrReplaceTempView("traffic")
>>> df = spark.sql("select * from traffic")
>>> df.count()
17814

使用SQL查詢(xún)

由于創(chuàng)建了臨時(shí)表,我們可以對(duì)臨時(shí)表執(zhí)行sql操作。

>>> spark.sql("select * from traffic where volume > 50 and speed > 50").show()
+----------+---------------+------+-----+---------+
|detectorid|      starttime|volume|speed|occupancy|
+----------+---------------+------+-----+---------+
|    100625| 2015/12/1 3:45|    61|57.62|     1.65|
|    100625| 2015/12/1 4:00|    69| 56.7|     1.89|
|    100625| 2015/12/1 4:15|    94|56.53|     2.69|
|    100625| 2015/12/1 4:30|    87|55.53|     2.58|
|    100625| 2015/12/1 4:45|   161|55.51|     4.62|
|    100625| 2015/12/1 5:00|   203|55.41|     5.96|
|    100625| 2015/12/1 5:15|   185|55.14|     6.61|
|    100625| 2015/12/1 5:30|   308|52.39|     9.87|
|    100625| 2015/12/1 5:45|   343|51.01|    11.49|
|    100625|2015/12/1 10:15|   306| 50.6|    11.98|
|    100625|2015/12/1 10:30|   334|51.42|    11.53|
|    100625|2015/12/1 10:45|   349|52.67|    11.51|
|    100625|2015/12/1 11:00|   262|52.36|    10.54|
|    100625|2015/12/1 12:00|   255|52.47|     9.36|
|    100625|2015/12/1 12:15|   346|50.25|    13.44|
|    100625|2015/12/1 12:30|   367| 51.2|    12.47|
|    100625|2015/12/1 12:45|   330|52.78|    11.56|
|    100625|2015/12/1 13:00|   306|52.36|    12.01|
|    100625|2015/12/1 13:30|   371|50.28|    13.93|
|    100625|2015/12/1 13:45|   294|50.62|    12.92|
+----------+---------------+------+-----+---------+
only showing top 20 rows

Dataset

除了DataFrame,Spark 1.6中還引入了Dataset API,其提供了一種類(lèi)型安全的面向?qū)ο蟮木幊探涌?,但是其只能在Java與Scala中使用。Python不能使用該API的原因是因?yàn)槠浔旧聿皇且环N類(lèi)型安全的語(yǔ)言。在Spark 2.0中DataFrame API被整合入如Dataset API,DataFrame是Dataset未類(lèi)型化API的一個(gè)別名。

未類(lèi)型化的API:DataFrame = Dataset[Row]
類(lèi)型化的API:Dataset[T]
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容