Spark 結(jié)構(gòu)化API DataFrames

結(jié)構(gòu)化API

結(jié)構(gòu)化API是處理各種數(shù)據(jù)的工具,從非結(jié)構(gòu)化log文件到半結(jié)構(gòu)化CSV文件和高度結(jié)構(gòu)化的Parquet文件,spark中三種主要的三類結(jié)構(gòu)化API為:

  • Datasets
  • DataFrames
  • SQL tables and views

App dag stage task

DataFrame Dataset

DataFrame、 Dataset是(分布式)1-。4類表集合具有定義良好的行和列
DataFrame無類型(runtime check)
Dataset有類型 (compile time)

對于Spark來說, DataFrame是類型為 RowDataset
特點:

  • immutable
  • lazily evaluated plans

.Scala type reference

Schemas

模式定義了一個DataFrame的列名和類型。
可以手動定義schema或者schema on read
Spark類型直接映射到Spark維護的不同語言api,在Scala、Java、Python、SQLR中,每種api都有一個查詢表,簡單的說最終代碼 使用純spark執(zhí)行( Spark’s internal Catalyst representation)

結(jié)構(gòu)化```API的執(zhí)行流程

  1. 編寫DataFrame/Dataset/SQL 代碼.
  2. 如果代碼是正確的spark將其轉(zhuǎn)化為Logical Plan.
  3. Logical Plan 轉(zhuǎn)為 Physical Plan
    4.Spark 在集群上執(zhí)行Physical Plan (RDD 操作)

Logical Planning

catalogy是一個包含所有tableDataFrame信息的倉庫,用于check代碼是否有問題 (eg:table column 不存在)
check 通過的plan 通過Catalyst Optimizer優(yōu)化
用戶可以擴展Catalyst自定義優(yōu)化規(guī)則

Physical Planning

DataFrams 相關(guān)操作

加載數(shù)據(jù)

df=spark.read.json("file:///usr/local/xldw/2015-summary.json")

或者

df = spark.read.format("json").load("file:///usr/local/xldw/2015-summary.json")

查看schema

df.schema
df.printSchema()
org.apache.spark.sql.types.StructType = ...
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
  • spark 可以根據(jù)文件的前幾行推斷出schema(schema on read)
  • schemaStructType 實例
  • StructType 由一個StructFields 構(gòu)成
  • Boolean代表這個列是否可以為空

手動指定Schema

// in Scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(StructField("DEST_COUNTRY_NAME", StringType,true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
  Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")

Columns

spark來說, 列是一種邏輯結(jié)構(gòu),它僅表示通過表達式按每個記錄計算的值. 這意味著要為列賦實值,我們需要有一行;為了得到一行,我們需要一個DataFrame

構(gòu)造和引用列的兩種最基本的方式:
colcolumn 方法

Column作為表達式

expr函數(shù)實際上可以解析字符串中的轉(zhuǎn)換和列引用,然后可以將它們傳遞到進一步的轉(zhuǎn)換中,下面三者等價:

col("someCol") - 5
expr("someCol - 5")
expr("someCol") - 5

Columns 只是表達式
這些列和這些列的轉(zhuǎn)換編譯成與解析表達式相同的邏輯計劃

這意味著您可以將表達式編寫為DataFrame代碼或SQL表達式,并獲得完全相同的性能特征

from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)

Records and Rows

spark中一行使用Row對象表示,Spark使用列表達式操作行對象,以生成可用的值,行對象在內(nèi)部表示字節(jié)數(shù)組。字節(jié)數(shù)組接口永遠不會顯示給用戶,因為我們只使用列表達式來操作它們

展示

df.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+

select and selectExpr

df.select("DEST_COUNTRY_NAME").show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2

# select muti columns
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

# select function favor
df.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)

# select sql favor
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

# selectExpr
df.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME").show(2)
# selectExpr  opens up the true power of Spark
df.selectExpr("*",
              "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)


df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(1)

字面量

df.select(expr("*"), lit(1).alias("One")).show(2)

添加列

df.withColumn("numberOne", lit(1)).show(2)
-- in SQL
SELECT *, 1 as numberOne FROM dfTable LIMIT 2

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
.show(2)
//rename
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

重命名列

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

保留字符和關(guān)鍵字

withColumn不用特殊處理
selectExpr需要加上`

dfWithLongColumnName = df.withColumn("this is a long column", expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColumnName.selectExpr("`this is a long column` as `long column`").show(2)

大小寫敏感性

默認spark大小寫不敏感,可以通過配置開啟大小寫敏感

set spark.sql.caseSensitive true

刪除列

df.drop("ORIGIN_COUNTRY_NAME").columns

改變列的類型cast

df.withColumn("count2", col("count").cast("long"))
-- in SQL
SELECT *, cast(count as long) AS count2 FROM dfTabl

過濾行

wherefilter有一樣的過濾功能

df.filter(col("count") < 2).sho
df.where("count < 2").show(2)

spark自動在同一時間執(zhí)行所有過濾操作,而不管過濾器的順序如何。這意味著,如果您想指定多個過濾器,只需按順序?qū)⑺鼈冩溄悠饋?,其余的?code>Spark處理

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

Unique

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

隨機分片

dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

Union

from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

sort

sortorderBy 有相同的排序功能

df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
-- in SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

一個高級技巧是使用asc_nulls_first、desc_nulls_first、asc_nulls_lastdesc_nulls_last來指定希望DataFrame空值按順序出現(xiàn)在哪里

出于優(yōu)化目的,有時建議先對每個分區(qū)排序然后執(zhí)行之后的transformations

spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
.sortWithinPartitions("count")

Limit

df.limit(5).show()
-- in SQL
SELECT * FROM dfTable LIMIT 6

df.orderBy(expr("count desc")).limit(6).show()
-- in SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 6

重新分區(qū)和合并

Sparkdriver程序中維護集群的狀態(tài)。有時您需要向driver程序收集一些數(shù)據(jù),以便在本地機器上對其進行操作
到目前為止,我們還沒有明確定義這個操作。然而,我們使用了幾種不同的方法來實現(xiàn)這一點,它們實際上都是一樣的
collect從整個DataFrame獲取所有數(shù)據(jù),take獲取前N行,show以表格樣式打印

collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()
collectDF.toLocalIterator()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容