APScheduler - Advanced Python Scheduler

簡(jiǎn)介

APScheduler:強(qiáng)大的任務(wù)調(diào)度工具,可以完成定時(shí)任務(wù),周期任務(wù)等,它是跨平臺(tái)的,用于取代Linux下的cron daemon或者Windows下的task scheduler。

內(nèi)置三種調(diào)度調(diào)度系統(tǒng):

  • Cron風(fēng)格
  • 間隔性執(zhí)行
  • 僅在某個(gè)時(shí)間執(zhí)行一次

作業(yè)存儲(chǔ)的backends支持:

  • Memory
  • SQLAlchemy (any RDBMS supported by SQLAlchemy works)
  • MongoDB
  • Redis
  • RethinkDB
  • ZooKeeper

基本概念:4個(gè)組件
triggers: 描述一個(gè)任務(wù)何時(shí)被觸發(fā),有按日期、按時(shí)間間隔、按cronjob描述式三種觸發(fā)方式
job stores: 任務(wù)持久化倉(cāng)庫,默認(rèn)保存任務(wù)在內(nèi)存中,也可將任務(wù)保存都各種數(shù)據(jù)庫中,任務(wù)中的數(shù)據(jù)序列化后保存到持久化數(shù)據(jù)庫,從數(shù)據(jù)庫加載后又反序列化。
executors: 執(zhí)行任務(wù)模塊,當(dāng)任務(wù)完成時(shí)executors通知schedulers,schedulers收到后會(huì)發(fā)出一個(gè)適當(dāng)?shù)氖录?br> schedulers: 任務(wù)調(diào)度器,控制器角色,通過它配置job stores和executors,添加、修改和刪除任務(wù)。

插件機(jī)制: 供用戶自由選擇scheduler, job store(s), executor(s) and trigger(s)

scheduler

scheduler的主循環(huán)(main_loop),其實(shí)就是反復(fù)檢查是不是有到時(shí)需要執(zhí)行的任務(wù),完成一次檢查的函數(shù)是_process_jobs, 這個(gè)函數(shù)做這么幾件事:

  • 詢問自己的每一個(gè)jobstore,有沒有到期需要執(zhí)行的任務(wù)(jobstore.get_due_jobs())
  • 如果有,計(jì)算這些job中每個(gè)job需要運(yùn)行的時(shí)間點(diǎn)(run_times = job._get_run_times(now))如果run_times有多個(gè),這種情況我們上面討論過,有coalesce檢查
    提交給executor排期運(yùn)行(executor.submit_job(job, run_times))
  • 那么在這個(gè)_process_jobs的邏輯,什么時(shí)候調(diào)用合適呢?如果不間斷地調(diào)用,而實(shí)際上沒有要執(zhí)行的job,是一種浪費(fèi)。每次掉用_process_jobs后,其實(shí)可以預(yù)先判斷一下,下一次要執(zhí)行的job(離現(xiàn)在最近的)還要多長(zhǎng)時(shí)間,作為返回值告訴main_loop, 這時(shí)主循環(huán)就可以去睡一覺,等大約這么長(zhǎng)時(shí)間后再喚醒,執(zhí)行下一次_process_jobs。這里喚醒的機(jī)制就會(huì)有IO模型的區(qū)別了

scheduler由于IO模型的不同,可以有多種實(shí)現(xiàn),內(nèi)置scheduler供選:

  • BlockingScheduler: scheduler在當(dāng)前進(jìn)程的主線程中運(yùn)行,所以調(diào)用start函數(shù)會(huì)阻塞當(dāng)前線程,不能立即返回。
  • BackgroundScheduler: 放到后臺(tái)線程中運(yùn)行,所以調(diào)用start后主線程不會(huì)阻塞
  • AsyncIOScheduler: 使用asyncio模塊
  • GeventScheduler: 使用gevent作為IO模型,和GeventExecutor配合使用
  • TornadoScheduler: 配合TwistedExecutor,用reactor.callLater完成定時(shí)喚醒
  • TwistedScheduler: 使用tornado的IO模型,用ioloop.add_timeout完成定時(shí)喚醒
  • QtScheduler: 使用QTimer完成定時(shí)喚醒

jobstore

jobstore提供給scheduler一個(gè)序列化jobs的統(tǒng)一抽象,提供對(duì)scheduler中job的增刪改查接口,根據(jù)存儲(chǔ)backend的不同,分以下幾種
內(nèi)置job stores供選:

  • MemoryJobStore:沒有序列化,jobs就存在內(nèi)存里,增刪改查也都是在內(nèi)存中操作
  • SQLAlchemyJobStore:所有sqlalchemy支持的數(shù)據(jù)庫都可以做為backend,增刪改查操作轉(zhuǎn)化為對(duì)應(yīng)backend的sql語句
  • MongoDBJobStore:用mongodb作backend
  • RedisJobStore: 用redis作backend

