用PyQueen處理數(shù)據(jù)

PyQueen

PyQueen 是一個數(shù)據(jù)處理工具箱, 用于構(gòu)建ETL工作流.

文檔

安裝

pip install pyqueen

數(shù)據(jù)讀寫

pyqueen.DataSource: 讀寫數(shù)據(jù)庫, 文件和其他類型的數(shù)據(jù)源

參數(shù):

  • conn_type: 支持 mysql,oracle,mssql,clickhouse,pgsql,sqlite,jdbc,redis,excel,ftp,web
  • username: 可選, 默認:None
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, redis, ftp) 時
  • password: 可選, 默認:None
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, redis, ftp) 時
  • port: 可選, 默認:None
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, redis, ftp) 時
  • db_name: 可選, 默認:None
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, redis) 時
  • file_path: 可選, 默認:None
    • conn_type 為 (sqlite, excel) 時
  • jdbc_url: 可選, 默認:None
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, sqlite) 時
    • 指定 jdbc_url 后優(yōu)先用url創(chuàng)建連接
  • cache_dir: 可選, 默認:None
    • conn_type 為 (web, )
  • keep_conn: 可選, 默認:False
    • conn_type 為 (mysql, oracle, mssql, clickhouse, pgsql, jdbc_url, redis) 時
    • 為 False 時, 每次操作數(shù)據(jù)庫都會銷毀連接, 無需關(guān)注連接池情況
    • 為 True 時, 使用后需手動 ds.close_conn()
  • charset: 可選, 默認:None. 根據(jù) conn_type 自動使用最常用的字符集
    • 支持指定字符集

函數(shù)

  • read_sql(sql[, data, engine]): 讀取sql
    • 如果 conn_type 是數(shù)據(jù)庫, 執(zhí)行sql結(jié)果返回為 pd.DataFrame 對象
    • 如果 conn_type 是 excel, 當(dāng)前excel文件每個sheet映射為一張表, 執(zhí)行sql結(jié)果返回為 pd.DataFrame 對象
    • 如果 conn_type 為空, 也可以 傳入 data 每個 df 映射為一張表, 執(zhí)行sql結(jié)果返回為 pd.DataFrame 對象
      • data: {'tb_name1': df, 'tb_name2': df2}
      • engine: 默認 sqlite, 可以用 duckdb
  • get_sql(sql) 功能和 read_sql 一樣, 兼容 1.0.x版本
  • exe_sql(sql[, auto_commit]): 執(zhí)行sql
    • 例如 delete/update/insert語句
    • auto_commit: 默認 False; 用于執(zhí)行 create database 相關(guān)操作
  • to_db(df, tb_name[, how, fast_load, chunksize]): 寫入數(shù)據(jù)庫
    • df: 待寫入 pd.DataFrame() 對象
    • tb_name: 目標(biāo)表名, 目標(biāo)庫沒有的話自動創(chuàng)建
    • how: 可選, 默認 append:追加
    • fast_load: 可選, 默認False; 僅支持MySQL 和 Clickhouse, 將 pd.DataFrame對象寫入臨時csv再快速導(dǎo)入數(shù)據(jù)庫 (如果數(shù)據(jù)包含特殊字符容易出錯, 慎用)
    • chunksize: 可選, 默認10000
  • read_excel([sheet_name, file_path]): 讀取excel表
    • sheet_name: 可選, 默認 None, 取所有sheet
    • file_path: 可選, 默認 None, 取 self.file_path, 可傳入 file_path 重新指定
  • to_excel(sheet_list[, file_path=None, fillna='', fmt=None, font='微軟雅黑', font_color='black', font_size=11, column_width=17]): 寫入excel表
    • sheet_list: [[df1, 'sheet_name1'], [df2, 'sheet_name2'],]
    • file_path: 可選, 默認 None, 取 self.file_path, 可傳入 file_path 重新指定
    • fillna: 可選, 默認 ''
    • fmt: 可選, 默認 None
    • font: 可選, 默認 '微軟雅黑'
    • font_color: 可選, 默認 'black'
    • font_size: 可選, 默認 11
    • column_width: 可選, 默認 17
  • get_v(key): 鍵值數(shù)據(jù)庫取值
  • set_v(key, value): 鍵值數(shù)據(jù)庫更新值
  • download_dir(local_dir, remote_dir)
    • local_dir: 本地目錄
    • remote_dir: 待下載遠程目錄
  • read_page(url)
    • 初始化 DataSource 時指定 cache_dir, 可以緩存頁面, 下次訪問時直接從緩存讀取
  • set_logger([logger])
    • logger: 可選, 默認 None, 可使用預(yù)置的 'file', 也可以傳入自定義函數(shù)
  • row_count(table_name): 統(tǒng)計表行數(shù)
  • get_sql_group(sql, params)
    • sql: sql模板
    • params: 參數(shù)列表
  • to_images(df[, file_path, col_width, font_size])
    • df: pd.DataFrame
    • file_path: 可選, 默認 None 寫入臨時路徑
    • col_width: 可選, 默認 None 自動確定
    • font_size: 可選, 默認 None 自動確定
  • delete_file(path)
  • get_tmp_file()

