簡(jiǎn)介
APScheduler:強(qiáng)大的任務(wù)調(diào)度工具,可以完成定時(shí)任務(wù),周期任務(wù)等,它是跨平臺(tái)的,用于取代Linux下的cron daemon或者Windows下的task scheduler。
內(nèi)置三種調(diào)度調(diào)度系統(tǒng):
- Cron風(fēng)格
- 間隔性執(zhí)行
- 僅在某個(gè)時(shí)間執(zhí)行一次
作業(yè)存儲(chǔ)的backends支持:
- Memory
- SQLAlchemy (any RDBMS supported by SQLAlchemy works)
- MongoDB
- Redis
- RethinkDB
- ZooKeeper
基本概念:4個(gè)組件
triggers: 描述一個(gè)任務(wù)何時(shí)被觸發(fā),有按日期、按時(shí)間間隔、按cronjob描述式三種觸發(fā)方式
job stores: 任務(wù)持久化倉(cāng)庫,默認(rèn)保存任務(wù)在內(nèi)存中,也可將任務(wù)保存都各種數(shù)據(jù)庫中,任務(wù)中的數(shù)據(jù)序列化后保存到持久化數(shù)據(jù)庫,從數(shù)據(jù)庫加載后又反序列化。
executors: 執(zhí)行任務(wù)模塊,當(dāng)任務(wù)完成時(shí)executors通知schedulers,schedulers收到后會(huì)發(fā)出一個(gè)適當(dāng)?shù)氖录?br>
schedulers: 任務(wù)調(diào)度器,控制器角色,通過它配置job stores和executors,添加、修改和刪除任務(wù)。
插件機(jī)制: 供用戶自由選擇scheduler, job store(s), executor(s) and trigger(s)
scheduler
scheduler的主循環(huán)(main_loop),其實(shí)就是反復(fù)檢查是不是有到時(shí)需要執(zhí)行的任務(wù),完成一次檢查的函數(shù)是_process_jobs, 這個(gè)函數(shù)做這么幾件事:
- 詢問自己的每一個(gè)jobstore,有沒有到期需要執(zhí)行的任務(wù)(jobstore.get_due_jobs())
- 如果有,計(jì)算這些job中每個(gè)job需要運(yùn)行的時(shí)間點(diǎn)(run_times = job._get_run_times(now))如果run_times有多個(gè),這種情況我們上面討論過,有coalesce檢查
提交給executor排期運(yùn)行(executor.submit_job(job, run_times)) - 那么在這個(gè)_process_jobs的邏輯,什么時(shí)候調(diào)用合適呢?如果不間斷地調(diào)用,而實(shí)際上沒有要執(zhí)行的job,是一種浪費(fèi)。每次掉用_process_jobs后,其實(shí)可以預(yù)先判斷一下,下一次要執(zhí)行的job(離現(xiàn)在最近的)還要多長(zhǎng)時(shí)間,作為返回值告訴main_loop, 這時(shí)主循環(huán)就可以去睡一覺,等大約這么長(zhǎng)時(shí)間后再喚醒,執(zhí)行下一次_process_jobs。這里喚醒的機(jī)制就會(huì)有IO模型的區(qū)別了
scheduler由于IO模型的不同,可以有多種實(shí)現(xiàn),內(nèi)置scheduler供選:
-
BlockingScheduler: scheduler在當(dāng)前進(jìn)程的主線程中運(yùn)行,所以調(diào)用start函數(shù)會(huì)阻塞當(dāng)前線程,不能立即返回。 -
BackgroundScheduler: 放到后臺(tái)線程中運(yùn)行,所以調(diào)用start后主線程不會(huì)阻塞 -
AsyncIOScheduler: 使用asyncio模塊 -
GeventScheduler: 使用gevent作為IO模型,和GeventExecutor配合使用 -
TornadoScheduler: 配合TwistedExecutor,用reactor.callLater完成定時(shí)喚醒 -
TwistedScheduler: 使用tornado的IO模型,用ioloop.add_timeout完成定時(shí)喚醒 -
QtScheduler: 使用QTimer完成定時(shí)喚醒
jobstore
jobstore提供給scheduler一個(gè)序列化jobs的統(tǒng)一抽象,提供對(duì)scheduler中job的增刪改查接口,根據(jù)存儲(chǔ)backend的不同,分以下幾種
內(nèi)置job stores供選:
-
MemoryJobStore:沒有序列化,jobs就存在內(nèi)存里,增刪改查也都是在內(nèi)存中操作 -
SQLAlchemyJobStore:所有sqlalchemy支持的數(shù)據(jù)庫都可以做為backend,增刪改查操作轉(zhuǎn)化為對(duì)應(yīng)backend的sql語句 -
MongoDBJobStore:用mongodb作backend -
RedisJobStore: 用redis作backend
除了MemoryJobStore外,其他幾種都使用pickle做序列化工具,所以這里要指出一點(diǎn),如果你不是在用內(nèi)存做jobstore,那么必須確保你提供給job的可執(zhí)行函數(shù)必須是可以被全局訪問的,也就是可以通過ref_to_obj反查出來的,否則無法序列化。
使用數(shù)據(jù)庫做jobstore,就會(huì)發(fā)現(xiàn),其實(shí)創(chuàng)建了一張有三個(gè)域的的jobs表,分別是****id, next_run_time, job_state,其中job_state是job對(duì)象pickle序列化后的二進(jìn)制**,而id和next_run_time則是支持job的兩類查詢(按id和按最近運(yùn)行時(shí)間)
executor
aps把任務(wù)最終的執(zhí)行機(jī)制也抽象了出來,可以根據(jù)IO模型選配,不需要講太多,最常用的是threadpool和processpoll兩種(來自concurrent.futures的線程/進(jìn)程池)。
不同類型的executor實(shí)現(xiàn)自己的_do_submit_job,完成一次實(shí)際的任務(wù)實(shí)例執(zhí)行。以線程/進(jìn)程池實(shí)現(xiàn)為例
內(nèi)置executors供選:
-
ProcessPoolExecutor: 多進(jìn)程,可指定進(jìn)程數(shù),當(dāng)工作負(fù)載為CPU密集型操作時(shí)可以考慮使用它來利用多核CPU -
ThreadPoolExecutor: 多線程,可指定線程數(shù),默認(rèn),可以滿足大多數(shù)用途 AsyncIOExecutorDebugExecutorGeventExecutorProcessPoolExecutorThreadPoolExecutorTwistedExecutor
trigger
trigger是抽象出了“一個(gè)job是何時(shí)被觸發(fā)”這個(gè)策略,每種trigger實(shí)現(xiàn)自己的get_next_fire_time函數(shù)
aps提供的trigger包括:
-
date:一次性指定日期 -
interval:在某個(gè)時(shí)間范圍內(nèi)間隔多長(zhǎng)時(shí)間執(zhí)行一次 -
cron:和unix crontab格式兼容,最為強(qiáng)大
默認(rèn)配置: 使用MemoryJobStore和ThreadPoolExecutor
優(yōu)點(diǎn):插件化思想和抽象出接口,策略與不同實(shí)現(xiàn)機(jī)制分離
User guide
配置scheduler
官網(wǎng)提供了等價(jià)的三種方法,第一種比較簡(jiǎn)潔明了。
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler
# 使用默認(rèn)配置,即MemoryJobStore和ThreadPoolExecutor(10)
scheduler = BackgroundScheduler()
啟動(dòng)調(diào)度器
調(diào)用調(diào)度器的start()方法
添加任務(wù)
兩種方式:
- 調(diào)用調(diào)度器的
add_job() - 使用調(diào)度器的
scheduled_job()裝飾器: 很簡(jiǎn)潔,推薦這種。
其他的不常用操作如移除任務(wù)、暫停和恢復(fù)任務(wù)、獲取調(diào)度了的任務(wù)列表、修改任務(wù)、關(guān)停調(diào)度器、暫停/恢復(fù)任務(wù)處理等見文檔:http://apscheduler.readthedocs.io/en/latest/userguide.html
限制并發(fā)執(zhí)行的任務(wù)實(shí)例數(shù)量
默認(rèn)同一時(shí)刻只能有一個(gè)實(shí)例運(yùn)行,通過max_instances=3修改為3個(gè)。
錯(cuò)過執(zhí)行的任務(wù)與合并
misfire_grace_time:如果一個(gè)job本來14:00有一次執(zhí)行,但是由于某種原因沒有被調(diào)度上,現(xiàn)在14:01了,這個(gè)14:00的運(yùn)行實(shí)例被提交時(shí),會(huì)檢查它預(yù)訂運(yùn)行的時(shí)間和當(dāng)下時(shí)間的差值(這里是1分鐘),大于我們?cè)O(shè)置的30秒限制,那么這個(gè)運(yùn)行實(shí)例不會(huì)被執(zhí)行。
合并:最常見的情形是scheduler被shutdown后重啟,某個(gè)任務(wù)會(huì)積攢了好幾次沒執(zhí)行如5次,下次這個(gè)job被submit給executor時(shí),執(zhí)行5次。將coalesce=True后,只會(huì)執(zhí)行一次
Scheduler 事件
監(jiān)聽Scheduler發(fā)出的事件并作出處理,如任務(wù)執(zhí)行完、任務(wù)出錯(cuò)等
def my_listener(event):
if event.exception:
print('The job crashed :(') # or logger.fatal('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)