Python使用APScheduler實(shí)現(xiàn)定時(shí)任務(wù)

簡(jiǎn)介:APScheduler是基于Quartz的一個(gè)Python定時(shí)任務(wù)框架。提供了基于日期、固定時(shí)間間隔以及crontab類型的任務(wù),并且可以持久化任務(wù)。
在線文檔:https://apscheduler.readthedocs.io/en/latest/userguide.html

一、安裝APScheduler

pip install apscheduler

二、基本概念介紹

觸發(fā)器(triggers):

觸發(fā)器包含調(diào)度邏輯,描述一個(gè)任務(wù)何時(shí)被觸發(fā),按日期或按時(shí)間間隔或按 cronjob 表達(dá)式三種方式觸發(fā)。每個(gè)作業(yè)都有它自己的觸發(fā)器,除了初始配置之外,觸發(fā)器是完全無狀態(tài)的。
三種內(nèi)建的trigger:
(1)date: 特定的時(shí)間點(diǎn)觸發(fā)
(2)interval: 固定時(shí)間間隔觸發(fā)
(3)cron: 在特定時(shí)間周期性地觸發(fā)

作業(yè)存儲(chǔ)器(job stores):

作業(yè)存儲(chǔ)器指定了作業(yè)被存放的位置,默認(rèn)情況下作業(yè)保存在內(nèi)存,也可將作業(yè)保存在各種數(shù)據(jù)庫中,當(dāng)作業(yè)被存放在數(shù)據(jù)庫中時(shí),它會(huì)被序列化,當(dāng)被重新加載時(shí)會(huì)反序列化。作業(yè)存儲(chǔ)器充當(dāng)保存、加載、更新和查找作業(yè)的中間商。在調(diào)度器之間不能共享作業(yè)存儲(chǔ)。

執(zhí)行器(executors):

執(zhí)行器是將指定的作業(yè)(調(diào)用函數(shù))提交到線程池或進(jìn)程池中運(yùn)行,當(dāng)任務(wù)完成時(shí),執(zhí)行器通知調(diào)度器觸發(fā)相應(yīng)的事件。

調(diào)度器(schedulers):

任務(wù)調(diào)度器,屬于控制角色,通過它配置作業(yè)存儲(chǔ)器、執(zhí)行器和觸發(fā)器,添加、修改和刪除任務(wù)。調(diào)度器協(xié)調(diào)觸發(fā)器、作業(yè)存儲(chǔ)器、執(zhí)行器的運(yùn)行,通常只有一個(gè)調(diào)度程序運(yùn)行在應(yīng)用程序中,開發(fā)人員通常不需要直接處理作業(yè)存儲(chǔ)器、執(zhí)行器或觸發(fā)器,配置作業(yè)存儲(chǔ)器和執(zhí)行器是通過調(diào)度器來完成的。
根據(jù)開發(fā)需求選擇相應(yīng)的組件,下面是不同的調(diào)度器組件:
BlockingScheduler 阻塞式調(diào)度器:適用于只跑調(diào)度器的程序。
BackgroundScheduler 后臺(tái)調(diào)度器:適用于非阻塞的情況,調(diào)度器會(huì)在后臺(tái)獨(dú)立運(yùn)行。
AsyncIOScheduler AsyncIO調(diào)度器,適用于應(yīng)用使用AsnycIO的情況。
GeventScheduler Gevent調(diào)度器,適用于應(yīng)用通過Gevent的情況。
TornadoScheduler Tornado調(diào)度器,適用于構(gòu)建Tornado應(yīng)用。
TwistedScheduler Twisted調(diào)度器,適用于構(gòu)建Twisted應(yīng)用。
QtScheduler Qt調(diào)度器,適用于構(gòu)建Qt應(yīng)用。

三、使用步驟

1、新建一個(gè)調(diào)度器schedulers
2、添加調(diào)度任務(wù)
3、運(yùn)行調(diào)度任務(wù)

四、使用實(shí)例

1、觸發(fā)器date

特定的時(shí)間點(diǎn)觸發(fā),只執(zhí)行一次。參數(shù)如下:


image.png
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    print(text)

scheduler = BlockingScheduler()
# 在 2020-8-30 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2020, 8, 30), args=['text1'])
# 在 2020-8-30 01:00:00 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2020, 8, 30, 1, 0, 0), args=['text2'])
# 在 2020-8-30 01:00:01 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date='2020-8-30 01:00:00', args=['text3'])

scheduler.start()

2、觸發(fā)器interval

固定時(shí)間間隔觸發(fā)。參數(shù)如下


image.png
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2020-08-28 15:43:00至2020-08-28 15:50:00期間,每隔1分30秒 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2020-08-28 15:43:00', end_date='2020-08-28 15:50:00', args=['job2'])

scheduler.start()

'''
運(yùn)行結(jié)果:
job2 --- 2020-08-28 15:43:00
job1 --- 2020-08-28 15:43:19
job1 --- 2020-08-28 15:44:19
job2 --- 2020-08-28 15:44:30
job1 --- 2020-08-28 15:45:19
job2 --- 2020-08-28 15:46:00
job1 --- 2020-08-28 15:46:19
job1 --- 2020-08-28 15:47:19
job2 --- 2020-08-28 15:47:30
...余下省略...
'''

3、觸發(fā)器cron

在特定時(shí)間周期性地觸發(fā)。參數(shù)如下:
year (int 或 str):年,4位數(shù)字
month (int 或 str):月 (范圍1-12)
day (int 或 str):日 (范圍1-31)
week (int 或 str):周 (范圍1-53)
day_of_week (int 或 str):周內(nèi)第幾天或者星期幾 (范圍0-6 或者mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str):時(shí) (范圍0-23)
minute (int 或 str):分 (范圍0-59)
second (int 或 str):秒 (范圍0-59)
start_date (datetime 或 str):最早開始日期(包含)
end_date (datetime 或 str):最晚結(jié)束時(shí)間(包含)
timezone (datetime.tzinfo 或str):指定時(shí)區(qū)

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 在每天22點(diǎn),每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23點(diǎn)的25分,運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])

scheduler.start()

'''
運(yùn)行結(jié)果:
job2 --- 2020-08-28 16:20:00
job1 --- 2020-08-28 16:30:00
job2 --- 2020-08-28 17:20:00
'''

4、通過裝飾器scheduled_job()添加任務(wù)

import time
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

#指定時(shí)間內(nèi)每1分30秒執(zhí)行一次
@scheduler.scheduled_job('interval',  minutes=1, seconds = 30, start_date='2020-08-28 17:10:00', end_date='2020-08-28 17:20:00',)
def job1():
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job1 --- {}'.format(t))

#17-18點(diǎn)內(nèi)每10分鐘執(zhí)行一次
@scheduler.scheduled_job('cron', hour='17-18', minute='*/10',)
def job2():
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('job2 --- {}'.format(t))

scheduler.start()

運(yùn)行結(jié)果:
job1 --- 2020-08-28 17:10:00
job2 --- 2020-08-28 17:10:00
job1 --- 2020-08-28 17:11:30
job1 --- 2020-08-28 17:13:00
job1 --- 2020-08-28 17:14:30
job1 --- 2020-08-28 17:16:00
job1 --- 2020-08-28 17:17:30
job1 --- 2020-08-28 17:19:00
job2 --- 2020-08-28 17:20:00
job2 --- 2020-08-28 17:30:00
job2 --- 2020-08-28 17:40:00
job2 --- 2020-08-28 17:50:00
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容