APScheduler管理及監(jiān)控平臺(tái)

背景

APScheduler是一個(gè)非常好用的調(diào)度平臺(tái),不過(guò)目前所有Scheduler的JOB信息都無(wú)法通過(guò)可視化的方式展示,只能通過(guò)后臺(tái)日志來(lái)查看調(diào)度信息,對(duì)于管理上非常不便。

但是APScheduler非常的強(qiáng)大,已經(jīng)預(yù)留的event功能可以幫助來(lái)實(shí)現(xiàn)此功能,對(duì)于APScheduler原理還不太理解的話,可以參考之前的一篇文章Python定時(shí)庫(kù)APScheduler原理及用法

在使用Flask進(jìn)行管理后,通過(guò)Flask-APScheduler插件來(lái)實(shí)現(xiàn)對(duì)APScheduler的管理以及動(dòng)態(tài)增刪JOB的接口實(shí)現(xiàn),以此完成對(duì)APScheduler的全方位管理。

目的

本文的目的主要有兩部分功能塊,第一部分是利用APScheduler的event機(jī)制來(lái)實(shí)現(xiàn)以下兩個(gè)功能并進(jìn)行可視化查看

  • 將APScheduler中所有添加的JOB進(jìn)行狀態(tài)跟蹤
  • APScheduler中每個(gè)JOB的生命周期進(jìn)行跟蹤

第二部分是在Flask框架上構(gòu)建的管理平臺(tái)上集成Flask-APScheduler插件,完成對(duì)APScheduler的管理以及動(dòng)態(tài)增刪JOB的接口實(shí)現(xiàn)。

實(shí)現(xiàn)

集成Flask-APScheduler插件完成APScheduler的動(dòng)態(tài)管理

將APScheduler集成到Flask中

config_name = os.getenv('FLASK_CONFIG') or 'default'
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 初始化Sqlarchemy
db.app = app
db.init_app(app)
# 初始化 flask_apscheduler,將scheduler嵌入到flask管理,本地在flask_apscheduler插件中增加add_listener監(jiān)聽(tīng)所有的job生命周期
flask_apscheduler = CustomAPScheduler(db.session, app=app)
# 啟動(dòng)apscheduler
flask_apscheduler.start()

配置Flask-APScheduler開(kāi)啟對(duì)外接口

class Config:
    # apscheduler默認(rèn)的jobstore
    SCHEDULER_JOBSTORES = {}
    # flask_apscheduler是否對(duì)外提供接口
    SCHEDULER_API_ENABLED = True

Flask-APScheduler提供的api如下

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

啟動(dòng)后,通過(guò)提供的接口進(jìn)行動(dòng)態(tài)管理

直接動(dòng)態(tài)調(diào)用接口添加, 具體的參數(shù)需要到apscheduler的源碼進(jìn)行查看

添加JOB舉例說(shuō)明(add_job)

請(qǐng)求添加接口:http://127.0.0.1:5000/scheduler/jobs
請(qǐng)求方法:POST
請(qǐng)求header:
{
    "Content-Type": "application/json"
}
請(qǐng)求body:
{
    "id": "test_add_job",
    "name":"管理平臺(tái)添加job測(cè)試",
    "func": "app:jobs.test.test_job", # 這里就是模塊:函數(shù),本地定義的方法保證可以import
    "trigger": "date" # 觸發(fā)器為指定時(shí)間,這里時(shí)間沒(méi)有指定,就是立馬執(zhí)行
}
返回結(jié)果:
{
    "id": "test_add_job",
    "name": "管理平臺(tái)添加job測(cè)試",
    "func": "app:jobs.test.test_job",
    "args": [],
    "kwargs": {},
    "trigger": "date",
    "run_date": "2021-03-05T15:17:10.107210+08:00",
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2021-03-05T15:17:10.107210+08:00"
}

充分利用APScheduler的Event機(jī)制

