功能: 通過SQLContext提供的reader讀取器讀取外部數(shù)據(jù)源的數(shù)據(jù),并形成DataFrame
1.源碼的主要方法
format:給定數(shù)據(jù)源數(shù)據(jù)格式類型,eg: json、parquet
schema:給定讀入數(shù)據(jù)的數(shù)據(jù)schema,可以不給定,不給定的情況下,進(jìn)行數(shù)據(jù)類型推斷
option:添加參數(shù),這些參數(shù)在數(shù)據(jù)解析的時(shí)候可能會(huì)用到
load:
有參數(shù)的指從參數(shù)給定的path路徑中加載數(shù)據(jù),比如:JSON、Parquet...
無參數(shù)的指直接加載數(shù)據(jù)(根據(jù)option相關(guān)的參數(shù))
jdbc:讀取關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)
json:讀取json格式數(shù)據(jù)
parquet:讀取parquet格式數(shù)據(jù)
orc: 讀取orc格式數(shù)據(jù)
table:直接讀取關(guān)聯(lián)的Hive數(shù)據(jù)庫(kù)中的對(duì)應(yīng)表數(shù)據(jù)
val df=sqlContext.read.format("json").load("spark/sql/people.json")
功能:將DataFrame的數(shù)據(jù)寫出到外部數(shù)據(jù)源
1.源碼主要方法
mode: 給定數(shù)據(jù)輸出的模式
`overwrite`: overwrite the existing data.
`append`: append the data.?
`ignore`: ignore the operation (i.e. no-op).
`error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數(shù)
partitionBy:給定分區(qū)字段(要求輸出的文件類型支持?jǐn)?shù)據(jù)分區(qū))
save: 觸發(fā)數(shù)據(jù)保存操作 --> 當(dāng)該API被調(diào)用后,數(shù)據(jù)已經(jīng)寫出到具體的數(shù)據(jù)保存位置了
jdbc:將數(shù)據(jù)輸出到關(guān)系型數(shù)據(jù)庫(kù)
當(dāng)mode為append的時(shí)候,數(shù)據(jù)追加方式是:
先將表中的所有索引刪除
再追加數(shù)據(jù)
沒法實(shí)現(xiàn),數(shù)據(jù)不存在就添加,存在就更新的需求
讀取Hive表數(shù)據(jù)形成DataFrame
val df = sqlContext.read.table("common.emp")
結(jié)果保存json格式
df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")
df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")
df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")
df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\
上面雖然在追加的時(shí)候加上了sal,但是解析沒有問題
sqlContext.read.format("json").load("/beifeng/result/json").show()
結(jié)果保存parquet格式
df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")
df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導(dǎo)致解析失敗,讀取數(shù)據(jù)的時(shí)候
sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)
sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)
partitionBy按照給定的字段進(jìn)行分區(qū)
df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")
sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)