Spark SQL provides Spark with the following capabilities:
- High-level structured APIs (see Learning Spark [3])
- Reading data in many formats (JSON, Hive tables, CSV, Parquet, Avro, ORC)
- Querying data via the JDBC/ODBC connectors of BI tools (Power BI, Tableau) and of RDBMSs (MySQL, PostgreSQL)
- A programmatic interface for interacting with data stored in a Spark application
- An interactive shell for running SQL queries
- Support for HiveQL
Spark SQL Connectors & Data Sources
Example: Basic Query
# Basic Query Example
import os
from pyspark.sql import SparkSession
from pyspark import SparkFiles
os.chdir('D:/Users/fyrli/Desktop/R work/learning spark/chapter 4')
spark = (SparkSession
.builder
.appName('SparkSQLExampleApp')
.getOrCreate())
# path to data set
departure_delays = 'departuredelays.csv'
# Read & Create a Temp View
df = spark.read.csv(departure_delays, header=True, inferSchema=True)
df.createOrReplaceTempView('us_delay_flights_tbl')
df.show(n=5)
+-------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245| 6| 602| ABE| ATL|
|1020600| -8| 369| ABE| DTW|
|1021245| -2| 602| ABE| ATL|
|1020605| -4| 602| ABE| ATL|
|1031245| -4| 602| ABE| ATL|
+-------+-----+--------+------+-----------+
This dataset has five columns:
- date: the flight date as a string; it can be converted to a date format, e.g. 02190925 corresponds to 02-19 09:25 AM
- delay: the delay in minutes; negative values mean an early departure
- distance: the flight distance in miles
- origin and destination: the departure and arrival airport codes
Query flights with a distance of at least 1,000 miles:
spark.sql("""
SELECT date, distance, origin, destination
FROM us_delay_flights_tbl
WHERE distance >= 1000
ORDER BY distance DESC""").show(10)
+-------+--------+------+-----------+
| date|distance|origin|destination|
+-------+--------+------+-----------+
|3131530| 4330| HNL| JFK|
|3071625| 4330| HNL| JFK|
|3121530| 4330| HNL| JFK|
|3021625| 4330| HNL| JFK|
|3061625| 4330| HNL| JFK|
|3081530| 4330| HNL| JFK|
|3091530| 4330| HNL| JFK|
|3011625| 4330| HNL| JFK|
|3151530| 4330| HNL| JFK|
|3051625| 4330| HNL| JFK|
+-------+--------+------+-----------+
only showing top 10 rows
Query all flights from SFO to ORD that were delayed by more than two hours:
spark.sql("""
SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER BY delay DESC""").show(10)
+-------+-----+------+-----------+
| date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638| SFO| ORD|
|1031755| 396| SFO| ORD|
|1022330| 326| SFO| ORD|
|1051205| 320| SFO| ORD|
|1190925| 297| SFO| ORD|
|2171115| 296| SFO| ORD|
|1071040| 279| SFO| ORD|
|1051550| 274| SFO| ORD|
|3120730| 266| SFO| ORD|
|1261104| 258| SFO| ORD|
+-------+-----+------+-----------+
only showing top 10 rows
Add a column with a CASE WHEN expression that labels the type of delay:
spark.sql("""
SELECT delay, origin, destination,
CASE WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay <= 360 THEN 'Long Delays'
WHEN delay > 60 AND delay <= 120 THEN 'Short Delays'
WHEN delay > 0 AND delay <= 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
| 333| ABE| ATL| Long Delays|
| 305| ABE| ATL| Long Delays|
| 275| ABE| ATL| Long Delays|
| 257| ABE| ATL| Long Delays|
| 247| ABE| ATL| Long Delays|
| 247| ABE| DTW| Long Delays|
| 219| ABE| ORD| Long Delays|
| 211| ABE| ATL| Long Delays|
| 197| ABE| DTW| Long Delays|
| 192| ABE| ORD| Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
The following Python DataFrame API code is equivalent to the first SQL query above:
from pyspark.sql.functions import col, desc

(df.select('distance', 'origin', 'destination')
 .where(col('distance') >= 1000)
 .orderBy(desc('distance'))).show(10)
Creating SQL Databases and Tables
By default, Spark creates tables in the default database. Next, using the US flight delays dataset, we will create one managed and one unmanaged table.
- For a managed table, Spark manages both the metadata and the data itself in the file store.
- For an unmanaged table, Spark manages only the metadata; the data itself lives in an external data source and is managed by you.
First, create a database and switch to it:
spark.sql('CREATE DATABASE learn_spark_db')
spark.sql('USE learn_spark_db')
Managed table:
spark.sql("""
CREATE TABLE managed_us_delay_flights_tbl
(date STRING, delay INT, distance INT, origin STRING, destination STRING)""")
Unmanaged table:
spark.sql("""
CREATE TABLE us_delay_flights_tbl
(date STRING, delay INT, distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')
""")
Reference
[3] Learning Spark, 2nd Edition: Lightning-Fast Data Analytics by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly, 2020)
