APScheduler使用文檔

安裝 APScheduler

$ pip3 install apscheduler

快速開始

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('cron', hour='8-23')
def request_update_status():
    print('Doing job')

scheduler.start()

基本概念

APScheduler四大組件:

觸發(fā)器 triggers :用于設定觸發(fā)任務的條件
任務儲存器 job stores:用于存放任務,把任務存放在內存或數(shù)據(jù)庫中
執(zhí)行器 executors: 用于執(zhí)行任務,可以設定執(zhí)行模式為單線程或線程池
調度器 schedulers: 把上方三個組件作為參數(shù),通過創(chuàng)建調度器實例來運行

觸發(fā)器

每一個任務都有自己的觸發(fā)器,觸發(fā)器用于決定任務下次運行的時間。

任務儲存器

默認情況下,任務存放在內存中。也可以配置存放在不同類型的數(shù)據(jù)庫中。如果任務存放在數(shù)據(jù)庫中,那么任務的存取有一個序列化和反序列化的過程,同時修改和搜索任務的功能也是由任務儲存器實現(xiàn)。

注!一個任務儲存器不要共享給多個調度器,否則會導致狀態(tài)混亂

執(zhí)行器

任務會被執(zhí)行器放入線程池或進程池去執(zhí)行,執(zhí)行完畢后,執(zhí)行器會通知調度器。

調度器

一個調度器由上方三個組件構成,一般來說,一個程序只要有一個調度器就可以了。開發(fā)者也不必直接操作任務儲存器、執(zhí)行器以及觸發(fā)器,因為調度器提供了統(tǒng)一的接口,通過調度器就可以操作組件,比如任務的增刪改查。

調度器組件詳解

根據(jù)開發(fā)需求選擇相應的組件,下面是不同的調度器組件:

BlockingScheduler 阻塞式調度器:適用于只跑調度器的程序。
BackgroundScheduler 后臺調度器:適用于非阻塞的情況,調度器會在后臺獨立運行。
AsyncIOScheduler AsyncIO調度器,適用于應用使用AsnycIO的情況。
GeventScheduler Gevent調度器,適用于應用通過Gevent的情況。
TornadoScheduler Tornado調度器,適用于構建Tornado應用。
TwistedScheduler Twisted調度器,適用于構建Twisted應用。
QtScheduler Qt調度器,適用于構建Qt應用。
任務儲存器的選擇,要看任務是否需要持久化。如果你運行的任務是無狀態(tài)的,選擇默認任務儲存器MemoryJobStore就可以應付。但是,如果你需要在程序關閉或重啟時,保存任務的狀態(tài),那么就要選擇持久化的任務儲存器。如果,作者推薦使用SQLAlchemyJobStore并搭配PostgreSQL作為后臺數(shù)據(jù)庫。這個方案可以提供強大的數(shù)據(jù)整合與保護功能。

執(zhí)行器的選擇,同樣要看你的實際需求。默認的ThreadPoolExecutor線程池執(zhí)行器方案可以滿足大部分需求。如果,你的程序是計算密集型的,那么最好用ProcessPoolExecutor進程池執(zhí)行器方案來充分利用多核算力。也可以將ProcessPoolExecutor作為第二執(zhí)行器,混合使用兩種不同的執(zhí)行器。

配置一個任務,就要設置一個任務觸發(fā)器。觸發(fā)器可以設定任務運行的周期、次數(shù)和時間。APScheduler有三種內置的觸發(fā)器:

date 日期:觸發(fā)任務運行的具體日期
interval 間隔:觸發(fā)任務運行的時間間隔
cron 周期:觸發(fā)任務運行的周期
一個任務也可以設定多種觸發(fā)器,比如,可以設定同時滿足所有觸發(fā)器條件而觸發(fā),或者滿足一項即觸發(fā)。復合觸發(fā)器,請查閱一下文檔:鏈接

觸發(fā)器詳解

date 在指定時間點觸發(fā)任務

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 在2009年11月6日執(zhí)行
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])

sched.start()

其中run_date參數(shù)可以是date類型、datetime類型或文本類型。

datetime類型(用于精確時間)

# 在2009年11月6日 16:30:05執(zhí)行
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

文本類型

sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])

未指定時間,則會立即執(zhí)行

