Learning Spark [4] - Spark SQL

Spark SQL brings the following features to Spark:

  • Provides the high-level structured APIs (see Learning Spark [3])
  • Can read data in a variety of formats (JSON, Hive tables, CSV, Parquet, Avro, ORC)
  • Lets you query data through the JDBC/ODBC connectors of BI tools (Power BI, Tableau) and of RDBMSs (MySQL, PostgreSQL)
  • Provides a programmatic interface for interacting with structured data stored in a Spark application
  • Offers an interactive shell for running SQL queries
  • Supports HiveQL


    Spark SQL Connectors & Data Sources

Example: a basic query

# Basic Query Example
import os
from pyspark.sql import SparkSession
# working directory that contains the data set
os.chdir('D:/Users/fyrli/Desktop/R work/learning spark/chapter 4')
spark = (SparkSession
         .builder
         .appName('SparkSQLExampleApp')
         .getOrCreate())
# path to data set
departure_delays = 'departuredelays.csv'
# Read & Create a Temp View
df = spark.read.csv(departure_delays, header = True, inferSchema = True)
df.createOrReplaceTempView('us_delay_flights_tbl')
df.show(n = 5)
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+

This data set has five columns:

  • date is the flight date, stored as a string; it can be converted to a date format, e.g. 02190925 corresponds to 02-19 09:25 am
  • delay is the delay in minutes; a negative value means an early departure
  • distance is the flight distance in miles
  • origin and destination are the departure and arrival airports
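
As a side note, the date field's MMddHHmm encoding can be unpacked with a small pure-Python sketch (illustrative only; the helper name is my own, not from the book):

```python
def decode_flight_date(raw):
    """Decode a departure-delays date field (MMddHHmm; the month may lack a leading zero)."""
    s = str(raw).zfill(8)                      # e.g. 1011245 -> '01011245'
    month, day = int(s[0:2]), int(s[2:4])
    hour, minute = int(s[4:6]), int(s[6:8])
    return month, day, hour, minute

print(decode_flight_date('02190925'))  # (2, 19, 9, 25) -> 02-19 09:25 am
print(decode_flight_date(1011245))     # (1, 1, 12, 45)
```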

Query flights with a distance greater than 1,000 miles:

spark.sql("""
    SELECT date, distance, origin, destination        
    FROM us_delay_flights_tbl
    WHERE distance >= 1000
    ORDER BY distance DESC""").show(10)
+-------+--------+------+-----------+
|   date|distance|origin|destination|
+-------+--------+------+-----------+
|3131530|    4330|   HNL|        JFK|
|3071625|    4330|   HNL|        JFK|
|3121530|    4330|   HNL|        JFK|
|3021625|    4330|   HNL|        JFK|
|3061625|    4330|   HNL|        JFK|
|3081530|    4330|   HNL|        JFK|
|3091530|    4330|   HNL|        JFK|
|3011625|    4330|   HNL|        JFK|
|3151530|    4330|   HNL|        JFK|
|3051625|    4330|   HNL|        JFK|
+-------+--------+------+-----------+
only showing top 10 rows

Query all flights between SFO and ORD that were delayed by more than two hours:

spark.sql("""
    SELECT date, delay, origin, destination 
    FROM us_delay_flights_tbl 
    WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
    ORDER by delay DESC""").show(10)
+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows

Add a CASE WHEN column that labels the type of delay:

spark.sql("""
    SELECT delay, origin, destination,              
        CASE WHEN delay > 360 THEN 'Very Long Delays'                  
             WHEN delay > 120 AND delay < 360 THEN 'Long Delays'                  
             WHEN delay > 60 AND delay < 120 THEN 'Short Delays'                  
             WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'                  
             WHEN delay = 0 THEN 'No Delays'                  
             ELSE 'Early'               
        END AS Flight_Delays               
    FROM us_delay_flights_tbl               
    ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
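
For clarity, the CASE WHEN bucketing above can be mirrored in plain Python (a sketch, not part of the book's Spark code). Note that the SQL's strict inequalities let boundary values such as delay = 360 or delay = 120 fall through to the ELSE branch, and this mirror reproduces that behavior:

```python
def flight_delay_label(delay):
    """Mirror of the SQL CASE WHEN expression, branch for branch."""
    if delay > 360:
        return 'Very Long Delays'
    if 120 < delay < 360:
        return 'Long Delays'
    if 60 < delay < 120:
        return 'Short Delays'
    if 0 < delay < 60:
        return 'Tolerable Delays'
    if delay == 0:
        return 'No Delays'
    return 'Early'           # negative delays, and the uncovered boundary values

print(flight_delay_label(333))  # Long Delays
print(flight_delay_label(-8))   # Early
```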

The following Python (DataFrame API) code is equivalent to the first SQL query above:

from pyspark.sql.functions import col, desc

(df.select('distance', 'origin', 'destination')
   .where(col('distance') > 1000)
   .orderBy(desc('distance'))).show(10)

Creating SQL databases and tables

By default, Spark creates tables in the default database. Next, using the US flight delays data set, we will create one managed and one unmanaged table.

  • For a managed table, Spark manages both the metadata and the data in the file store
  • For an unmanaged table, Spark manages only the metadata; the data itself stays in an external source

First, create a database and tell Spark to use it:

spark.sql('CREATE DATABASE learn_spark_db')
spark.sql('USE learn_spark_db')

managed table

spark.sql("""
    CREATE TABLE managed_us_delay_flights_tbl
    (date STRING, delay INT, distance INT, origin STRING, destination STRING)""")

unmanaged table

spark.sql("""
    CREATE TABLE us_delay_flights_tbl
    (date STRING, delay INT, distance INT, origin STRING, destination STRING)
    USING csv OPTIONS (PATH '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')
""")

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee
