Python腳本通過mycat查詢數(shù)據(jù)生成csv文件,壓縮后作為附件,群發(fā)郵件

步驟詳情:
1 定時任務 每天下午4點執(zhí)行
簡易功能代碼如下:
schedule.every().day.at("16:00").do(job)
2 匯總數(shù)據(jù)并生成csv
3 壓縮多個csv文件成一個zip文件
4 發(fā)送郵件(zip文件作為附件發(fā)送)

其他細節(jié):
關閉命令行python腳本也會定時執(zhí)行(生成日志文件到 ItemList_yu_gbk_0214.log),命令如下:
nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log

收到郵件效果


WeChat Screenshot_20230214175616.png

服務器上文件


ME1676368494413.png

總的功能代碼如下

#!/usr/bin/python
#  -*-coding:utf8 -*-
# nohup python3 ItemList_yu_gbk_0214.py > ItemList_yu_gbk_0214.log
from io import DEFAULT_BUFFER_SIZE
from time import sleep
import traceback
import pymysql
import smtplib
import zipfile
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart    # 使用MIMEMultipart來標示這個郵件是多個部分組成的
from email.mime.base import MIMEBase
from email.mime.text import MIMEText   # 定義郵件內容
from email.utils import formataddr
import os,sys,multitesting
import time  
import shutil  
import datetime
import calendar
import schedule
import mimetypes
import pandas as pd
from pathlib import Path



item_list_dict = {}
item_dict = {}
PAUSE_TIME_LIST = [['0:10', '7:00'], ['12:00', '13:30']]
#---init Time---
begintime =  time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))


def isPause():
    _now_time = datetime.datetime.now()

    for _time in PAUSE_TIME_LIST:
        begin_time_str = _time[0]
        end_time_str = _time[1]
        begin_time = datetime.datetime.strptime(begin_time_str, '%H:%M')
        end_time = datetime.datetime.strptime(end_time_str, '%H:%M')

        _now_time_str = _now_time.strftime('%H:%M')
        now_time = datetime.datetime.strptime(_now_time_str, '%H:%M')

        if (now_time >= begin_time) and (now_time <= end_time):
            return True

    return False
def getAlltestID(_yhs, _begin_time, _end_time):
    sql = "SELECT * FROM dertest_test_info where yhstx_nsrtest = '{}' and create_time > '{}'  and create_time <= '{}' and dertest_status ='0'".format(
        _yhs, _begin_time, _end_time)
    global mycat_db
    cursor = mycat_db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()

    if rows is None:
        return ''
    if len(rows) == 0:
        return ''
    # print("test:" + str(len(rows)))
    return rows

# '?????,(0:?????;1:?????;2:??????;3:??????;)'
def getdertestIDBytestID(_PID):
    sql = "SELECT * FROM dertest_intext_info WHERE dertest_protext_info_id = '" + str(
        _PID) + "'"
    # print(sql)
    global mycat_db
    cursor = mycat_db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    if len(rows) == 0:
        return ''
    else:
        print("getdertestIDBytestID:" + str(len(rows)))

    return rows

def getdertestInfoBytestID(_PID):
    sql = "SELECT ifnull(ghf_yh,''),ifnull(ghf_zh,''),ifnull(ghf_dz,''),ifnull(ghf_dh,''),ifnull(nsrmc,''),ifnull(nsrtest,''),ifnull(bz,''),id FROM dertest_info WHERE  test_id = '" + str(_PID) + "'"
    # print(sql)
    global mycat_db
    cursor = mycat_db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    if len(rows) == 0:
        return ''
    else:
        print("getGhfBankBytestID:" + str(len(rows)))
    return rows

def getSQDHBytestID(_obr_id):
    sql = "SELECT sqdh FROM dertest_batest_request WHERE  id = '" + str(_obr_id) + "'"
    # print(sql)
    global mycat_db
    cursor = mycat_db.cursor()
    cursor.execute(sql)
    row = cursor.fetchone()
    if row is None:
        return ""

    return str(row[0])

def getIsMail(_dertest_id):
   sql = "SELECT * FROM dertest_intest_info_ext WHERE dertest_id ='"+_dertest_id+"'"
   global mycat_db
   cursor = mycat_db.cursor()
   cursor.execute(sql)
   row = cursor.fetchone()
   if row is None:
       return "非郵寄"
   else:
       print("getIsMail:" + str(len(row)))
   return "郵寄"

def getItemsBydertestID(_OID,_sqdh,_ddh,_fpdm, _fphm, _yhs_mc, _yhs, _ghf_mc, _ghf_nsrtest, _ghf_bank, _ghf_address,_kprq,_kpr,_kplx,_fpbeizhu,_kpzt,_qingdanbzhi,_zuofeibz,_dingdanlx,_youjiFangshi):
    sql = "SELECT * FROM dertest_item_info WHERE dertest_info_id = '" + str(_OID) + "'"
    global mycat_db

    global item_list_dict
    cursor = mycat_db.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    # print("item:" + str(len(rows)))
    for _item in rows:
        if isBadItem(_item):
            continue
        itemInfo = _sqdh + ','
        itemInfo +=str(_item[0]).split("*")[2]+','
        itemInfo +=str(_item[0]).split("*")[1]+','
        itemInfo += _youjiFangshi

        writeItemInfo(_yhs,itemInfo)

def makeTitle(_yhs):
    itemInfo = "申號,據(jù)號,銀行賬號,地址電話合計金額(含稅),合計金額(不含稅),合計稅額,稅率,訂單類型,商品名稱,簡稱,郵式"
    writeItemInfo(_yhs, itemInfo)
# 'xmmc,ggxh,xmdw,xmdj,xmsl,xmje'
def isBadItem(_item):
    #    if _item[3] == '1090505010000000000':
    #        return False
    #    if _item[3] == '1090414010000000000':
    #        return False
    #    if _item[3] == '1030204020000000000':
    #        return False

    return False

def makeALLItemsToList(_yhs, my_pid_list):
    # print("----------------------------????????????????-----------------------")
    makeTitle(_yhs)
    for _pid in my_pid_list:
        dertestid_list = getdertestIDBytestID(_pid[0])
        if dertestid_list == '':
            continue
        ghf_info = getdertestInfoBytestID(_pid[0])
        dertest_id = str(ghf_info[0][7])
        youjifangshi = getIsMail(dertest_id)
        qingdanbz = str(dertestid_list[0][-2])
        OID = dertestid_list[0][0]
        obr_id = str(_pid[3])
        if qingdanbz == "1":
            qingdanbz = "有"
        if qingdanbz == "0":
            qingdanbz = "無"

        if (dertestid_list == ''):
            continue
        for _dertestid in dertestid_list:
            while (isPause() == True):
                sleep(60)
                print('Sleeping........')
                continue

            getItemsBydertestID(OID,rere,ddrh,saf,sdf,nsr_mc,_yhs, dgf,gd,yui,gd,vgh,
                              iiy,a,gd,kpzt,gd,zfbz,h,fgh)

def writeItemInfo(yhs_mc,infos):
    temp_s = infos
    # path = yhs_mc+".csv"
    path ='/home/tom/data/yu/'+yhs_mc+".csv"
    f = open(path, "a+", encoding="gbk")
    f.write("%s" % (temp_s) + "\n")
    f.close()

def getyhsList():
    file = open("yhs_list.txt")
    # file = open("C:\\Users\\test\\Desktop\\111\\yhs_list.txt")
    yhs_list = file.readlines()
    return yhs_list

def countAllItems(_yhs_list, _begin_time, _end_time):
    global item_dict
    global item_list_dict
    yhs_index = 0

    for _yhs in _yhs_list:
        yhs_index += 1
        if os.path.exists(_yhs.strip('\n')+".csv"):
            os.remove(_yhs.strip('\n')+".csv")
        test_list = getAlltestID(_yhs.strip('\n'), _begin_time, _end_time)
        makeALLItemsToList(_yhs.strip('\n'), test_list)

def makeDBTOConnection():
    MYCAT_HOST = "101.72.237.88"
    MYCAT_PORT = 8066
    MYCAT_USER = "txds"
    MYCAT_PASSWORD = "2erOxFSAyrIOeLKJODSA3g6k"
    MYCAT_DATABASE = "txds_sales_dertest"
    toDBcon = pymysql.connect(

        host=MYCAT_HOST,  # IP??MySQL??????????IP???
        port=MYCAT_PORT,  # ???????3306???????????
        user=MYCAT_USER,  # ??????????
        password=MYCAT_PASSWORD,  # ???????????
        database=MYCAT_DATABASE,  # ???????????
        charset='utf8'  # ????????????'utf-8'
    )
    return toDBcon

def makeDBTestConnection():
    MYCAT_HOST = "102.14.234.89"
    MYCAT_PORT = 3306
    MYCAT_USER = "txds"
    MYCAT_PASSWORD = "txds@123"
    MYCAT_DATABASE = "txds_sales_dertest_hbq"
    toDBcon = pymysql.connect(

        host=MYCAT_HOST,  # IP??MySQL??????????IP???
        port=MYCAT_PORT,  # ???????3306???????????
        user=MYCAT_USER,  # ??????????
        password=MYCAT_PASSWORD,  # ???????????
        database=MYCAT_DATABASE,  # ???????????
        charset='utf8'  # ????????????'utf-8'
    )
    return toDBcon