除了MemoryJobStore外,其他幾種都使用pickle做序列化工具,所以這里要指出一點(diǎn),如果你不是在用內(nèi)存做jobstore,那么必須確保你提供給job的可執(zhí)行函數(shù)必須是可以被全局訪問的,也就是可以通過ref_to_obj反查出來的,否則無法序列化。
使用數(shù)據(jù)庫做jobstore,就會(huì)發(fā)現(xiàn),其實(shí)創(chuàng)建了一張有三個(gè)域的的jobs表,分別是****id, next_run_time, job_state,其中job_state是job對(duì)象pickle序列化后的二進(jìn)制**,而id和next_run_time則是支持job的兩類查詢(按id和按最近運(yùn)行時(shí)間)

executor

aps把任務(wù)最終的執(zhí)行機(jī)制也抽象了出來,可以根據(jù)IO模型選配,不需要講太多,最常用的是threadpool和processpoll兩種(來自concurrent.futures的線程/進(jìn)程池)。

不同類型的executor實(shí)現(xiàn)自己的_do_submit_job,完成一次實(shí)際的任務(wù)實(shí)例執(zhí)行。以線程/進(jìn)程池實(shí)現(xiàn)為例
內(nèi)置executors供選:

  • ProcessPoolExecutor: 多進(jìn)程,可指定進(jìn)程數(shù),當(dāng)工作負(fù)載為CPU密集型操作時(shí)可以考慮使用它來利用多核CPU
  • ThreadPoolExecutor: 多線程,可指定線程數(shù),默認(rèn),可以滿足大多數(shù)用途
  • AsyncIOExecutor
  • DebugExecutor
  • GeventExecutor
  • ProcessPoolExecutor
  • ThreadPoolExecutor
  • TwistedExecutor

trigger

trigger是抽象出了“一個(gè)job是何時(shí)被觸發(fā)”這個(gè)策略,每種trigger實(shí)現(xiàn)自己的get_next_fire_time函數(shù)
aps提供的trigger包括:

  • date:一次性指定日期
  • interval:在某個(gè)時(shí)間范圍內(nèi)間隔多長(zhǎng)時(shí)間執(zhí)行一次
  • cron:和unix crontab格式兼容,最為強(qiáng)大

默認(rèn)配置: 使用MemoryJobStore和ThreadPoolExecutor
優(yōu)點(diǎn):插件化思想和抽象出接口,策略與不同實(shí)現(xiàn)機(jī)制分離

User guide

配置scheduler
官網(wǎng)提供了等價(jià)的三種方法,第一種比較簡(jiǎn)潔明了。

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler
# 使用默認(rèn)配置,即MemoryJobStore和ThreadPoolExecutor(10)
scheduler = BackgroundScheduler()

啟動(dòng)調(diào)度器
調(diào)用調(diào)度器的start()方法

添加任務(wù)
兩種方式:

  • 調(diào)用調(diào)度器的add_job()
  • 使用調(diào)度器的scheduled_job()裝飾器: 很簡(jiǎn)潔,推薦這種。

其他的不常用操作如移除任務(wù)、暫停和恢復(fù)任務(wù)、獲取調(diào)度了的任務(wù)列表、修改任務(wù)、關(guān)停調(diào)度器、暫停/恢復(fù)任務(wù)處理等見文檔:http://apscheduler.readthedocs.io/en/latest/userguide.html

限制并發(fā)執(zhí)行的任務(wù)實(shí)例數(shù)量
默認(rèn)同一時(shí)刻只能有一個(gè)實(shí)例運(yùn)行,通過max_instances=3修改為3個(gè)。

錯(cuò)過執(zhí)行的任務(wù)與合并
misfire_grace_time:如果一個(gè)job本來14:00有一次執(zhí)行,但是由于某種原因沒有被調(diào)度上,現(xiàn)在14:01了,這個(gè)14:00的運(yùn)行實(shí)例被提交時(shí),會(huì)檢查它預(yù)訂運(yùn)行的時(shí)間和當(dāng)下時(shí)間的差值(這里是1分鐘),大于我們?cè)O(shè)置的30秒限制,那么這個(gè)運(yùn)行實(shí)例不會(huì)被執(zhí)行。
合并:最常見的情形是scheduler被shutdown后重啟,某個(gè)任務(wù)會(huì)積攢了好幾次沒執(zhí)行如5次,下次這個(gè)job被submit給executor時(shí),執(zhí)行5次。將coalesce=True后,只會(huì)執(zhí)行一次

Scheduler 事件
監(jiān)聽Scheduler發(fā)出的事件并作出處理,如任務(wù)執(zhí)行完、任務(wù)出錯(cuò)等

def my_listener(event):
    if event.exception:
        print('The job crashed :(') # or logger.fatal('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

參考資料:
http://www.cnblogs.com/quijote/p/4385774.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容