PySpark DataFrame Read and Write

1. Connect to local Spark

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('my_first_app_name') \
    .getOrCreate()
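
As a quick sanity check (a minimal sketch), the session object exposes the version of the running Spark installation, and stop() releases its resources when you are done:

# Confirm the session is up; spark.version is the running Spark version string
print(spark.version)
# Call spark.stop() when finished to release the session's resources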

2. Creating a DataFrame

2.1 Create from a variable with an explicit schema

rdd = spark.sparkContext.parallelize([
    (123, "Katie", 19, "brown"),
    (234, "Michael", 22, "green"),
    (345, "Simone", 23, "blue")
])
# Specify the schema with StructField(name, dataType, nullable), where:
#   name: the field's name
#   dataType: the field's data type
#   nullable: whether the field's value may be null
import pyspark.sql.types as typ
labels = [('id', typ.LongType()),
          ('name', typ.StringType()),
          ('age', typ.LongType()),
          ('eyecolor', typ.StringType())]
schema = typ.StructType([typ.StructField(name, dtype, False) for name, dtype in labels])
# Apply the schema to the RDD and create the DataFrame
data = spark.createDataFrame(rdd, schema=schema)
# Register the DataFrame as a temporary view
# (createOrReplaceTempView supersedes the deprecated registerTempTable)
data.createOrReplaceTempView("swimmers")
data.show()
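
Because the data is now registered as the temporary view "swimmers", the same rows can also be queried with SQL; for example:

# Query the temporary view with ordinary SQL
spark.sql("select id, name from swimmers where age >= 22").show()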

2.2 Create a DataFrame with automatic type inference

data = [(123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")]
df = spark.createDataFrame(data, schema=['id', 'name', 'age', 'eyecolor'])
df.show()
df.count()
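
To see what types Spark inferred from the Python values, inspect the schema:

# Print the inferred schema; Python ints are inferred as long, str as string
df.printSchema()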

2.3 Read a JSON file

file = r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
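
By default spark.read.json expects one complete JSON object per line. If the file is a single pretty-printed document spanning multiple lines, the multiLine option (available from Spark 2.2 on) handles it; a sketch:

# For a pretty-printed, multi-line JSON document (Spark 2.2+)
df_multiline = spark.read.json(file, multiLine=True)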

2.4 Read a CSV file

file = r'C:\Users\Administrator\Desktop\kaggle泰坦尼克號獲救率預(yù)測數(shù)據(jù)集\train.csv'
df = spark.read.csv(file, header=True, inferSchema=True)
df.show(5)
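
inferSchema=True costs an extra pass over the file, so for large CSVs it can be cheaper to supply the schema yourself. A sketch, using a hypothetical subset of the Titanic columns for illustration (the real schema must list every column of the file in order):

import pyspark.sql.types as typ
# Hypothetical illustrative schema; adjust to the actual CSV columns
titanic_schema = typ.StructType([
    typ.StructField("PassengerId", typ.IntegerType(), True),
    typ.StructField("Survived", typ.IntegerType(), True),
    typ.StructField("Name", typ.StringType(), True),
])
df = spark.read.csv(file, header=True, schema=titanic_schema)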

2.5 Read from MySQL

# Requires placing the mysql-connector-java-8.0.13.jar driver under
# spark-2.2.0-bin-hadoop2.7\jars and restarting Spark before re-running.
# This works in a single-machine setup but not as-is on a cluster,
# where the driver jar must be available on every node.
sql = '(select * from sc where C = 01) t'
url = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT'
table = sql
properties = {"user": "root", "password": "xdk520"}
df = spark.read.jdbc(url, table, properties=properties)
df.show()
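
An equivalent option-based form of the same read, convenient when extra JDBC options need to be passed:

# Same read expressed through format("jdbc") options
df = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", table) \
    .option("user", "root") \
    .option("password", "xdk520") \
    .load()
df.show()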

2.6 Create from a pandas.DataFrame

data = pd.DataFrame([(123, "Katie", 19, "brown"),
                     (234, "Michael", 22, "green"),
                     (345, "Simone", 23, "blue")],
                    columns=['id', 'name', 'age', 'eyecolor'])
df = spark.createDataFrame(data)
df.show()
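
The conversion also works in the other direction: toPandas() collects the distributed DataFrame back to the driver, so it is only appropriate for results that fit in driver memory.

# Collect back to pandas on the driver (small data only)
pdf = df.toPandas()
print(pdf.head())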

2.7 Read from a columnar Parquet file

file = r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
data = spark.read.parquet(file)
data.show()
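
Since Parquet is a columnar format, selecting a subset of columns reads only those columns from disk; a small example, assuming the bundled users.parquet with its name and favorite_color columns:

# Column pruning: only the selected columns are read from the Parquet file
spark.read.parquet(file).select("name", "favorite_color").show()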

2.8 Read from Hive

# If Spark is already configured with the Hive connection parameters,
# Hive data can be read directly.
spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .master("spark://172.31.100.170:7077") \
        .appName("my_first_app_name") \
        .getOrCreate()

df = spark.sql("select * from hive_tb_name")
df.show()
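
With Hive support enabled, a DataFrame can also be written back as a Hive table; a minimal sketch (the table name here is hypothetical):

# Create or replace a managed Hive table from the DataFrame
df.write.mode('overwrite').saveAsTable('hive_new_tb_name')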

3. Saving files

3.1 Write to CSV

file1=r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
df.write.csv(path=file1, header=True, sep=",", mode='overwrite')
#保留第一行,以逗號作為分隔符,#overwrite 清空后再寫入
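
Note that df.write.csv produces a directory of part files rather than a single CSV file. For small results you can force a single part file by coalescing to one partition first:

# coalesce(1) funnels all rows into one partition, so exactly one part file is written
df.coalesce(1).write.csv(path=file1, header=True, sep=",", mode='overwrite')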

3.2 Save to Parquet

file2=r"D:\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
df.write.parquet(path=file2,mode='overwrite')
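
Parquet output can also be partitioned by a column, which speeds up later reads that filter on it; a sketch using the eyecolor column from the earlier examples:

# Writes one subdirectory under file2 per distinct eyecolor value
df.write.parquet(path=file2, mode='overwrite', partitionBy='eyecolor')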

3.3 Write to SQL

# Columns are matched by name automatically, so the DataFrame's columns
# do not need to cover every column of the MySQL table.

# mode='overwrite' clears the table before importing
url = 'jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&autoReconnect=true&useSSL=false&serverTimezone=GMT'
table = 'eye'
properties = {"user": "root", "password": "xdk520"}
df.write.jdbc(url, table, mode='overwrite', properties=properties)

# mode='append' appends rows instead of overwriting
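
A minimal example of the append path, reusing the url, table, and properties defined above:

# Insert new rows without clearing the existing table
df.write.jdbc(url, table, mode='append', properties=properties)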