示例

from pyqueen import DataSource

# conn_type 支持: mysql,oracle,mssql,clickhouse,pgsql,sqlite,jdbc,redis,excel,ftp,web
# 為了兼容1.0版本, 目前 db_type 與 conn_type 都可用
ds = DataSource(conn_type='mysql', host='', username='', password='', port='', db_name='')
ds.set_db('new_db')
# 根據(jù)sql查詢, 返回 pd.DataFrame 對象
df = ds.read_sql(sql='select * from table')

# 返回查詢結(jié)果的第一個值
v = ds.get_value(sql='select count(1) from table')

# 將 pd.DataFrame對象 寫入數(shù)據(jù)庫
### fast_load: 默認False; 僅支持MySQL和Clickhouse, 將 pd.DataFrame對象 寫入臨時csv再快速導(dǎo)入數(shù)據(jù)庫 (如果數(shù)據(jù)包含特殊字符容易出錯, 慎用)
ds.to_db(df=df_to_write, tb_name='')

# 執(zhí)行sql
ds.exe_sql(sql='delete from table')
sql1 = 'delete from table'
sql2 = 'insert into table select * from table2'
sql3 = 'update table set a=1'
ds.exe_sql(sql=[sql1, sql2, sql3])


# 鍵值數(shù)據(jù)庫
ds = DataSource(conn_type='redis', host='', username='', password='', port='', db_name='')
ds.set_v(key='kk', value='vv')
ds.get_v(key='kk')

# pd.DataFrame 轉(zhuǎn)圖片
### 可以指定文件路徑: file_path. 默認生成臨時文件
### 可以用列表為每一列指定寬度 col_width
### 指定字體大小 font_size
path = ds.to_image(df, file_path=None, col_width=None, font_size=None)

# 下載網(wǎng)頁文本
### `cache_dir` 的作用是緩存網(wǎng)頁html到 `cache_dir`, 下次訪問直接從本地加載, 避免頻繁請求頁面
ds = DataSource(cache_dir='')
page = ds.read_page(url='https://www.github.com')

### 去除html字符, 只保留文本 (保留頁面所有文本, 如需精確篩選需要自行解析html)
from pyqueen import Utils
text = Utils.html2text(page)

常用模型

from pyqueen import Model

data = df['待預(yù)測列']  # 也可以是 list形式的數(shù)據(jù)
### forecast_step: 預(yù)測節(jié)點數(shù)
### p,d,q: 自定義arima模型參數(shù). 為空時自動使用最優(yōu)模型
forecast_result = Model.arima(data, forecast_step=10, p=None, d=None, q=None)

ETL輔助功能

# 使用SQL語法查詢 pd.DataFrame 數(shù)據(jù) (功能依賴duckdb包); 可以部分代替 pandas接口 
### 等價python
df_fact = pd.merge(df_fact, df_dim1, on='d1', how='inner')
df_fact = pd.merge(df_fact, df_dim1, on='d2', how='inner')
df_summary = df_fact.groupby(['d1_name', 'd2_name']).agg({'value': np.sum}).reset_index().rename('value':'sum_value')

### 可以用sql實現(xiàn)
from pyqueen import DataSource

ds = DataSource()
data = {'table1': df_fact, 'table2': df_dim1, 'table3': df_dim2}
sql = '''
  select b.d1_name,c.d2_name,sum(a.value) as sum_value
  from table1 a 
  inner join table2 b on a.d1 = b.d1
  inner join table3 c on a.d2 = c.d2
  group by b.d1_name,c.d2_name
'''
df_summary = ds.read_sql(sql=sql, data=data)