# 未顯式指定,那么則立即執(zhí)行
sched.add_job(my_job, args=['text'])

interval 周期觸發(fā)任務

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print("Hello World")

sched = BlockingScheduler()

# 每2小時觸發(fā)
sched.add_job(job_function, 'interval', hours=2)

sched.start()

你可以框定周期開始時間start_date和結束時間end_date。

# 周期觸發(fā)的時間范圍在2010-10-10 9:30 至 2014-06-15 11:00
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')

也可以通過scheduled_job()裝飾器實現(xiàn)

from apscheduler.scheduler import BlockingScheduler


@sched.scheduled_job('interval', id='my_job_id', hours=2)
def job_function():
    print("Hello World")

jitter振動參數(shù),給每次觸發(fā)添加一個隨機浮動秒數(shù),一般適用于多服務器,避免同時運行造成服務擁堵。

# 每小時(上下浮動120秒區(qū)間內)運行`job_function`
sched.add_job(job_function, 'interval', hours=1, jitter=120)

cron 強大的類crontab表達式

# 注意參數(shù)順序
class apscheduler.triggers.cron.CronTrigger(
year=None, 
month=None, 
day=None, 
week=None, 
day_of_week=None, 
hour=None, 
minute=None,
second=None, 
start_date=None, 
end_date=None, 
timezone=None, 
jitter=None)

當省略時間參數(shù)時,在顯式指定參數(shù)之前的參數(shù)會被設定為,之后的參數(shù)會被設定為最小值,week 和day_of_week的最小值為。比如,設定day=1, minute=20等同于設定year='', month='', day=1, week='', day_of_week='', hour='*', minute=20, second=0,即每個月的第一天,且當分鐘到達20時就觸發(fā)。

表達式類型

表達式 參數(shù)類型 描述
* 所有 通配符。例:minutes=*即每分鐘觸發(fā)
*/a 所有 可被a整除的通配符。
a-b 所有 范圍a-b觸發(fā)
a-b/c 所有 范圍a-b,且可被c整除時觸發(fā)
xth y 第幾個星期幾觸發(fā)。x為第幾個,y為星期幾
last x 一個月中,最后個星期幾觸發(fā)
last 一個月最后一天觸發(fā)
x,y,z 所有 組合表達式,可以組合確定值或上方的表達式

注!month和day_of_week參數(shù)分別接受的是英語縮寫jan– dec 和 mon – sun

from apscheduler.schedulers.blocking import BlockingScheduler

def job_function():
    print "Hello World"

sched = BlockingScheduler()

# 任務會在6月、7月、8月、11月和12月的第三個周五,00:00、01:00、02:00和03:00觸發(fā)
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

start_date 和 end_date 可以用來適用時間范圍

# 在2014-05-30 00:00:00前,每周一到每周五 5:30運行
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

通過 scheduled_job() 裝飾器實現(xiàn):

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

使用標準crontab表達式:

sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

也可以添加jitter振動參數(shù)

# 每小時上下浮動120秒觸發(fā)
sched.add_job(job_function, 'cron', hour='*', jitter=120)

夏令時問題
有些timezone時區(qū)可能會有夏令時的問題。這個可能導致令時切換時,任務不執(zhí)行或任務執(zhí)行兩次。避免這個問題,可以使用UTC時間,或提前預知并規(guī)劃好執(zhí)行的問題。

# 在Europe/Helsinki時區(qū), 在三月最后一個周一就不會觸發(fā);在十月最后一個周一會觸發(fā)兩次
sched.add_job(job_function, 'cron', hour=3, minute=30)

配置調度器

APScheduler 有多種不同的配置方法,你可以選擇直接傳字典或傳參的方式創(chuàng)建調度器;也可以先實例一個調度器對象,再添加配置信息。靈活的配置方式可以滿足各種應用場景的需要。

整套的配置選項可以參考API文檔BaseScheduler類。一些調度器子類可能有它們自己特有的配置選項,以及獨立的任務儲存器和執(zhí)行器也可能有自己特有的配置選項,可以查閱API文檔了解。

下面舉一個例子,創(chuàng)建一個使用默認任務儲存器和執(zhí)行器的BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# 因為是非阻塞的后臺調度器,所以程序會繼續(xù)向下執(zhí)行