def sendtxtmail():
    filepath = '/home/tom/data/yuData/騰訊讀書每日發(fā)數(shù)明細匯總'+datetime.datetime.now().strftime("%Y-%m-%d")+".zip" #壓縮后的文件名

    smtp_server = "smtp.mxhichina.com"  # 發(fā)送郵箱服務器
    username = 'yang@163.com'
    password = 'IPMS*5873957'
    sender = 'yang@163.com'  # 發(fā)送者的郵箱 
    receivers = ["gao@163.com","guo@163.com",
                 "guo@.163.cn","wang@.163.cn"]
    EMAIL_FROM_NAME = '稅組'   # 自定義發(fā)件人名稱

    # time = datetime.datetime.today().strftime("%m-%d %H:%M")
    msg = MIMEMultipart()
    # 郵件正文
    msg.attach(MIMEText(" \r\n  騰訊讀書每日發(fā)數(shù)明細匯總,結果請查看附件。",'plain','utf-8'))   # 文本內容換行\(zhòng)r\n
    msg['From'] = formataddr(pair=(EMAIL_FROM_NAME, sender))     # 自定義發(fā)件人的名稱
    msg['To'] = ";".join(receivers)  # 發(fā)送給多個好友
    # subject = "{}騰訊讀書每日發(fā)數(shù)明細匯總".format(time)
    subject = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")+"騰訊讀書發(fā)數(shù)明細匯總"
    msg['Subject'] = subject

    data = open(filepath, 'rb')
    ctype, encoding = mimetypes.guess_type(filepath)
    if ctype is None or encoding is not None:
        ctype = 'application/octet-stream'
    maintype, subtype = ctype.split('/', 1)
    file_msg = MIMEBase(maintype, subtype)
    file_msg.set_payload(data.read())
    data.close()
    encoders.encode_base64(file_msg)  # 把附件編碼
    file_msg.add_header('Content-Disposition', 'attachment', filename="騰訊讀書每日發(fā)數(shù)明細匯總"+datetime.datetime.now().strftime("%Y-%m-%d")+".zip")  # 修改郵件頭
    msg.attach(file_msg)
    try:
        server = smtplib.SMTP(smtp_server)
        server.login(username,password)
        server.sendmail(sender,receivers,msg.as_string())
        server.quit()
        print("=================================== 郵件發(fā)送成功 =================================== %s" % begintime)
        print("刪除壓縮包文件路徑:", filepath)
        # 發(fā)送成功后刪除壓縮包文件
        if os.path.exists(filepath):
            os.remove(filepath)
            print("=================================== 壓縮包刪除成功 =================================== %s" % begintime)
        shutil.rmtree('/home/tom/data/yu')  
        os.mkdir('/home/tom/data/yu')  
        print("=================================== 壓縮包刪除成功 =================================== %s" % begintime)
    except Exception as err:
        print("=================================== 郵件發(fā)送失敗 =================================== %s" % begintime)
        print(err)

def batest_zip(start_dir,zip_file):
    # start_dir要壓縮的文件路徑
    # zip_file 輸出zip文件的路徑
    zip_file = zip_file + '.zip'
    z = zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED)
    print(z)
    for path, dirname, file_name in os.walk(start_dir):
#         print("文件夾根路徑:", path)
        fpath = path.replace(start_dir, '') # 去除根路徑名稱
#         print("--去除根路徑:", fpath)
        fpath = fpath and fpath + os.sep   # 在原fpath加上\
#         print("****去除根路徑+\ :", fpath)

        for filename in file_name: # 逐個循環(huán)讀取文檔名稱
#             print('--', fpath+filename)
#             fpath + filename完整構成每個文檔的去根絕對路徑
#             s = os.path.join(path, filename)   # 補齊全部的絕對路徑
#             print('*-*',s)

            z.write(os.path.join(path, filename), fpath + filename) # 實現(xiàn)在輸出路徑的Zip壓縮操作
    z.close()
    return zip_file

# 定時任務調用的函數(shù)
def job():
    print("=================================== 匯總數(shù)據(jù)開始 =================================== %s" % begintime)
    global mycat_db    
    mycat_db = makeDBTOConnection()
    yhs_list = getyhsList()
    # 1匯總數(shù)據(jù)
    countAllItems(yhs_list, datetime.datetime.now().strftime("%Y-%m")+"-01 00:00:00", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("=================================== 匯總數(shù)據(jù)結束 =================================== %s" % begintime)

    # 2壓縮文件
    print("=================================== 壓縮文件開始 =================================== %s" % begintime)
    # start_dir要壓縮的文件路徑
    # zip_file 輸出zip文件的路徑
    batest_zip("/home/tom/data/yu","/home/tom/data/yuData/騰訊讀書每日發(fā)數(shù)明細匯總"+datetime.datetime.now().strftime("%Y-%m-%d"))
    
    print("=================================== 壓縮文件結束 =================================== %s" % begintime)
    time.sleep(1)
    # 3發(fā)送郵件并刪除文件
    print("=================================== 郵件開始發(fā)送 =================================== %s" % begintime)
    sendtxtmail()
    return 'main func over'

# 設置定時任務啟動的時間 ,每天16:00 啟動
schedule.every().day.at("16:00").do(job)
while True:
    schedule.run_pending()
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容