需要實(shí)現(xiàn)的功能
- 新建數(shù)據(jù)庫(kù)之前,檢查是否已存在
- 新建數(shù)據(jù)表之前,檢查是否已存在
- 測(cè)試用的數(shù)據(jù)庫(kù)及數(shù)據(jù)表,在測(cè)試完后刪除
以下為具體實(shí)現(xiàn)
判斷數(shù)據(jù)庫(kù)存在并創(chuàng)建
使用SqlAlchemy連接MySQL數(shù)據(jù)庫(kù)的步驟:
1、創(chuàng)建連接用的url
2、判斷該url是否為一個(gè)已存在的數(shù)據(jù)庫(kù)。若不存在,則應(yīng)首先創(chuàng)建該數(shù)據(jù)庫(kù)。
3、新建連接engine
4、連接數(shù)據(jù)庫(kù)
創(chuàng)建連接用的 url
db_url = ‘mysql+pymysql://username:password@localhost:port/db_name
檢查數(shù)據(jù)庫(kù)是否存在
需要從`sqlalchemy_utils`庫(kù)中導(dǎo)入`database_exists`和`create_database`
from sqlalchemy_utils import database_exists, create_database`
# 檢查數(shù)據(jù)庫(kù)是否存在,并創(chuàng)建
if not database_exists(db_url):
create_database(db_url, encoding='utf8')
# 新建連接engine
engine = create_engine(db_url)
# 連接數(shù)據(jù)庫(kù)
conn = engine.connect()
判斷數(shù)據(jù)表存在并創(chuàng)建
通過engine的table_names()方法返回當(dāng)前數(shù)據(jù)庫(kù)里所有表名稱,從而判斷某個(gè)數(shù)據(jù)表table_name是否存在
由于是通過pandas來存儲(chǔ)和讀取數(shù)據(jù)表,因此即便表不存在,可以直接用pandas.to_sql保存,不必提前創(chuàng)建表結(jié)構(gòu),省去了寫sql語(yǔ)句的麻煩。
if table_name not in engine.table_names():
df.to_sql(table_name, con=conn, index=False) # 如果表不存在,則直接保存
若表存在,則最好先檢查待存DataFrame的列標(biāo)簽是否與當(dāng)前表的表頭一致。此處需要用到sqlalchemy.inspect,具體為:
from sqlalchemy import inspect
inspector = inspect(engine)
# 返回某個(gè)數(shù)據(jù)表里的列名
columns_dict_list = inspector(table_name) # 返回的是一個(gè)字典的列表,列表中每個(gè)元素的’name’鍵對(duì)應(yīng)該數(shù)據(jù)表的一個(gè)列名
columns = [item[‘name’] for item in columns_dict_list]
比較兩組數(shù)據(jù)表的列名,無誤后即可繼續(xù)使用pandas.to_sql向已有的數(shù)據(jù)表添加數(shù)據(jù),其中參數(shù)if_exists的值為append:
if df_columns == columns: # df_columns是待存儲(chǔ)DataFrame的列名列表
df.to_sql(table_name, con=conn, if_exists=‘a(chǎn)ppend’)
刪除測(cè)試時(shí)添加的數(shù)據(jù)庫(kù)表
筆者目前沒有在sqlalchemy中找到刪除數(shù)據(jù)庫(kù)表的方法,所以只能使用sqlalchemy_utils庫(kù)的drop_database來刪除數(shù)據(jù)庫(kù),使用pymysql庫(kù)及sql語(yǔ)句來刪除數(shù)據(jù)表。
刪除數(shù)據(jù)表
import pymysql
# 使用pymysql建立與數(shù)據(jù)庫(kù)的連接:
pymysql_conn = pymysql.connect(‘localhost, username, password, db_name, charset=‘utf8’)
# 刪除數(shù)據(jù)表的sql語(yǔ)句:
drop_table_sql = ‘drop table %s’ %table_name
cursor = pymysql_conn.cursor()
cursor.execute(drop_table_sql)
pymysql_conn.close()
刪除數(shù)據(jù)庫(kù)
from sqlalchemy_utils import drop_database
drop_database(db_url) # db_url是通過sqlalchemy連接數(shù)據(jù)庫(kù)所使用的url