import logging
import logging.handlers
import queue
import time
import threading
import schedule
import os
import sys
import datetime
import hashlib
from pathlib import Path
import subprocess
import filecmp
import git
from git import Repo
import shutil
import os
# 第一次跑讀取當(dāng)前的config文件和tasks目錄下的py文件存到內(nèi)存,定時(shí)任務(wù)跑git pull 下載下來的config文件和tasks目錄下的py文件和內(nèi)存作對(duì)比,
# 有修改了的py腳本對(duì)應(yīng)的schedule清掉,再新加schedule,其他schedule正常跑
# 對(duì)比config每一行配置是否改變,/對(duì)比tasks下文件數(shù)量、哪個(gè)py文件內(nèi)容、文件名是否改變,變了就停掉這一行對(duì)應(yīng)的py文件的schedule,其他schedule不停
def modify(pull_modi):
global files
global name_path
global sem
global run_job
# 刪除本地
subprocess.run('rm -rf ./tasks')
subprocess.run('rm ./config.csv')
#主線程join 運(yùn)行完再走后面的
repo = Repo("./")
git = repo.git
print(git.fetch('--all'))
git.reset('--hard', 'origin')
print("git pull is running", datetime.datetime.now())
logger.info("git pull is running")
if not Path('./tasks').exists():
logger.error("git拉取最新文件./tasks還沒完成,不能開啟新schedule,這個(gè)緩沖時(shí)間內(nèi)舊的schedule也會(huì)找不到文件")
print("%s is running" % pull_modi, datetime.datetime.now())
# new_job 定時(shí)任務(wù)列表
# print(new_job)
# 讀新下載下來的配置文件
with open('config.csv', 'r') as f2:
# csv文件第一行是格式注釋,第一行不讀跳過
files2 = f2.readlines()[1:]
# 每行內(nèi)容文件名和頻率
file1 = []
file2 = []
for x in files:
file1.append(x.rstrip())
for y in files2:
file2.append(y.rstrip())
# 舊py路徑
print('name_path',
name_path) # name_path {'1.py': './tasks\\1.py', '2.py': './tasks\\2.py', '3.py': './tasks\\3.py', '4.py': './tasks\\4.py', '5.py': './tasks\\5.py', '6.py': './tasks\\6.py'}
logger.info("old_py_path:%s" % name_path)
old_files = []
for key in name_path.keys():
old_files.append(key)
# 新tasks文件夾下py文件的路徑
for root_new, dirs_new, py_new in os.walk('./tasks'):
print(len(py_new))
new_path = {}
for file in py_new:
# print(os.path.join(root_new,file))
new_path[file] = os.path.join(root_new, file)
print('new_path',
new_path)
logger.info("new_py_path:%s" % new_path)
# 新舊py文件名對(duì)比
con_py = []
for i in range(0, len(files)):
config = {}
args = files[i].rstrip('\n').split(',')
if not args[0].endswith('.py'):
# print('%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % args[0])
continue
else:
con_py.append(args[0])
print('old_py', con_py) # con_py ['1.py', '2.py', '3.py', '4.py', '5.py', '6.py', '8.py']
logger.info("old_py:%s" % con_py)
con_new_py = []
for i in range(0, len(files2)):
config = {}
args = files2[i].rstrip('\n').split(',')
if not args[0].endswith('.py'):
# print('%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % args[0])
continue
else:
con_new_py.append(args[0])
print('new_py', con_new_py)
logger.info("new_py:%s" % con_new_py)
# 文件名稱沒變
same_py = list(set(con_new_py).intersection(set(con_py)))
print('same_py', same_py)
logger.info("same_py:%s" % same_py)
print('old_config', file1)
logger.info("old_config:%s" % file1)
print('new_config', file2)
logger.info("new_config:%s" % file2)
# 頻率文件名稱都沒變
same_py_con = list(set(file1).intersection(set(file2)))
print('same_py_con', same_py_con)
clear = []
add = []
print('new_tasks', py_new)
print('old_tasks', old_files)
logger.info("new_tasks:%s" % py_new)
logger.info("old_tasks:%s" % old_files)
# 頻率or文件名稱變化了
for new in file2:
if new not in file1 and new.split(',')[0] in py_new:
add.append(new.split(',')[0])
for old in file1:
if old not in file2:
clear.append(old.split(',')[0])
# 頻率文件名稱都沒變,文件增刪改
for ss in same_py_con:
sspy = ss.split(',')[0]
# 文件還要在最新的tasks目錄下
if sspy in py_new and sspy in old_files:
print('新舊tasks文件夾都有這個(gè)py文件%s,git pull后文件夾下是新的,python xx.py運(yùn)行的是最新的' % sspy)
logger.info('新舊tasks文件夾都有這個(gè)py文件%s,git pull后文件夾下是新的,python xx.py運(yùn)行的是最新的' % sspy)
elif sspy not in py_new and sspy not in old_files:
print('新舊tasks文件夾都無py文件%s,不清除舊的,也不加新的' % sspy)
logger.info('新舊tasks文件夾都無py文件%s,不清除舊的,也不加新的' % sspy)
elif sspy in py_new and sspy not in old_files:
add.append(sspy)
logger.info('新開啟%s' % sspy)
elif sspy not in py_new and sspy in old_files:
clear.append(sspy)
logger.info('清除舊的%s' % sspy)
print('con_py', con_py)
print('con_new_py', con_new_py)
print('add', add)
print('clear', clear)
logger.info('新開啟的任務(wù)一共有%s' % add)
logger.info('清除的任務(wù)一共有%s' % clear)
jobs = []
try:
for ad in add:
if not ad.endswith('.py'):
logger.warning('%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % ad)
continue
for fi in range(0, len(file2)): # config py[1,3,5,7,8,11]
config = {}
args = file2[fi].rstrip('\n').split(',')
if ad == args[0]:
print('會(huì)跑',ad)
logger.info('新跑%s' % ad)
config['command'] = 'python ./tasks/' + args[0]
else:
continue
config['at'] = args[1]
if args[2].isdigit() and args[3].isdigit() and args[4].isdigit():
config['seconds'] = args[2]
config['minutes'] = args[3]
config['hours'] = args[4]
else:
logger.warning('%s時(shí)/分/秒格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % args[0])
continue
if args[5].isdigit() and int(args[5]) in range(0,8):
config['day'] = args[5]
else:
logger.warning('%s星期%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % (args[0],args[5]))
continue
jobs.append(config)
except NameError:
logger.error('配置文件變量獲取失敗')
except ValueError:
logger.error('配置定時(shí)參數(shù)類型轉(zhuǎn)換異常')
except:
logger.error('配置定時(shí)參數(shù)異常')
logger.info('配置文件和定時(shí)參數(shù)如下')
logger.info(jobs)
# 獲取各文件名和配置的頻率,抽出來所有不為空的值和它對(duì)應(yīng)的鍵,存到數(shù)組
each = []
try:
for jo in jobs:
content = {}
for key, value in jo.items():
if value != '0':
content[key] = value
each.append(content)
except NameError:
logger.error('配置定時(shí)參數(shù)不全為0的內(nèi)容獲取失敗')
except:
logger.error('定時(shí)參數(shù)不全為0的獲取異常')
com_job = []
try:
for eac in each:
key = list(eac.keys())
if len(key) != 2:
print('%s所配置的頻率不支持,這個(gè)文件不跑,其他文件正常跑' % eac['command']) # 命令和頻率放在同一個(gè)字典,key長度不為2則不添加schedule列表
logger.warning('%s所配置的頻率不支持,這個(gè)文件不跑,其他文件正常跑' % eac['command'])
# 判斷條件不能是key[1] == 'at',key每次排序都不一樣
elif 'at' in key:
# schedule.every().day.at("19:13").do(thread,job,"python tasks/4.py")
s = 'schedule.every().day.at(\"%s\").do(thread,sem,job,\"%s\")' % (eac['at'], eac['command'])
print('每天幾點(diǎn)跑', s)
# com_job.append(s + '.tag("3.py")')
com_job.append('%s.tag("%s")' % (s, eac['command'].split('/')[2]))
elif 'day' in key:
k = int(eac['day']) - 1
s = 'schedule.every().%s.do(thread,sem,job,\"%s\")' % (weekday[k], eac['command'])
print('每星期幾跑', s)
com_job.append('%s.tag("%s")' % (s, eac['command'].split('/')[2]))
elif key[1] or key[0] in ['seconds', 'minutes', 'hours']:
print('每幾秒、分、時(shí)跑', key)
# key[0] key[1]每次跑的結(jié)果不一樣,所有不能以角標(biāo)取固定的值
if 'seconds' in key:
s = 'schedule.every(%s).seconds.do(thread,sem,job,\"%s\")' % (eac['seconds'], eac['command'])
com_job.append('%s.tag("%s")' % (s, eac['command'].split('/')[2]))
elif 'minutes' in key:
s = 'schedule.every(%s).minutes.do(thread,sem,job,\"%s\")' % (eac['minutes'], eac['command'])
com_job.append('%s.tag("%s")' % (s, eac['command'].split('/')[2]))
elif 'hours' in key:
s = 'schedule.every(%s).hours.do(thread,sem,job,\"%s\")' % (eac['hours'], eac['command'])
com_job.append('%s.tag("%s")' % (s, eac['command'].split('/')[2]))
print('配置的定時(shí)任務(wù)是:', com_job)
logger.info('新配置的定時(shí)任務(wù)是:%s' % com_job)
except NameError:
logger.error('schedule語句拼接失敗,其中有參數(shù)獲取異常')
except:
logger.error('schedule語句錯(cuò)誤')
for cl in clear:
print('schedule.clear("%s")' % cl)
logger.info('schedule.clear("%s")' % cl)
eval('schedule.clear("%s")' % cl)
sem = threading.Semaphore(len(run_job) + 1-len(clear)+len(add))
logger.info('新設(shè)置最大子線程數(shù)量%d' % int(len(run_job)-len(clear)+len(add)))
print('新設(shè)置最大子線程數(shù)量%d' % int(len(run_job)-len(clear)+len(add)))
try:
for new in com_job:
print('new schedule', new)
logger.info('new schedule%s' % new)
eval(new)
except:
print('new schedule運(yùn)行異常')
# log打印,先創(chuàng)建文件夾
log_dir = Path('logs')
if not log_dir.exists():
os.makedirs('logs')
# 創(chuàng)建Logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# 文件Handler
fileHandler = logging.handlers.TimedRotatingFileHandler('logs/run_tasks%s.log' % datetime.datetime.now().strftime('%y%m%d'), encoding='UTF-8', when='D', interval=1, backupCount=7)
fileHandler.setLevel(logging.NOTSET) #輸出warning級(jí)別以上的日志
# Formatter
formatter = logging.Formatter('%(asctime)s-line %(lineno)d in %(filename)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
# 添加到Logger中
logger.addHandler(fileHandler)
files = ''
py_files = ''
weekday = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
name_path = {}
files2= ''
#封裝讀配置文件方法
def config_read():
global files
global weekday
global name_path
try:
# 讀取配置文件,文件名+頻率
with open('config.csv', 'r') as f:
# csv文件第一行是格式注釋,第一行不讀跳過
files = f.readlines()[1:]
if len(files)==0:
logger.error("配置文件無內(nèi)容,請(qǐng)?zhí)詈门渲梦募僦匦逻\(yùn)行")
sys.exit()
print('配置的腳本文件個(gè)數(shù)',len(files))
logger.info('配置的腳本文件個(gè)數(shù)%d' % len(files))
except FileNotFoundError:
logger.error("無配置文件,請(qǐng)?zhí)詈门渲梦募僦匦逻\(yùn)行")
sys.exit()
except OSError:
logger.error('配置文件打開失敗')
except:
logger.error('配置文件讀取異常')
try:
# 對(duì)比config.csv里的文件個(gè)數(shù)和要跑的Py文件數(shù)量是否一致
for root, dirs, py_files in os.walk('./tasks'):
print(len(py_files))
for file in py_files:
# print(os.path.join(root,file))
name_path[file] = os.path.join(root, file)
if len(files) != len(py_files):
logger.warning('config.csv里的文件個(gè)數(shù)和要跑的Py文件數(shù)量不一致,請(qǐng)檢查')
except:
logger.error('腳本文件夾讀取異常')
# 定時(shí)單位 {'count','at','seconds', 'minutes', 'minute', 'hours', 'hour','days', 'day', 'start', 'end', 'weekday'}刪減后只支持'at','seconds','minutes','hours','day'
# filename.py,'at','seconds','minutes','hours','day' 只支持配一個(gè)參數(shù)值,如果每天跑,必須傳at其他不認(rèn),day 1-7 對(duì)應(yīng)星期一~星期日
jobs = []
# 文件夾下面的文件
for py_name in os.walk('./tasks'):
print(py_name[2])
try:
for i in range(0, len(files)):
config = {}
args = files[i].rstrip('\n').split(',')
print('配置文件第%d行內(nèi)容是%s' % (i+1, files[i]))
# 配置不合法的處理
if args[1] == '0' and args[2] == '0' and args[3] == '0' and args[4] == '0' and args[5] == '0':
print('%s頻率不能全為空' % args[0])
continue
if args[0] not in py_name[2]:
print('%s文件不在tasks文件夾' % args[0])
continue
if args[0].endswith('.py'):
config['command'] = 'python ./tasks/' + args[0] # 換用+拼接,用string.join()不能傳變量
else:
logger.warning('%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % args[0])
continue
config['at'] = args[1]
if args[2].isdigit() and args[3].isdigit() and args[4].isdigit():
config['seconds'] = args[2]
config['minutes'] = args[3]
config['hours'] = args[4]
else:
logger.warning('%s時(shí)/分/秒格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % args[0])
continue
if args[5].isdigit() and int(args[5]) in range(0,8):
config['day'] = args[5]
else:
logger.warning('%s星期%s格式不正確,不會(huì)定時(shí)跑,其他繼續(xù)遍歷' % (args[0],args[5]))
continue
jobs.append(config)
except NameError:
logger.error('配置文件變量獲取失敗')
except ValueError:
logger.error('配置定時(shí)參數(shù)類型轉(zhuǎn)換異常')
except:
logger.error('配置定時(shí)參數(shù)異常')
logger.info('配置文件和定時(shí)參數(shù)如下')
logger.info(jobs)
# 獲取各文件名和配置的頻率,抽出來所有不為空的值和它對(duì)應(yīng)的鍵,存到數(shù)組
each = []
try:
for i in jobs:
content = {}
for key, value in i.items():
if value != '0':
content[key] = value
each.append(content)
except NameError:
logger.error('配置定時(shí)參數(shù)不全為0的內(nèi)容獲取失敗')
except:
logger.error('定時(shí)參數(shù)不全為0的獲取異常')
# 拼接command_job語句
com_job = []
try:
for e in each:
key = list(e.keys())
if len(key) != 2:
print('%s所配置的頻率不支持,這個(gè)文件不跑,其他文件正常跑' % e['command']) # 命令和頻率放在同一個(gè)字典,key長度不為2則不添加schedule列表
logger.warning('%s所配置的頻率不支持,這個(gè)文件不跑,其他文件正常跑' % e['command'])
# 判斷條件不能是key[1] == 'at',key每次排序都不一樣
elif 'at' in key:
# schedule.every().day.at("19:13").do(thread,job,"python tasks/4.py")
s = 'schedule.every().day.at(\"%s\").do(thread,sem,job,\"%s\")' % (e['at'], e['command'])
print('每天幾點(diǎn)跑', s)
# com_job.append(s + '.tag("tasks")')
com_job.append('%s.tag("%s")' % (s, e['command'].split('/')[2]))
elif 'day' in key:
k = int(e['day']) - 1
s = 'schedule.every().%s.do(thread,sem,job,\"%s\")' % (weekday[k], e['command'])
print('每星期幾跑', s)
com_job.append('%s.tag("%s")' % (s, e['command'].split('/')[2]))
elif key[1] or key[0] in ['seconds', 'minutes', 'hours']:
print('每幾秒、分、時(shí)跑', key)
# key[0] key[1]每次跑的結(jié)果不一樣,所有不能以角標(biāo)取固定的值
if 'seconds' in key:
s = 'schedule.every(%s).seconds.do(thread,sem,job,\"%s\")' % (e['seconds'], e['command'])
com_job.append('%s.tag("%s")' % (s, e['command'].split('/')[2]))
elif 'minutes' in key:
s = 'schedule.every(%s).minutes.do(thread,sem,job,\"%s\")' % (e['minutes'], e['command'])
com_job.append('%s.tag("%s")' % (s, e['command'].split('/')[2]))
elif 'hours' in key:
s = 'schedule.every(%s).hours.do(thread,sem,job,\"%s\")' % (e['hours'], e['command'])
com_job.append('%s.tag("%s")' % (s, e['command'].split('/')[2]))
print('配置的定時(shí)任務(wù)是:', com_job)
except NameError:
logger.error('schedule語句拼接失敗,其中有參數(shù)獲取異常')
except:
logger.error('schedule語句錯(cuò)誤')
return com_job
# 通用方法:命令執(zhí)行py文件
def job(command):
# command 如'python ./tasks/1.py '
subprocess.run(command)
# 開啟多線程 判斷線程沒等待就開啟新線程 子線程發(fā)生的異常打印到log日志
def thread(sem_ar,job_func, command):
if sem_ar.acquire(False):
t = MyThread(sem_ar, job_func, command)
t.start()
else:
logger.warning('%s子線程等待中' % command)
class MyThread(threading.Thread):
def __init__(self,my_sem,func,command):
super().__init__()
self.my_sem = my_sem
self.func = func
self.command = command
def run(self):
try:
self.func(self.command)
except:
logger.error('子線程異常')
self.my_sem.release()
# 控制最大子線程數(shù)
run_job = config_read()
sem = threading.Semaphore(len(run_job)+1)
logger.info('設(shè)置最大子線程數(shù)量%d' % (len(run_job)+1))
print(('設(shè)置最大子線程數(shù)量%d' % (len(run_job)+1)))
# 每個(gè)配置文件的頻率各起線程跑 如'schedule.every(1).seconds.do(thread,sem,job,"python tasks/1.py").tag("1.py")'
for j in range(0, len(run_job)):
logger.info('運(yùn)行定時(shí)任務(wù)%s' % run_job[j])
eval(run_job[j])
#git pull & modify
schedule.every().day.at("20:06").do(thread,sem, modify,"pull_modify")
# #獲取當(dāng)前線程數(shù)量和內(nèi)容
# def get_thread():
# print('當(dāng)前線程', len(threading.enumerate()),threading.enumerate())
# #每五秒獲取一次
# schedule.every(5).seconds.do(get_thread)
try:
while 1:
schedule.run_pending()
except TypeError:
logger.error('schedule傳參異常')
except:
logger.error('schedule運(yùn)行異常')
批量定時(shí)任務(wù)框架,支持動(dòng)態(tài)修改文件和頻率
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。