pyspark--MySQL數(shù)據(jù)庫讀寫

pyspark是Spark對Python的api接口,可以在Python環(huán)境中通過調(diào)用pyspark模塊來操作spark,完成大數(shù)據(jù)框架下的數(shù)據(jù)分析與挖掘。其中,數(shù)據(jù)的讀寫是基礎(chǔ)操作,pyspark的子模塊pyspark.sql 可以完成大部分類型的數(shù)據(jù)讀寫。文本介紹在pyspark中讀寫Mysql數(shù)據(jù)庫。

1 軟件版本

在Python中使用Spark,需要安裝配置Spark,這里跳過配置的過程,給出運行環(huán)境和相關(guān)程序版本信息。

  • win10 64bit
  • java 13.0.1
  • spark 3.0
  • python 3.8
  • pyspark 3.0
  • pycharm 2019.3.4

2 環(huán)境配置

pyspark連接Mysql是通過java實現(xiàn)的,所以需要下載連接Mysql的jar包。

下載地址:https://dev.mysql.com/downloads/

1.png

選擇下載Connector/J,然后選擇操作系統(tǒng)為Platform Independent,下載壓縮包到本地。

2.png

然后解壓文件,將其中的jar包mysql-connector-java-8.0.19.jar放入spark的安裝目錄下,例如D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars。

3.png

環(huán)境配置完成!

3 讀取Mysql

腳本如下:

from pyspark.sql import SQLContext, SparkSession

if __name__ == '__main__':
    # spark 初始化
    spark = SparkSession. \
        Builder(). \
        appName('sql'). \
        master('local'). \
        getOrCreate()
    # mysql 配置(需要修改)
    prop = {'user': 'xxx', 
            'password': 'xxx', 
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://host:port/database'
    # 讀取表
    data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
    # 打印data數(shù)據(jù)類型
    print(type(data))
    # 展示數(shù)據(jù)
    data.show()
    # 關(guān)閉spark會話
    spark.stop()

注意點:

  1. prop參數(shù)需要根據(jù)實際情況修改,文中用戶名和密碼用xxx代替了,driver參數(shù)也可以不需要;
  2. url參數(shù)需要根據(jù)實際情況修改,格式為jdbc:mysql://主機:端口/數(shù)據(jù)庫;
  3. 通過調(diào)用方法read.jdbc進行讀取,返回的數(shù)據(jù)類型為spark DataFrame;

運行腳本,輸出如下:

4.png

4 寫入Mysql

腳本如下:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'xxx',
            'password': 'xxx',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://host:port/database'

    # 創(chuàng)建spark DataFrame
    # 方式1:list轉(zhuǎn)spark DataFrame
    l = [(1, 12), (2, 22)]
    # 創(chuàng)建并指定列名
    list_df = spark.createDataFrame(l, schema=['id', 'value']) 
    
    # 方式2:rdd轉(zhuǎn)spark DataFrame
    rdd = sc.parallelize(l)  # rdd
    col_names = Row('id', 'value')  # 列名
    tmp = rdd.map(lambda x: col_names(*x))  # 設(shè)置列名
    rdd_df = spark.createDataFrame(tmp)  
    
    # 方式3:pandas dataFrame 轉(zhuǎn)spark DataFrame
    df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
    pd_df = spark.createDataFrame(df)

    # 寫入數(shù)據(jù)庫
    pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
    # 關(guān)閉spark會話
    sc.stop()

注意點:

  1. propurl參數(shù)同樣需要根據(jù)實際情況修改;
  2. 寫入數(shù)據(jù)庫要求的對象類型是spark DataFrame,提供了三種常見數(shù)據(jù)類型轉(zhuǎn)spark DataFrame的方法;
  3. 通過調(diào)用write.jdbc方法進行寫入,其中的model參數(shù)控制寫入數(shù)據(jù)的行為。
model 參數(shù)解釋
error 默認值,原表存在則報錯
ignore 原表存在,不報錯且不寫入數(shù)據(jù)
append 新數(shù)據(jù)在原表行末追加
overwrite 覆蓋原表

當數(shù)據(jù)庫無寫入的表時,這四種模式都會根據(jù)設(shè)定的表名稱自動創(chuàng)建表,無需在Mysql里先建表。

5 常見報錯

Access denied for user ...

5.png

原因:mysql配置參數(shù)出錯

解決辦法:檢查user,password拼寫,檢查賬號密碼是否正確,用其他工具測試mysql是否能正常連接,做對比檢查。

No suitable driver

6.png

原因:沒有配置運行環(huán)境

解決辦法:下載jar包進行配置,具體過程參考本文的2 環(huán)境配置。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容