《Spark: The Definitive Guide 》第9章:數(shù)據(jù)源 中文學(xué)習(xí)筆記

Spark 可以從6大核心數(shù)據(jù)源中獲取數(shù)據(jù),和其他不同的社區(qū)數(shù)據(jù)源獲取數(shù)據(jù),該章節(jié)重點(diǎn)介紹6大核心數(shù)據(jù)源的獲取和社區(qū)數(shù)據(jù)源該如何配置.
6大核心數(shù)據(jù)源:

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

一些示例社區(qū)數(shù)據(jù)源:

  • Cassandra
  • HBase
  • MongoDB
  • AWS Redshift
  • XML
  • And many, many others

9.1 數(shù)據(jù)源的API結(jié)構(gòu)

讀取數(shù)據(jù)源的API結(jié)構(gòu)

DataFrameReader.format(...).option("key", "value").schema(...).load()

讀取時,所有的format,option,schema,read mode都是可選的,但是需要給定一個讀取數(shù)據(jù)的路徑,這是一個示例:

spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()

從外部讀取數(shù)據(jù)肯定會遇到錯誤格式的數(shù)據(jù),當(dāng)這種情況發(fā)生時Spark根據(jù)你選擇的不同模式會采取不同的處理方法:

Read Mode 描述
permissive(默認(rèn)) 遇到錯誤時將所有的字段設(shè)為null,并將錯誤的記錄放在一個叫 _corrupt_record的string字段中。
dropMalformed 去掉所有的錯誤字段。
failFast 遇到錯誤字段立即提示失敗。

寫出數(shù)據(jù)的API結(jié)構(gòu)

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

PartitionBy, bucketBy, sortBy 只用在基于file的數(shù)據(jù)源,你可以用它們控制指定文件的布局。
示例:

dataframe.write.format("csv")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()
Save modes 描述
append 將輸出文件附加在已經(jīng)存在的文件中。
overwrite 覆蓋已有文件。
errorIfExists(默認(rèn)) 如果指定位置有同名文件則拋出異常。
ignore 如果文件存在,則不作任何操作。。

9.2 CSV 文件

讀取CSV文件

示例:

spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .load("some/path/to/file.csv")

option:包含schema(頭文件),模式是FAILFAST ,推斷Schema類型

我們可以自己確定schema以及schema格式來保證數(shù)據(jù)是我們期望的:

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
  new StructField("DEST_COUNTRY_NAME", StringType, true),
  new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  new StructField("count", LongType, false)
))
spark.read.format("csv")
  .option("header", "true")
  .option("mode", "FAILFAST")
  .schema(myManualSchema)
  .load("/data/flight-data/csv/2010-summary.csv")
  .show(5)

寫入CSV文件

寫入csv是讀取csv的子集,示例如下:

val csvFile = spark.read.format("csv")
  .option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
  .load("/data/flight-data/csv/2010-summary.csv")

9.3 JSON 文件

讀取JSON文件

spark.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
  .load("/data/flight-data/json/2010-summary.json").show(5)

寫入JSON文件

csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")

9.4 Parquet 文件

Parquet格式與Spark配合極好,實(shí)際上就是默認(rèn)的格式。

讀取Parquet文件

spark.read.format("parquet").load("/data/flight-data/parquet/2010-summary.parquet").show(5)

寫入Parquet文件

csvFile.write.format("parquet").mode("overwrite").save("/tmp/my-parquet-file.parquet")

9.5 ORC 文件

ORC格式與Hadoop配合的很好。

讀取ORC文件

spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)

寫入ORC文件

csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")

9.6 SQL 數(shù)據(jù)庫

從數(shù)據(jù)庫讀取數(shù)據(jù)

使用spark-shell時需要加載jdbc驅(qū)動(以postgresql驅(qū)動為例):
./bin/spark-shell
--driver-class-path postgresql-9.4.1207.jar
--jars postgresql-9.4.1207.jar

SQLite示例:

val driver =  "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"

val dbDataFrame = spark.read.format("jdbc").option("url", url)
  .option("dbtable", tablename).option("driver",  driver).load()

PostgreSQL sample:

val pgDF = spark.read
  .format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", "jdbc:postgresql://database_server")
  .option("dbtable", "schema.tablename")
  .option("user", "username").option("password","my-secret-password").load()

Query Pushdown

Spark也能夠指定一個查詢語句而不是表名,但需要將查詢語句括起來然后給一個別名:

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
  AS flight_info"""
val dbDataFrame = spark.read.format("jdbc")
  .option("url", url).option("dbtable", pushdownQuery).option("driver",  driver)
  .load()

Spark也能和文件一樣并行讀取數(shù)據(jù)庫,只需要手動指定數(shù)量:

val dbDataFrame = spark.read.format("jdbc")
  .option("url", url).option("dbtable", tablename).option("driver", driver)
  .option("numPartitions", 10).load()

寫入數(shù)據(jù)庫

寫入數(shù)據(jù)庫也一樣簡單,下面用之前從CSV建好的數(shù)據(jù)用overwrite全表覆蓋模式寫入:

val newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)

可以通過下面語句查看是否寫入:

spark.read.jdbc(newPath, tablename, props).count()

9.7 Text 文件

讀取文本文件非常簡單:只需將類型指定為textFile:

讀取Text

spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()

寫入Text

寫入Text時要確保只用一String列:

csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")

9.8 高級 I/O 概念

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容