背景
APScheduler是一個(gè)非常好用的調(diào)度平臺(tái),不過(guò)目前所有Scheduler的JOB信息都無(wú)法通過(guò)可視化的方式展示,只能通過(guò)后臺(tái)日志來(lái)查看調(diào)度信息,對(duì)于管理上非常不便。
但是APScheduler非常的強(qiáng)大,已經(jīng)預(yù)留的event功能可以幫助來(lái)實(shí)現(xiàn)此功能,對(duì)于APScheduler原理還不太理解的話,可以參考之前的一篇文章Python定時(shí)庫(kù)APScheduler原理及用法
在使用Flask進(jìn)行管理后,通過(guò)Flask-APScheduler插件來(lái)實(shí)現(xiàn)對(duì)APScheduler的管理以及動(dòng)態(tài)增刪JOB的接口實(shí)現(xiàn),以此完成對(duì)APScheduler的全方位管理。
目的
本文的目的主要有兩部分功能塊,第一部分是利用APScheduler的event機(jī)制來(lái)實(shí)現(xiàn)以下兩個(gè)功能并進(jìn)行可視化查看
- 將APScheduler中所有添加的JOB進(jìn)行狀態(tài)跟蹤
- APScheduler中每個(gè)JOB的生命周期進(jìn)行跟蹤
第二部分是在Flask框架上構(gòu)建的管理平臺(tái)上集成Flask-APScheduler插件,完成對(duì)APScheduler的管理以及動(dòng)態(tài)增刪JOB的接口實(shí)現(xiàn)。
實(shí)現(xiàn)
集成Flask-APScheduler插件完成APScheduler的動(dòng)態(tài)管理
將APScheduler集成到Flask中
config_name = os.getenv('FLASK_CONFIG') or 'default'
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 初始化Sqlarchemy
db.app = app
db.init_app(app)
# 初始化 flask_apscheduler,將scheduler嵌入到flask管理,本地在flask_apscheduler插件中增加add_listener監(jiān)聽(tīng)所有的job生命周期
flask_apscheduler = CustomAPScheduler(db.session, app=app)
# 啟動(dòng)apscheduler
flask_apscheduler.start()
配置Flask-APScheduler開(kāi)啟對(duì)外接口
class Config:
# apscheduler默認(rèn)的jobstore
SCHEDULER_JOBSTORES = {}
# flask_apscheduler是否對(duì)外提供接口
SCHEDULER_API_ENABLED = True
Flask-APScheduler提供的api如下
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
啟動(dòng)后,通過(guò)提供的接口進(jìn)行動(dòng)態(tài)管理
直接動(dòng)態(tài)調(diào)用接口添加, 具體的參數(shù)需要到apscheduler的源碼進(jìn)行查看
添加JOB舉例說(shuō)明(add_job)
請(qǐng)求添加接口:http://127.0.0.1:5000/scheduler/jobs
請(qǐng)求方法:POST
請(qǐng)求header:
{
"Content-Type": "application/json"
}
請(qǐng)求body:
{
"id": "test_add_job",
"name":"管理平臺(tái)添加job測(cè)試",
"func": "app:jobs.test.test_job", # 這里就是模塊:函數(shù),本地定義的方法保證可以import
"trigger": "date" # 觸發(fā)器為指定時(shí)間,這里時(shí)間沒(méi)有指定,就是立馬執(zhí)行
}
返回結(jié)果:
{
"id": "test_add_job",
"name": "管理平臺(tái)添加job測(cè)試",
"func": "app:jobs.test.test_job",
"args": [],
"kwargs": {},
"trigger": "date",
"run_date": "2021-03-05T15:17:10.107210+08:00",
"misfire_grace_time": 1,
"max_instances": 1,
"next_run_time": "2021-03-05T15:17:10.107210+08:00"
}
充分利用APScheduler的Event機(jī)制
class CustomAPScheduler(APScheduler):
# scheduler事件映射本地狀態(tài)
STATUS_MAPPING = {
EVENT_JOB_ADDED: 0,
EVENT_JOB_MODIFIED: 1,
EVENT_JOB_SUBMITTED: 2,
EVENT_JOB_EXECUTED: 3,
EVENT_JOB_REMOVED: 4,
EVENT_JOB_ERROR: 5,
EVENT_JOB_MISSED: 6,
EVENT_ALL_JOBS_REMOVED: 7,
EVENT_JOB_MAX_INSTANCES: 8
}
def __init__(self, session, scheduler=None, app=None):
super(CustomAPScheduler, self).__init__(scheduler, app)
self.session = session
def listener_all_job(self, event):
"""
監(jiān)控job的生命周期,可視化監(jiān)控,并且可增加后續(xù)的沒(méi)有觸發(fā)任務(wù)等監(jiān)控
添加到線程做處理
:param event:
:return:
"""
job_id = None
args = []
if event.code != EVENT_ALL_JOBS_REMOVED:
job_id = event.job_id
if job_id:
jobstore_alias = event.jobstore
job = self.scheduler.get_job(job_id, jobstore_alias)
if job:
name = job.name
func = str(job.func_ref)
trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
next_run_time = str(job.next_run_time).split(".")[0]
else:
name = None
func = None
trigger = None
next_run_time = None
args = [name, func, trigger, next_run_time]
traceback = event.traceback if hasattr(event, 'traceback') else "",
args.append(traceback)
t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
t.start()
t.join()
def handle_listener_all_job(self, event_type, *args):
"""
實(shí)際處理IO操作
如何處理一個(gè)job_id重復(fù)使用的問(wèn)題,采用本地id自增,如果真有job_id重復(fù)的情況,則認(rèn)為指定的是最后一個(gè)job_id對(duì)應(yīng)的任務(wù)
"""
try:
if event_type == EVENT_JOB_ADDED:
# 添加任務(wù)定義表
job = ApschedulerJobInfo()
job.job_id = args[0]
job.job_name = args[1]
job.job_func = args[2]
job.job_trigger = args[3]
job.job_next_run_time = args[4]
job.job_status = 0
self.session.add(job)
self.session.flush()
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
elif event_type == EVENT_JOB_MODIFIED:
# 修改job[取數(shù)據(jù)庫(kù)表中job_id最后一個(gè)進(jìn)行修改]
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_name = args[1]
job.job_func = args[2]
job.job_trigger = args[3]
job.job_next_run_time = args[4]
job.job_status = 0
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_SUBMITTED:
# 提交job執(zhí)行
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_EXECUTED:
# 執(zhí)行job
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 1
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_REMOVED:
# 刪除job
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 5
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_ERROR:
# 執(zhí)行job出錯(cuò)
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 2
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_MISSED:
# job執(zhí)行錯(cuò)過(guò)
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 3
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_ALL_JOBS_REMOVED:
# 刪除所有job
all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
for job in all_jobs:
job.job_status = 6
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
elif event_type == EVENT_JOB_MAX_INSTANCES:
# job超過(guò)最大實(shí)例
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 4
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
except:
LOGGER.exception("執(zhí)行任務(wù)異常")
def init_app(self, app):
super(CustomAPScheduler, self).init_app(app)
# 增加監(jiān)聽(tīng)函數(shù),監(jiān)聽(tīng)所有job的生命周期
self.add_listener(self.listener_all_job,
EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)
收集完成數(shù)據(jù)后進(jìn)行展示及管理
-
JOB管理
JOB管理 -
JOB事件執(zhí)行明細(xì)
JOB事件執(zhí)行明細(xì)
關(guān)注公眾號(hào)“戰(zhàn)渣渣”,回復(fù)“調(diào)度”獲得源碼