class CustomAPScheduler(APScheduler):
    # scheduler事件映射本地狀態(tài)
    STATUS_MAPPING = {
        EVENT_JOB_ADDED: 0,
        EVENT_JOB_MODIFIED: 1,
        EVENT_JOB_SUBMITTED: 2,
        EVENT_JOB_EXECUTED: 3,
        EVENT_JOB_REMOVED: 4,
        EVENT_JOB_ERROR: 5,
        EVENT_JOB_MISSED: 6,
        EVENT_ALL_JOBS_REMOVED: 7,
        EVENT_JOB_MAX_INSTANCES: 8
    }

    def __init__(self, session, scheduler=None, app=None):
        super(CustomAPScheduler, self).__init__(scheduler, app)
        self.session = session

    def listener_all_job(self, event):
        """
        監(jiān)控job的生命周期,可視化監(jiān)控,并且可增加后續(xù)的沒(méi)有觸發(fā)任務(wù)等監(jiān)控
        添加到線程做處理
        :param event:
        :return:
        """
        job_id = None
        args = []
        if event.code != EVENT_ALL_JOBS_REMOVED:
            job_id = event.job_id
        if job_id:
            jobstore_alias = event.jobstore
            job = self.scheduler.get_job(job_id, jobstore_alias)
            if job:
                name = job.name
                func = str(job.func_ref)
                trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
                next_run_time = str(job.next_run_time).split(".")[0]
            else:
                name = None
                func = None
                trigger = None
                next_run_time = None
            args = [name, func, trigger, next_run_time]
        traceback = event.traceback if hasattr(event, 'traceback') else "",
        args.append(traceback)
        t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
        t.start()
        t.join()

    def handle_listener_all_job(self, event_type, *args):
        """
        實(shí)際處理IO操作
        如何處理一個(gè)job_id重復(fù)使用的問(wèn)題,采用本地id自增,如果真有job_id重復(fù)的情況,則認(rèn)為指定的是最后一個(gè)job_id對(duì)應(yīng)的任務(wù)
        """
        try:
            if event_type == EVENT_JOB_ADDED:
                # 添加任務(wù)定義表
                job = ApschedulerJobInfo()
                job.job_id = args[0]
                job.job_name = args[1]
                job.job_func = args[2]
                job.job_trigger = args[3]
                job.job_next_run_time = args[4]
                job.job_status = 0
                self.session.add(job)
                self.session.flush()
                # 增加任務(wù)事件表
                job_event = ApschedulerJobEventInfo()
                job_event.job_info_id = job.id
                job_event.event = self.STATUS_MAPPING[event_type]
                self.session.add(job_event)
                self.session.commit()
            elif event_type == EVENT_JOB_MODIFIED:
                # 修改job[取數(shù)據(jù)庫(kù)表中job_id最后一個(gè)進(jìn)行修改]
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_name = args[1]
                    job.job_func = args[2]
                    job.job_trigger = args[3]
                    job.job_next_run_time = args[4]
                    job.job_status = 0

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_SUBMITTED:
                # 提交job執(zhí)行
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_EXECUTED:
                # 執(zhí)行job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 1

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_REMOVED:
                # 刪除job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 5

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_ERROR:
                # 執(zhí)行job出錯(cuò)
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 2
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_MISSED:
                # job執(zhí)行錯(cuò)過(guò)
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 3
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_ALL_JOBS_REMOVED:
                # 刪除所有job
                all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
                for job in all_jobs:
                    job.job_status = 6
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
            elif event_type == EVENT_JOB_MAX_INSTANCES:
                # job超過(guò)最大實(shí)例
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 4
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
        except:
            LOGGER.exception("執(zhí)行任務(wù)異常")

    def init_app(self, app):
        super(CustomAPScheduler, self).init_app(app)

        # 增加監(jiān)聽(tīng)函數(shù),監(jiān)聽(tīng)所有job的生命周期
        self.add_listener(self.listener_all_job,
                          EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)

收集完成數(shù)據(jù)后進(jìn)行展示及管理

  • JOB管理


    JOB管理
  • JOB事件執(zhí)行明細(xì)


    JOB事件執(zhí)行明細(xì)

關(guān)注公眾號(hào)“戰(zhàn)渣渣”,回復(fù)“調(diào)度”獲得源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容