步驟詳情:
1 定時任務 每天下午4點執(zhí)行
簡易功能代碼如下:
schedule.every().day.at("16:00").do(job)
2 匯總數(shù)據(jù)并生成csv
3 壓縮多個csv文件成一個zip文件
4 發(fā)送郵件(zip文件作為附件發(fā)送)
其他細節(jié):
關閉命令行python腳本也會定時執(zhí)行(生成日志文件到 ItemList_yu_gbk_0214.log),命令如下:
nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
收到郵件效果

WeChat Screenshot_20230214175616.png
服務器上文件

ME1676368494413.png
總的功能代碼如下
#!/usr/bin/python
# -*-coding:utf8 -*-
# nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
from io import DEFAULT_BUFFER_SIZE
from time import sleep
import traceback
import pymysql
import smtplib
import zipfile
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart # 使用MIMEMultipart來標示這個郵件是多個部分組成的
from email.mime.base import MIMEBase
from email.mime.text import MIMEText # 定義郵件內容
from email.utils import formataddr
import os,sys,multitesting
import time
import shutil
import datetime
import calendar
import schedule
import mimetypes
import pandas as pd
from pathlib import Path
item_list_dict = {}
item_dict = {}
PAUSE_TIME_LIST = [['0:10', '7:00'], ['12:00', '13:30']]
#---init Time---
begintime = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
def isPause():
_now_time = datetime.datetime.now()
for _time in PAUSE_TIME_LIST:
begin_time_str = _time[0]
end_time_str = _time[1]
begin_time = datetime.datetime.strptime(begin_time_str, '%H:%M')
end_time = datetime.datetime.strptime(end_time_str, '%H:%M')
_now_time_str = _now_time.strftime('%H:%M')
now_time = datetime.datetime.strptime(_now_time_str, '%H:%M')
if (now_time >= begin_time) and (now_time <= end_time):
return True
return False
def getAlltestID(_yhs, _begin_time, _end_time):
sql = "SELECT * FROM dertest_test_info where yhstx_nsrtest = '{}' and create_time > '{}' and create_time <= '{}' and dertest_status ='0'".format(
_yhs, _begin_time, _end_time)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if rows is None:
return ''
if len(rows) == 0:
return ''
# print("test:" + str(len(rows)))
return rows
# '?????,(0:?????;1:?????;2:??????;3:??????;)'
def getdertestIDBytestID(_PID):
sql = "SELECT * FROM dertest_intext_info WHERE dertest_protext_info_id = '" + str(
_PID) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if len(rows) == 0:
return ''
else:
print("getdertestIDBytestID:" + str(len(rows)))
return rows
def getdertestInfoBytestID(_PID):
sql = "SELECT ifnull(ghf_yh,''),ifnull(ghf_zh,''),ifnull(ghf_dz,''),ifnull(ghf_dh,''),ifnull(nsrmc,''),ifnull(nsrtest,''),ifnull(bz,''),id FROM dertest_info WHERE test_id = '" + str(_PID) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
if len(rows) == 0:
return ''
else:
print("getGhfBankBytestID:" + str(len(rows)))
return rows
def getSQDHBytestID(_obr_id):
sql = "SELECT sqdh FROM dertest_batest_request WHERE id = '" + str(_obr_id) + "'"
# print(sql)
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
return ""
return str(row[0])
def getIsMail(_dertest_id):
sql = "SELECT * FROM dertest_intest_info_ext WHERE dertest_id ='"+_dertest_id+"'"
global mycat_db
cursor = mycat_db.cursor()
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
return "非郵寄"
else:
print("getIsMail:" + str(len(row)))
return "郵寄"
def getItemsBydertestID(_OID,_sqdh,_ddh,_fpdm, _fphm, _yhs_mc, _yhs, _ghf_mc, _ghf_nsrtest, _ghf_bank, _ghf_address,_kprq,_kpr,_kplx,_fpbeizhu,_kpzt,_qingdanbzhi,_zuofeibz,_dingdanlx,_youjiFangshi):
sql = "SELECT * FROM dertest_item_info WHERE dertest_info_id = '" + str(_OID) + "'"
global mycat_db
global item_list_dict
cursor = mycat_db.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
# print("item:" + str(len(rows)))
for _item in rows:
if isBadItem(_item):
continue
itemInfo = _sqdh + ','
itemInfo +=str(_item[0]).split("*")[2]+','
itemInfo +=str(_item[0]).split("*")[1]+','
itemInfo += _youjiFangshi
writeItemInfo(_yhs,itemInfo)
def makeTitle(_yhs):
itemInfo = "申號,據(jù)號,銀行賬號,地址電話合計金額(含稅),合計金額(不含稅),合計稅額,稅率,訂單類型,商品名稱,簡稱,郵式"
writeItemInfo(_yhs, itemInfo)
# 'xmmc,ggxh,xmdw,xmdj,xmsl,xmje'
def isBadItem(_item):
# if _item[3] == '1090505010000000000':
# return False
# if _item[3] == '1090414010000000000':
# return False
# if _item[3] == '1030204020000000000':
# return False
return False
def makeALLItemsToList(_yhs, my_pid_list):
# print("----------------------------????????????????-----------------------")
makeTitle(_yhs)
for _pid in my_pid_list:
dertestid_list = getdertestIDBytestID(_pid[0])
if dertestid_list == '':
continue
ghf_info = getdertestInfoBytestID(_pid[0])
dertest_id = str(ghf_info[0][7])
youjifangshi = getIsMail(dertest_id)
qingdanbz = str(dertestid_list[0][-2])
OID = dertestid_list[0][0]
obr_id = str(_pid[3])
if qingdanbz == "1":
qingdanbz = "有"
if qingdanbz == "0":
qingdanbz = "無"
if (dertestid_list == ''):
continue
for _dertestid in dertestid_list:
while (isPause() == True):
sleep(60)
print('Sleeping........')
continue
getItemsBydertestID(OID,rere,ddrh,saf,sdf,nsr_mc,_yhs, dgf,gd,yui,gd,vgh,
iiy,a,gd,kpzt,gd,zfbz,h,fgh)
def writeItemInfo(yhs_mc,infos):
temp_s = infos
# path = yhs_mc+".csv"
path ='/home/tom/data/yu/'+yhs_mc+".csv"
f = open(path, "a+", encoding="gbk")
f.write("%s" % (temp_s) + "\n")
f.close()
def getyhsList():
file = open("yhs_list.txt")
# file = open("C:\\Users\\test\\Desktop\\111\\yhs_list.txt")
yhs_list = file.readlines()
return yhs_list
def countAllItems(_yhs_list, _begin_time, _end_time):
global item_dict
global item_list_dict
yhs_index = 0
for _yhs in _yhs_list:
yhs_index += 1
if os.path.exists(_yhs.strip('\n')+".csv"):
os.remove(_yhs.strip('\n')+".csv")
test_list = getAlltestID(_yhs.strip('\n'), _begin_time, _end_time)
makeALLItemsToList(_yhs.strip('\n'), test_list)
def makeDBTOConnection():
MYCAT_HOST = "101.72.237.88"
MYCAT_PORT = 8066
MYCAT_USER = "txds"
MYCAT_PASSWORD = "2erOxFSAyrIOeLKJODSA3g6k"
MYCAT_DATABASE = "txds_sales_dertest"
toDBcon = pymysql.connect(
host=MYCAT_HOST, # IP??MySQL??????????IP???
port=MYCAT_PORT, # ???????3306???????????
user=MYCAT_USER, # ??????????
password=MYCAT_PASSWORD, # ???????????
database=MYCAT_DATABASE, # ???????????
charset='utf8' # ????????????'utf-8'
)
return toDBcon
def makeDBTestConnection():
MYCAT_HOST = "102.14.234.89"
MYCAT_PORT = 3306
MYCAT_USER = "txds"
MYCAT_PASSWORD = "txds@123"
MYCAT_DATABASE = "txds_sales_dertest_hbq"
toDBcon = pymysql.connect(
host=MYCAT_HOST, # IP??MySQL??????????IP???
port=MYCAT_PORT, # ???????3306???????????
user=MYCAT_USER, # ??????????
password=MYCAT_PASSWORD, # ???????????
database=MYCAT_DATABASE, # ???????????
charset='utf8' # ????????????'utf-8'
)
return toDBcon
def sendtxtmail():
filepath = '/home/tom/data/yuData/騰訊讀書每日發(fā)數(shù)明細匯總'+datetime.datetime.now().strftime("%Y-%m-%d")+".zip" #壓縮后的文件名
smtp_server = "smtp.mxhichina.com" # 發(fā)送郵箱服務器
username = 'yang@163.com'
password = 'IPMS*5873957'
sender = 'yang@163.com' # 發(fā)送者的郵箱
receivers = ["gao@163.com","guo@163.com",
"guo@.163.cn","wang@.163.cn"]
EMAIL_FROM_NAME = '稅組' # 自定義發(fā)件人名稱
# time = datetime.datetime.today().strftime("%m-%d %H:%M")
msg = MIMEMultipart()
# 郵件正文
msg.attach(MIMEText(" \r\n 騰訊讀書每日發(fā)數(shù)明細匯總,結果請查看附件。",'plain','utf-8')) # 文本內容換行\(zhòng)r\n
msg['From'] = formataddr(pair=(EMAIL_FROM_NAME, sender)) # 自定義發(fā)件人的名稱
msg['To'] = ";".join(receivers) # 發(fā)送給多個好友
# subject = "{}騰訊讀書每日發(fā)數(shù)明細匯總".format(time)
subject = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")+"騰訊讀書發(fā)數(shù)明細匯總"
msg['Subject'] = subject
data = open(filepath, 'rb')
ctype, encoding = mimetypes.guess_type(filepath)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
file_msg = MIMEBase(maintype, subtype)
file_msg.set_payload(data.read())
data.close()
encoders.encode_base64(file_msg) # 把附件編碼
file_msg.add_header('Content-Disposition', 'attachment', filename="騰訊讀書每日發(fā)數(shù)明細匯總"+datetime.datetime.now().strftime("%Y-%m-%d")+".zip") # 修改郵件頭
msg.attach(file_msg)
try:
server = smtplib.SMTP(smtp_server)
server.login(username,password)
server.sendmail(sender,receivers,msg.as_string())
server.quit()
print("=================================== 郵件發(fā)送成功 =================================== %s" % begintime)
print("刪除壓縮包文件路徑:", filepath)
# 發(fā)送成功后刪除壓縮包文件
if os.path.exists(filepath):
os.remove(filepath)
print("=================================== 壓縮包刪除成功 =================================== %s" % begintime)
shutil.rmtree('/home/tom/data/yu')
os.mkdir('/home/tom/data/yu')
print("=================================== 壓縮包刪除成功 =================================== %s" % begintime)
except Exception as err:
print("=================================== 郵件發(fā)送失敗 =================================== %s" % begintime)
print(err)
def batest_zip(start_dir,zip_file):
# start_dir要壓縮的文件路徑
# zip_file 輸出zip文件的路徑
zip_file = zip_file + '.zip'
z = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
print(z)
for path, dirname, file_name in os.walk(start_dir):
# print("文件夾根路徑:", path)
fpath = path.replace(start_dir, '') # 去除根路徑名稱
# print("--去除根路徑:", fpath)
fpath = fpath and fpath + os.sep # 在原fpath加上\
# print("****去除根路徑+\ :", fpath)
for filename in file_name: # 逐個循環(huán)讀取文檔名稱
# print('--', fpath+filename)
# fpath + filename完整構成每個文檔的去根絕對路徑
# s = os.path.join(path, filename) # 補齊全部的絕對路徑
# print('*-*',s)
z.write(os.path.join(path, filename), fpath + filename) # 實現(xiàn)在輸出路徑的Zip壓縮操作
z.close()
return zip_file
# 定時任務調用的函數(shù)
def job():
print("=================================== 匯總數(shù)據(jù)開始 =================================== %s" % begintime)
global mycat_db
mycat_db = makeDBTOConnection()
yhs_list = getyhsList()
# 1匯總數(shù)據(jù)
countAllItems(yhs_list, datetime.datetime.now().strftime("%Y-%m")+"-01 00:00:00", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("=================================== 匯總數(shù)據(jù)結束 =================================== %s" % begintime)
# 2壓縮文件
print("=================================== 壓縮文件開始 =================================== %s" % begintime)
# start_dir要壓縮的文件路徑
# zip_file 輸出zip文件的路徑
batest_zip("/home/tom/data/yu","/home/tom/data/yuData/騰訊讀書每日發(fā)數(shù)明細匯總"+datetime.datetime.now().strftime("%Y-%m-%d"))
print("=================================== 壓縮文件結束 =================================== %s" % begintime)
time.sleep(1)
# 3發(fā)送郵件并刪除文件
print("=================================== 郵件開始發(fā)送 =================================== %s" % begintime)
sendtxtmail()
return 'main func over'
# 設置定時任務啟動的時間 ,每天16:00 啟動
schedule.every().day.at("16:00").do(job)
while True:
schedule.run_pending()