這樣就可以創(chuàng)建了一個后臺調度器。這個調度器有一個名稱為default的MemoryJobStore(內存任務儲存器)和一個名稱是default且最大線程是10的ThreadPoolExecutor(線程池執(zhí)行器)。

假如你現(xiàn)在有這樣的需求,兩個任務儲存器分別搭配兩個執(zhí)行器;同時,還要修改任務的默認參數(shù);最后還要改時區(qū)。可以參考下面例子,它們是完全等價的。

名稱為“mongo”的MongoDBJobStore
名稱為“default”的SQLAlchemyJobStore
名稱為“ThreadPoolExecutor ”的ThreadPoolExecutor,最大線程20個
名稱“processpool”的ProcessPoolExecutor,最大進程5個
UTC時間作為調度器的時區(qū)
默認為新任務關閉合并模式()
設置新任務的默認最大實例數(shù)為3
方法一:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# ..這里可以添加任務

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動調度器

啟動調度器是只需調用start()即可。除了BlockingScheduler,非阻塞調度器都會立即返回,可以繼續(xù)運行之后的代碼,比如添加任務等。

對于BlockingScheduler,程序則會阻塞在start()位置,所以,要運行的代碼必須寫在start()之前。

注!調度器啟動后,就不能修改配置了。

添加任務
添加任務的方法有兩種:

通過調用add_job()
通過裝飾器scheduled_job()
第一種方法是最常用的;第二種方法是最方便的,但缺點就是運行時,不能修改任務。第一種add_job()方法會返回一個apscheduler.job.Job實例,這樣就可以在運行時,修改或刪除任務。

在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處于一個暫存的狀態(tài)。只有當調度器啟動時,才會開始計算下次運行時間。

還有一點要注意,如果你的執(zhí)行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:

回調函數(shù)必須全局可用
回調函數(shù)參數(shù)必須也是可以被序列化的
內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執(zhí)行器中,只有ProcessPoolExecutor會序列化任務。

重要提醒!
如果在程序初始化時,是從數(shù)據(jù)庫讀取任務的,那么必須為每個任務定義一個明確的ID,并且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味著任務的狀態(tài)不會保存。

建議
如果想要立刻運行任務,可以在添加任務時省略trigger參數(shù)

移除任務

如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:

調用remove_job(),參數(shù)為:任務ID,任務儲存器名稱
在通過add_job()創(chuàng)建的任務實例上調用remove()方法
第二種方式更方便,但前提必須在創(chuàng)建任務實例時,實例被保存在變量中。對于通過scheduled_job()創(chuàng)建的任務,只能選擇第一種方式。

當任務調度結束時(比如,某個任務的觸發(fā)器不再產生下次運行的時間),任務就會自動移除。

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同樣,通過任務的具體ID:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暫停和恢復任務

通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數(shù)計數(shù)也不會被統(tǒng)計。

暫停任務,有以下兩個方法:

  • apscheduler.job.Job.pause()

  • apscheduler.schedulers.base.BaseScheduler.pause_job()
    恢復任務,

  • apscheduler.job.Job.resume()

  • apscheduler.schedulers.base.BaseScheduler.resume_job()

獲取任務列表

通過get_jobs()就可以獲得一個可修改的任務列表。get_jobs()第二個參數(shù)可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。

print_jobs()可以快速打印格式化的任務列表,包含觸發(fā)器,下次運行時間等信息。

修改任務
通過apscheduler.job.Job.modify()或modify_job(),你可以修改任務當中除了id的任何屬性。

比如:

job.modify(max_instances=6, name='Alternate name')

如果想要重新調度任務(就是改變觸發(fā)器),你能通過apscheduler.job.Job.reschedule()或reschedule_job()來實現(xiàn)。這些方法會重新創(chuàng)建觸發(fā)器,并重新計算下次運行時間。

比如:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

關閉調度器

關閉方法如下:

scheduler.shutdown()

默認情況下,調度器會先把正在執(zhí)行的任務處理完,再關閉任務儲存器和執(zhí)行器。但是,如果你就直接關閉,你可以添加參數(shù):

scheduler.shutdown(wait=False)

上述方法不管有沒有任務在執(zhí)行,會強制關閉調度器。

暫停、恢復任務進程

調度器可以暫停正在執(zhí)行的任務:

scheduler.pause()

也可以恢復任務:

scheduler.resume()

同時,也可以在調度器啟動時,默認所有任務設為暫停狀態(tài)。

scheduler.start(paused=True)

限制任務執(zhí)行的實例并行數(shù)

默認情況下,在同一時間,一個任務只允許一個執(zhí)行中的實例在運行。比如說,一個任務是每5秒執(zhí)行一次,但是這個任務在第一次執(zhí)行的時候花了6秒,也就是說前一次任務還沒執(zhí)行完,后一次任務又觸發(fā)了,由于默認一次只允許一個實例執(zhí)行,所以第二次就丟失了。為了杜絕這種情況,可以在添加任務時,設置max_instances參數(shù),為指定任務設置最大實例并行數(shù)。

丟失任務的執(zhí)行與合并

有時,任務會由于一些問題沒有被執(zhí)行。最常見的情況就是,在數(shù)據(jù)庫里的任務到了該執(zhí)行的時間,但調度器被關閉了,那么這個任務就成了“啞彈任務”。錯過執(zhí)行時間后,調度器才打開了。這時,調度器會檢查每個任務的misfire_grace_time參數(shù)int值,即啞彈上限,來確定是否還執(zhí)行啞彈任務(這個參數(shù)可以全局設定的或者是為每個任務單獨設定)。此時,一個啞彈任務,就可能會被連續(xù)執(zhí)行多次。

但這就可能導致一個問題,有些啞彈任務實際上并不需要被執(zhí)行多次。coalescing合并參數(shù)就能把一個多次的啞彈任務揉成一個一次的啞彈任務。也就是說,coalescing為True能把多個排隊執(zhí)行的同一個啞彈任務,變成一個,而不會觸發(fā)啞彈事件。

注!如果是由于線程池/進程池滿了導致的任務延遲,執(zhí)行器就會跳過執(zhí)行。要避免這個問題,可以添加進程或線程數(shù)來實現(xiàn)或把 misfire_grace_time值調高。

調度器事件

調度器允許添加事件偵聽器。部分事件會有特有的信息,比如當前運行次數(shù)等。add_listener(callback,mask)中,第一個參數(shù)是回調對象,mask是指定偵聽事件類型,mask參數(shù)也可以是邏輯組合。回調對象會有一個參數(shù)就是觸發(fā)的事件。

具體可以查看文檔中events模塊,里面有關于事件類型以及事件參數(shù)的詳細說明。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

# 當任務執(zhí)行完或任務出錯時,調用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

事件類型

Constant Description Event class
EVENT_SCHEDULER_STARTED The scheduler was started SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN The scheduler was shut down SchedulerEvent
EVENT_SCHEDULER_PAUSED Job processing in the scheduler was paused SchedulerEvent
EVENT_SCHEDULER_RESUMED Job processing in the scheduler was resumed SchedulerEvent
EVENT_EXECUTOR_ADDED An executor was added to the scheduler SchedulerEvent
EVENT_EXECUTOR_REMOVED An executor was removed to the scheduler SchedulerEvent
EVENT_JOBSTORE_ADDED A job store was added to the scheduler SchedulerEvent
EVENT_JOBSTORE_REMOVED A job store was removed from the scheduler SchedulerEvent
EVENT_ALL_JOBS_REMOVED All jobs were removed from either all job stores or one particular job store SchedulerEvent
EVENT_JOB_ADDED A job was added to a job store JobEvent
EVENT_JOB_REMOVED A job was removed from a job store JobEvent
EVENT_JOB_MODIFIED A job was modified from outside the scheduler JobEvent
EVENT_JOB_SUBMITTED A job was submitted to its executor to be run JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES A job being submitted to its executor was not accepted by the executor because the job has already reached its maximum concurrently executing instances JobSubmissionEvent
EVENT_JOB_EXECUTED A job was executed successfully JobExecutionEvent
EVENT_JOB_ERROR A job raised an exception during execution JobExecutionEvent
EVENT_JOB_MISSED A job’s execution was missed JobExecutionEvent
EVENT_ALL A catch-all mask that includes every event type N/A

異常捕獲

通過logging模塊,可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息。

關于logging初始化的方式如下:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

日志會提供很多調度器的內部運行信息。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容