### 用sql查詢excel文件
# excel:
ds = DataSource(conn_type='excel', file_path='myexcel.xlsx')
sql = '''
select * from sheet1 union all select * from sheet2
'''
df_summary = ds.read_sql(sql=sql)

下載FTP文件

from pyqueen import DataSource

ds = DataSource(host='', username='', password='', port='', db_type='ftp')
ds.download_dir(local_dir='保存目錄', remote_dir='遠程目錄')

圖表

import pandas as pd
from pyqueen import Chart

df = pd.DataFrame()

# 折線圖/柱狀圖/散點圖/氣泡圖
# img_path 不為None時保存圖片, show為False時靜默運行不彈出圖片窗口
Chart.line(x=df['x_col'], y=df['y_col'], x_label='', y_label='', img_path='demo.png', show=True)
Chart.bar(x=df['x_col'], y=df['y_col'], x_label='', y_label='', img_path='demo.png', show=True)
Chart.scatter(x=df['x_col'], y=df['y_col'], x_label='', y_label='', img_path='demo.png', show=True)
Chart.bubble(x=df['x_col'], y=df['y_col'], v=df['value_col'], c=df['color'], x_label='', y_label='',img_path='demo.png', show=True)

寫入Excel文件

  • 將 pd.DataFrame對象 寫入Excel文件
  • file_path 文件路徑 (須以 .xlsx 結(jié)尾)
  • sheet_list 待寫入數(shù)據(jù), 二維列表, 每個 pd.DataFrame對象 對應(yīng)一個 sheet
  • fillna='' 空值填充
  • fmt=None 字段格式,可以按字段名指定
  • font='微軟雅黑' 字體
  • font_color='black' 字體顏色
  • font_size=11 字體大小
  • column_width=17 單元格寬度
from pyqueen import DataSource

ds = DataSource(file_path='file_path.xlsx')

sheet_list = [
    [df1, 'sheet_name1'],
    [df2, 'sheet_name2']
]
fmt = {
    'col1': '#,##0',
    'col2': '#,##0.0',
    'col3': '0%',
    'col4': '0.00%',
    'col5': 'YYYY-MM-DD'
}
ds.to_excel(sheet_list=sheet_list, fmt=fmt)
# or 
ds.to_excel(file_path='/new_path/file.xlsx', sheet_list=sheet_list, fmt=fmt)

# sql on Excel
## 將一個Excel作為一個數(shù)據(jù)庫, 每個sheet作為一張表, 通過sql查詢
sql = '''
select * from df1 union all select * from df2
'''
df_new = ds.read_sql(sql=sql)

時間處理工具

from pyqueen import TimeKit

# 按當(dāng)前時間
tk = TimeKit()
# 指定日期, 時間
tk = TimeKit(theday=20200101, thetime=120000)

# 常用屬性
tk.today  # 當(dāng)前日期或初始化指定日期
tk.now  # 當(dāng)前時間或初始化指定時間
tk.hour  # 當(dāng)前小時
tk.minute  # 當(dāng)前分鐘
tk.second  # 當(dāng)前秒
tk.nday_of_week  # 1-7對應(yīng)周一到周日
tk.week_start  # 本周一日期
tk.lw_start  # 上周開始日期
tk.lw_end  # 上周結(jié)束日期
tk.lw2_start  # 上上周開始日期
tk.lw2_end  # 上上周結(jié)束日期
tk.month_start  # 本月初
tk.lm_start  # 上月初
tk.lm_end  # 上月末
tk.lm2_start  # 上上月初
tk.lm2_end  # 上上月末

# 時間加減
# flag: 加減單位: years,months,days,hours,minutes,seconds 或者 年,月,日,時,分,秒
# value: 加減值
# thetime之前 value 寫負值
# thetime之后 value 寫正值

## 按當(dāng)前時間
## 如果需要長時期時間 (14位int) 指定 short=False. 默認為 True
new_day = tk.delta('days', -30)

## 按任意日期
new_day = tk.time_delta('20230101', 'days', -30)

# 獲取日期列表
day_list = tk.get_day_list(20200101, 20200201)
# 獲取自然周列表
week_list = tk.get_week_list(20200101, 20200201)
# 獲取自然月列表
month_list = tk.get_month_list(20200101, 20200901)
# 按天數(shù)拆分日期為列表
time_list = tk.date_div(20200101, 20200901, 10)
# 查詢?nèi)我馊掌谑切瞧趲?n = tk.get_nday_of_week(20200101)
# 數(shù)值型日期轉(zhuǎn)字符串
date_str = tk.int2str(20200101, sep='-')

