Spark 可以從6大核心數(shù)據(jù)源中獲取數(shù)據(jù),和其他不同的社區(qū)數(shù)據(jù)源獲取數(shù)據(jù),該章節(jié)重點(diǎn)介紹6大核心數(shù)據(jù)源的獲取和社區(qū)數(shù)據(jù)源該如何配置.
6大核心數(shù)據(jù)源:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
一些示例社區(qū)數(shù)據(jù)源:
- Cassandra
- HBase
- MongoDB
- AWS Redshift
- XML
- And many, many others
9.1 數(shù)據(jù)源的API結(jié)構(gòu)
讀取數(shù)據(jù)源的API結(jié)構(gòu)
DataFrameReader.format(...).option("key", "value").schema(...).load()
讀取時,所有的format,option,schema,read mode都是可選的,但是需要給定一個讀取數(shù)據(jù)的路徑,這是一個示例:
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
從外部讀取數(shù)據(jù)肯定會遇到錯誤格式的數(shù)據(jù),當(dāng)這種情況發(fā)生時Spark根據(jù)你選擇的不同模式會采取不同的處理方法:
| Read Mode | 描述 |
|---|---|
| permissive(默認(rèn)) | 遇到錯誤時將所有的字段設(shè)為null,并將錯誤的記錄放在一個叫 _corrupt_record的string字段中。 |
| dropMalformed | 去掉所有的錯誤字段。 |
| failFast | 遇到錯誤字段立即提示失敗。 |
寫出數(shù)據(jù)的API結(jié)構(gòu)
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
PartitionBy, bucketBy, sortBy 只用在基于file的數(shù)據(jù)源,你可以用它們控制指定文件的布局。
示例:
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
| Save modes | 描述 |
|---|---|
| append | 將輸出文件附加在已經(jīng)存在的文件中。 |
| overwrite | 覆蓋已有文件。 |
| errorIfExists(默認(rèn)) | 如果指定位置有同名文件則拋出異常。 |
| ignore | 如果文件存在,則不作任何操作。。 |
9.2 CSV 文件
讀取CSV文件
示例:
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.load("some/path/to/file.csv")
option:包含schema(頭文件),模式是FAILFAST ,推斷Schema類型
我們可以自己確定schema以及schema格式來保證數(shù)據(jù)是我們期望的:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
.show(5)
寫入CSV文件
寫入csv是讀取csv的子集,示例如下:
val csvFile = spark.read.format("csv")
.option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv")
9.3 JSON 文件
讀取JSON文件
spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
.load("/data/flight-data/json/2010-summary.json").show(5)
寫入JSON文件
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
9.4 Parquet 文件
Parquet格式與Spark配合極好,實(shí)際上就是默認(rèn)的格式。
讀取Parquet文件
spark.read.format("parquet").load("/data/flight-data/parquet/2010-summary.parquet").show(5)
寫入Parquet文件
csvFile.write.format("parquet").mode("overwrite").save("/tmp/my-parquet-file.parquet")
9.5 ORC 文件
ORC格式與Hadoop配合的很好。
讀取ORC文件
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)
寫入ORC文件
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")
9.6 SQL 數(shù)據(jù)庫
從數(shù)據(jù)庫讀取數(shù)據(jù)
使用spark-shell時需要加載jdbc驅(qū)動(以postgresql驅(qū)動為例):
./bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar
SQLite示例:
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
.option("dbtable", tablename).option("driver", driver).load()
PostgreSQL sample:
val pgDF = spark.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://database_server")
.option("dbtable", "schema.tablename")
.option("user", "username").option("password","my-secret-password").load()
Query Pushdown
Spark也能夠指定一個查詢語句而不是表名,但需要將查詢語句括起來然后給一個別名:
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
AS flight_info"""
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
Spark也能和文件一樣并行讀取數(shù)據(jù)庫,只需要手動指定數(shù)量:
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", tablename).option("driver", driver)
.option("numPartitions", 10).load()
寫入數(shù)據(jù)庫
寫入數(shù)據(jù)庫也一樣簡單,下面用之前從CSV建好的數(shù)據(jù)用overwrite全表覆蓋模式寫入:
val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
可以通過下面語句查看是否寫入:
spark.read.jdbc(newPath, tablename, props).count()
9.7 Text 文件
讀取文本文件非常簡單:只需將類型指定為textFile:
讀取Text
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
寫入Text
寫入Text時要確保只用一String列:
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")