pyspark save_mongo

功能: 通過SQLContext提供的reader讀取器讀取外部數(shù)據(jù)源的數(shù)據(jù),并形成DataFrame

1.源碼的主要方法

format:給定數(shù)據(jù)源數(shù)據(jù)格式類型,eg: json、parquet

schema:給定讀入數(shù)據(jù)的數(shù)據(jù)schema,可以不給定,不給定的情況下,進(jìn)行數(shù)據(jù)類型推斷

option:添加參數(shù),這些參數(shù)在數(shù)據(jù)解析的時(shí)候可能會(huì)用到

load:

有參數(shù)的指從參數(shù)給定的path路徑中加載數(shù)據(jù),比如:JSON、Parquet...

無參數(shù)的指直接加載數(shù)據(jù)(根據(jù)option相關(guān)的參數(shù))

jdbc:讀取關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)

json:讀取json格式數(shù)據(jù)

parquet:讀取parquet格式數(shù)據(jù)

orc: 讀取orc格式數(shù)據(jù)

table:直接讀取關(guān)聯(lián)的Hive數(shù)據(jù)庫(kù)中的對(duì)應(yīng)表數(shù)據(jù)

val df=sqlContext.read.format("json").load("spark/sql/people.json")

功能:將DataFrame的數(shù)據(jù)寫出到外部數(shù)據(jù)源


1.源碼主要方法

mode: 給定數(shù)據(jù)輸出的模式

  `overwrite`: overwrite the existing data.

  `append`: append the data.?

  `ignore`: ignore the operation (i.e. no-op).

  `error`: default option, throw an exception at runtime.

format:給定輸出文件所屬類型, eg: parquet、json

option: 給定參數(shù)

partitionBy:給定分區(qū)字段(要求輸出的文件類型支持?jǐn)?shù)據(jù)分區(qū))

save: 觸發(fā)數(shù)據(jù)保存操作 --> 當(dāng)該API被調(diào)用后,數(shù)據(jù)已經(jīng)寫出到具體的數(shù)據(jù)保存位置了

jdbc:將數(shù)據(jù)輸出到關(guān)系型數(shù)據(jù)庫(kù)

  當(dāng)mode為append的時(shí)候,數(shù)據(jù)追加方式是:

    先將表中的所有索引刪除

    再追加數(shù)據(jù)

  沒法實(shí)現(xiàn),數(shù)據(jù)不存在就添加,存在就更新的需求

讀取Hive表數(shù)據(jù)形成DataFrame

val df = sqlContext.read.table("common.emp")

結(jié)果保存json格式

df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")

df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")

df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")

df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\

上面雖然在追加的時(shí)候加上了sal,但是解析沒有問題

sqlContext.read.format("json").load("/beifeng/result/json").show()

結(jié)果保存parquet格式

df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")

df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導(dǎo)致解析失敗,讀取數(shù)據(jù)的時(shí)候

sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)

sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)

partitionBy按照給定的字段進(jìn)行分區(qū)

df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")

sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容