目標(biāo)
讀取 Excel 數(shù)據(jù)寫入 Postgresql 表,參見另一個(gè)版本
思路
- 將 Excel 數(shù)據(jù)載入 Pandas.DataFrames,對(duì)于 etl 來說 pandas 也是一個(gè)不錯(cuò)的工具
- 拼裝 SQL 語句模板,有點(diǎn)像樂高
- 裝填并執(zhí)行 SQL 語句
Part I 載入
讀取 Excel 文件有多種方法,xlwings 并不適合數(shù)據(jù) ETL,Pandas 使用 xlrd 庫(kù)支持 Excel 文件讀取,參見Pandas 練習(xí)冊(cè) - 如何讀取 EXCEL 文件
第一步,初始化配置
# Excel 文件路徑
filepath = r'D:\O\Smalldata\Data\起租.xlsx'
# 指定要讀取的列
# 也可以不指定,讀入整表,并不影響最后的結(jié)果
inventory_columns_mark = [1,2,3,4,5,9,10,11,12,13,14,15,16,17,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,51,52,53,54,55]
第二步,讀取文件
# Excel Sheet 名稱:新建塔批量起租表
# 表頭:第二行
df = pd.read_excel( filepath, "新建塔批量起租表", heads=1, skiprows=1, usecols=inventory_columns_mark, true_values=['是'] )
數(shù)據(jù)示例
df.iloc[0]
區(qū)縣 風(fēng)縣
業(yè)務(wù)確認(rèn)單編號(hào) CTC-xxx-xxxx-0000-000039
站址編碼 61032000000000
需求確認(rèn)單編號(hào) 1010000000000000
站址名稱 風(fēng)_縣大隊(duì)
鐵塔種類 普通地面塔
機(jī)房配置 一體化機(jī)柜
共享信息 新建共享
掛高 28
天線數(shù)量 3
系統(tǒng)數(shù)量 1
RRU是否上塔 True
當(dāng)前鐵塔共享客戶總數(shù) 1
當(dāng)前機(jī)房及配套存量新增共享客戶總數(shù) 1
其他費(fèi)用(元/年)(不含稅) 0
其它費(fèi)用說明 NaN
鐵塔基準(zhǔn)價(jià)格(元/年)(不含稅) 1xxx.3
機(jī)房及配套基準(zhǔn)價(jià)格(元/年)(不含稅) xxx5.25
維護(hù)費(fèi)(元/年)(不含稅) xxx.51
產(chǎn)品單元數(shù) 1
電力引入費(fèi)(元/年)(不含稅) xx8.8
場(chǎng)地費(fèi)(元/年)(不含稅) xx0
維護(hù)費(fèi)折扣 1
場(chǎng)地費(fèi)折扣 1
電力引入費(fèi)折扣 1
鐵塔共享折扣 1
機(jī)房及配套享折扣 1
服務(wù)起始日期 2015-09-01
服務(wù)結(jié)束日期 2020-08-31
產(chǎn)品服務(wù)費(fèi)合計(jì)(元/年)不含稅 xxx68.8
產(chǎn)品服務(wù)費(fèi)合計(jì)(元/年)含稅 xx57
維護(hù)費(fèi)用原始錄入值 05.66
場(chǎng)地費(fèi)原始錄入值 00
電力引入原始錄入值 560
其他費(fèi)用原始錄入值 0
created_by admin
effective_date 2015-09-01
row_key 1
current_flag True
Name: 0, dtype: object
Part II 拼裝
第一步,初始化配置
# 映射 Excel 與 Postgresql 列標(biāo)簽
map_header = {
'區(qū)縣' : 'district_and_county',
'業(yè)務(wù)確認(rèn)單編號(hào)' : 'business_confirmation_number',
'站址編碼' : 'base_address_id',
'需求確認(rèn)單編號(hào)' : 'demand_confirmation_number',
'站址名稱' : 'base_name',
'鐵塔種類' : 'tower_type',
'機(jī)房配置' : 'room_type',
'共享信息' : 'share_info',
'掛高' : 'antenna_height',
'天線數(shù)量' : 'antenna_number',
'系統(tǒng)數(shù)量' : 'system_number',
'RRU是否上塔' : 'is_rru_on_tower',
'當(dāng)前鐵塔共享客戶總數(shù)' : 'tower_current__shared_customer',
'當(dāng)前機(jī)房及配套存量新增共享客戶總數(shù)' : 'room_and_support_current_shared_customer',
'其他費(fèi)用(元/年)(不含稅)' : 'other_fee',
'其它費(fèi)用說明' : 'other_fee_explanation',
'鐵塔基準(zhǔn)價(jià)格(元/年)(不含稅)' : 'tower_base_price',
'機(jī)房及配套基準(zhǔn)價(jià)格(元/年)(不含稅)' : 'room_and_supporting_base_price',
'維護(hù)費(fèi)(元/年)(不含稅)' : 'maintenance_fee',
'產(chǎn)品單元數(shù)' : 'product_unit',
'電力引入費(fèi)(元/年)(不含稅)' : 'power_incoming_fee',
'場(chǎng)地費(fèi)(元/年)(不含稅)' : 'site_fee',
'維護(hù)費(fèi)折扣' : 'maintenance_fee_discount',
'場(chǎng)地費(fèi)折扣' : 'site_fee_discount',
'電力引入費(fèi)折扣' : 'power_incoming_fee_discount',
'鐵塔共享折扣' : 'tower_share_discount',
'機(jī)房及配套享折扣' : 'room_and_support_share_discount',
'服務(wù)起始日期' : 'service_start_date',
'服務(wù)結(jié)束日期' : 'service_end_date',
'產(chǎn)品服務(wù)費(fèi)合計(jì)(元/年)不含稅' : 'total_product_service_fee',
'產(chǎn)品服務(wù)費(fèi)合計(jì)(元/年)含稅' : 'total_product_service_fee_tax',
'維護(hù)費(fèi)用原始錄入值' : 'maintenance_cost_oev',
'場(chǎng)地費(fèi)原始錄入值' : 'site_fee_oev',
'電力引入原始錄入值' : 'power_incoming_oev',
'其他費(fèi)用原始錄入值' : 'other_fee_oev',
'created_by' : 'created_by',
'effective_date' : 'effective_date',
'row_key' : 'row_key',
'current_flag' : 'current_flag'
}
第二步,拼裝模板
拼裝使用 psycopg2.sql 模塊,參考Psycopg 2.7 無責(zé)任翻譯 - SQL string composition
query = psycopg2.sql.SQL( "insert into business_confirmation ({0}) values ({1})" ).format(
psycopg2.sql.SQL(', ').join( map( psycopg2.sql.Identifier, map_header.values() ) ),
psycopg2.sql.SQL(', ').join( map(psycopg2.sql.Placeholder, map_header.values() ) )
)
數(shù)據(jù)示例
Composed(
[
SQL('insert into business_confirmation ('),
Composed(
[
Identifier('district_and_county'), SQL(', '),
Identifier('business_confirmation_number'), SQL(', '),
Identifier('base_address_id'), SQL(', '),
Identifier('demand_confirmation_number'), SQL(', '),
Identifier('base_name'), SQL(', '),
Identifier('tower_type'), SQL(', '),
Identifier('room_type'), SQL(', '),
Identifier('share_info'), SQL(', '),
Identifier('antenna_height'), SQL(', '),
Identifier('antenna_number'), SQL(', '),
Identifier('system_number'), SQL(', '),
Identifier('is_rru_on_tower'), SQL(', '),
Identifier('tower_current__shared_customer'), SQL(', '),
Identifier('room_and_support_current_shared_customer'), SQL(', '),
Identifier('other_fee'), SQL(', '),
Identifier('other_fee_explanation'), SQL(', '),
Identifier('tower_base_price'), SQL(', '),
Identifier('room_and_supporting_base_price'), SQL(', '),
Identifier('maintenance_fee'), SQL(', '),
Identifier('product_unit'), SQL(', '),
Identifier('power_incoming_fee'), SQL(', '),
Identifier('site_fee'), SQL(', '),
Identifier('maintenance_fee_discount'), SQL(', '),
Identifier('site_fee_discount'), SQL(', '),
Identifier('power_incoming_fee_discount'), SQL(', '),
Identifier('tower_share_discount'), SQL(', '),
Identifier('room_and_support_share_discount'), SQL(', '),
Identifier('service_start_date'), SQL(', '),
Identifier('service_end_date'), SQL(', '),
Identifier('total_product_service_fee'), SQL(', '),
Identifier('total_product_service_fee_tax'), SQL(', '),
Identifier('maintenance_cost_oev'), SQL(', '),
Identifier('site_fee_oev'), SQL(', '),
Identifier('power_incoming_oev'), SQL(', '),
Identifier('other_fee_oev'), SQL(', '),
Identifier('created_by'), SQL(', '),
Identifier('effective_date'), SQL(', '),
Identifier('row_key'), SQL(', '),
Identifier('current_flag')
]
), SQL(') values ('),
Composed(
[
Placeholder('district_and_county'), SQL(', '),
Placeholder('business_confirmation_number'), SQL(', '),
Placeholder('base_address_id'), SQL(', '),
Placeholder('demand_confirmation_number'), SQL(', '),
Placeholder('base_name'), SQL(', '),
Placeholder('tower_type'), SQL(', '),
Placeholder('room_type'), SQL(', '),
Placeholder('share_info'), SQL(', '),
Placeholder('antenna_height'), SQL(', '),
Placeholder('antenna_number'), SQL(', '),
Placeholder('system_number'), SQL(', '),
Placeholder('is_rru_on_tower'), SQL(', '),
Placeholder('tower_current__shared_customer'), SQL(', '),
Placeholder('room_and_support_current_shared_customer'), SQL(', '),
Placeholder('other_fee'), SQL(', '),
Placeholder('other_fee_explanation'), SQL(', '),
Placeholder('tower_base_price'), SQL(', '),
Placeholder('room_and_supporting_base_price'), SQL(', '),
Placeholder('maintenance_fee'), SQL(', '),
Placeholder('product_unit'), SQL(', '),
Placeholder('power_incoming_fee'), SQL(', '),
Placeholder('site_fee'), SQL(', '),
Placeholder('maintenance_fee_discount'), SQL(', '),
Placeholder('site_fee_discount'), SQL(', '),
Placeholder('power_incoming_fee_discount'), SQL(', '),
Placeholder('tower_share_discount'), SQL(', '),
Placeholder('room_and_support_share_discount'), SQL(', '),
Placeholder('service_start_date'), SQL(', '),
Placeholder('service_end_date'), SQL(', '),
Placeholder('total_product_service_fee'), SQL(', '),
Placeholder('total_product_service_fee_tax'), SQL(', '),
Placeholder('maintenance_cost_oev'), SQL(', '),
Placeholder('site_fee_oev'), SQL(', '),
Placeholder('power_incoming_oev'), SQL(', '),
Placeholder('other_fee_oev'), SQL(', '),
Placeholder('created_by'), SQL(', '),
Placeholder('effective_date'), SQL(', '),
Placeholder('row_key'), SQL(', '),
Placeholder('current_flag')
]
), SQL(')')
]
)
Part III 裝填
conn = psycopg2.connect(
host = "10.1.1.110",
port = 5432,
dbname = "bj_wireless_center"
)
cur = conn.cursor()
# cur.mogrify(query, df.rename(columns = map_header).to_dict('records')[0])
cur.executemany( query, df.rename( columns = map_header ).to_dict( 'records' ) )
conn.commit()
cur.close()
conn.close()