ETL日志

  • 記錄所有 DataSource 類函數(shù)的調(diào)用過程和相應(yīng)參數(shù)
  • 如需啟用日志, 添加: ds.set_logger(logger)
  • 其中 logger 為日志處理函數(shù), 默認為: print
  • 自定義 logger 參考 example/etl_with_log.py
  • etl_log 所有 key
    • py_path: 調(diào)用腳本路徑
    • func_name: 調(diào)用函數(shù)名
    • start_time: 過程開始時間
    • end_time: 過程結(jié)束時間
    • duration: 過程耗時(秒)
    • message: (如有) 備注信息
    • file_path: (如有) 文件路徑
    • sql_text: (如有) sql
    • host: (如有) 服務(wù)器地址
    • db_type: (如有) 數(shù)據(jù)庫類型
    • port: (如有) 端口
    • db_name: (如有) 數(shù)據(jù)庫名
    • table_name: (如有) 表名

發(fā)送信息

  • 郵件
  • 釘釘
  • 企業(yè)微信
from pyqueen import Email

# 初始化
email = Email(username='', password='', host='', port='')

# 發(fā)送文本郵件
# subject: 郵件主題,content: 郵件內(nèi)容,to_user: 收件人,cc_user: 抄送人,bcc_user: 密抄人
# type: 文本或html格式,默認文本格式
email.send_text(subject='', content='', to_user=[], cc_user=None, bcc_user=None, type='plain')

# 發(fā)送附件郵件
# subject: 郵件主題,content: 郵件內(nèi)容,to_user: 收件人,cc_user: 抄送人,bcc_user: 密抄人
# type: 文本或html格式,默認文本格式,file_path_list: 附件路徑列表
email.send_file(subject='', content='', file_path_list=[], to_user=[], cc_user=None, bcc_user=None, type='plain')
from pyqueen import Wechat

# 初始化
wechat = Wechat(key='')

# content不為None時,發(fā)送文本
# mentioned_list: userid的列表,提醒群中的指定成員(@某個成員),@all表示提醒所有人
# mentioned_mobile_list: 手機號列表,提醒手機號對應(yīng)的群成員(@某個成員),@all表示提醒所有人
# file_path不為None時,發(fā)送文件
# img_path不為None時,發(fā)送圖片
wechat.send(content=None, mentioned_list=None, mentioned_mobile_lis=None, file_path=None, img_path=None)
from pyqueen import Dingtalk

# 初始化
dingtalk = Dingtalk(access_token='')

# content不為None時,發(fā)送文本
# mentioned_list: userid的列表,提醒群中的指定成員(@某個成員),@all表示提醒所有人
# mentioned_mobile_list: 手機號列表,提醒手機號對應(yīng)的群成員(@某個成員),@all表示提醒所有人
dingtalk.send(content=None, mentioned_list=None, mentioned_mobile_list=None)

小工具

from pyqueen import Utils

# 壓縮/解壓縮
Utils.zip(from_path='', to_path='')
Utils.unzip(from_path='', to_path='')
# 刪除文件
# 刪除文件夾/子文件夾/文件
Utils.delete_file(path='')
# 計算md5值
Utils.md5(text='')
# 列表按n個一組拆分
Utils.div_list(listTemp=[1, 2, 3], n=2)
# 用正則從sql里提取用到的表
### kw: (可選)指定匹配關(guān)鍵詞
### strip: (可選)指定需要清除的字符
Utils.sql2table(sql_text='', kw=None, strip=None)
# 多進程執(zhí)行
### func: 待執(zhí)行函數(shù)
### args_list: 每個子任務(wù)的參數(shù)
### max_process = 1: 最大進程數(shù), 默認為 1
### 以list返回每個子進程執(zhí)行結(jié)果, 和 args_list 順序一致
result = Utils.mult_run(func, args_list=[], max_process=1)
# html轉(zhuǎn)文本
### 去除html字符, 只保留文本
text = Utils.html2text(html)

命令行

用法: pyqueen command args1,args2,...
---
command: 
    #1  sql2table [file_path] 從sql解析用到的表(通過正則解析, 有誤差) (不帶參數(shù)時讀取剪切板)
    
    #2  detcode file_path: 檢測文件編碼
    
    #3  md5 基于剪切板文本生成md5
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容