前言
前面有一篇文章簡單介紹了Python的一些任務(wù)調(diào)度庫,并描述了 APScheduler 的工作原理及架構(gòu),這里再回顧一下 APScheduler 的架構(gòu), 這次主要以 v2.1.0 版本為分析目標,因為 2.x 版本與 3.x 版本之間加了很多功能,如異步任務(wù),代碼也重構(gòu)了不少,但是基本功能及概念沒有變化
APScheduler 的架構(gòu)
1、APScheduler 基本概念
APScheduler 由四個組件構(gòu)成(注:該部分翻譯至官方文檔):
-
triggers 觸發(fā)器
觸發(fā)器包含調(diào)度邏輯。每個作業(yè)(job)都有自己的觸發(fā)器,用于確定下一個作業(yè)何時運行。除了最初的配置,觸發(fā)器是完全無狀態(tài)的
-
job stores 作業(yè)存儲
job stores 是存放作業(yè)的地方,默認保存在內(nèi)存中。作業(yè)數(shù)據(jù)序列化后保存至持久性數(shù)據(jù)庫,從持久性數(shù)據(jù)庫加載回來時會反序列化。作業(yè)存儲(job stores)不將作業(yè)數(shù)據(jù)保存在內(nèi)存中(默認存儲除外),相反,內(nèi)存只是充當后端存儲在保存、加載、更新、查找作業(yè)時的中間人角色。作業(yè)存儲不能在調(diào)度器(schedulers) 之間共享
-
executors 執(zhí)行器
執(zhí)行器處理作業(yè)的運行。它們通常通過將作業(yè)中的指定可調(diào)用部分提交給線程或進程池來實現(xiàn)這一點。 當作業(yè)完成后,執(zhí)行器通知調(diào)度器,然后調(diào)度器發(fā)出一個適當?shù)氖录?/p>
-
schedulers 調(diào)度器
調(diào)度器是將其余部分綁定在一起的工具。通常只有一個調(diào)度器(scheduler)在應(yīng)用程序中運行。應(yīng)用程序開發(fā)者通常不直接處理作業(yè)存儲(job stores)、執(zhí)行器(executors)或者觸發(fā)器(triggers)。相反,調(diào)度器提供了適當?shù)慕涌趤硖幚硭鼈?。配置作業(yè)存儲(job stores)和執(zhí)行器(executors)是通過調(diào)度器(scheduler)來完成的,就像添加、修改和刪除 job(作業(yè))一樣
2、APScheduler 架構(gòu)圖

APScheduler 代碼結(jié)構(gòu)
主要代碼邏輯都是由下面的文件組成,可以看出代碼結(jié)構(gòu)也根據(jù)功能不同而歸檔, 例如作業(yè)存儲模塊及觸發(fā)器模塊都單獨作為一個包管理,后面擴展其它作業(yè)存儲或觸發(fā)器也很容易管理,不會出現(xiàn)代碼結(jié)構(gòu)混亂,簡單而言,代碼層次分明
├── apscheduler
│ ├── __init__.py
│ ├── events.py
│ ├── job.py
│ ├── jobstores
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── mongodb_store.py
│ │ ├── ram_store.py
│ │ ├── redis_store.py
│ │ ├── shelve_store.py
│ │ └── sqlalchemy_store.py
│ ├── scheduler.py
│ ├── threadpool.py
│ ├── triggers
│ │ ├── __init__.py
│ │ ├── cron
│ │ │ ├── __init__.py
│ │ │ ├── expressions.py
│ │ │ └── fields.py
│ │ ├── interval.py
│ │ └── simple.py
│ └── util.py
官方示例
閱讀源碼,我一般第一步先閱讀該庫的官方文檔,先了解這個功能是什么用途,怎么使用,然后查看相關(guān)的單元測試代碼,這樣更有助于理解里面某個模塊或者某個類的用途及效果,最后以官方示例入手,一步一步深入到源碼內(nèi)部了解。那現(xiàn)在假設(shè)你已經(jīng)閱讀過官方文檔,知道這個庫的用途了,開始探索之旅吧!
"""
設(shè)置一個每隔 3 秒就運行一次的任務(wù)
"""
from datetime import datetime
from apscheduler.scheduler import Scheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = Scheduler(standalone=True)
scheduler.add_interval_job(tick, seconds=3)
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
從示例代碼看,第一步要實例化一下 Scheduler 類,這個類根據(jù)名稱就可以猜出它的功能了,它對應(yīng)著架構(gòu)圖里的調(diào)度器模塊,后續(xù)添加任務(wù)調(diào)度、啟動運行任務(wù)、觸發(fā)任務(wù)執(zhí)行,都要通過它來操作。仔細查看 Scheduler 實例化時還傳了個 standalone 參數(shù), 這個參數(shù)做什么用呢?接下來進入 Scheduler 類里面,查看它實例化時做了什么操作,看看 standalone 是什么作用
Scheduler 類
可以看出 __init__ 初始化時, 實例化了事件類(Event),并獲取了一些鎖,主要是 configure 函數(shù)進行任務(wù)開始前的配置,
可以看出實例化傳的 standalone 參數(shù)是裝載在 options 傳遞給 configure 進行配置使用
class Scheduler(object):
"""
This class is responsible for scheduling jobs and triggering
their execution.
"""
_stopped = False
_thread = None
def __init__(self, gconfig={}, **options):
self._wakeup = Event()
self._jobstores = {}
self._jobstores_lock = Lock()
self._listeners = []
self._listeners_lock = Lock()
self._pending_jobs = []
self.configure(gconfig, **options)
接下來看看 configure 做了什么工作, 可以看出,它的作用就是根據(jù)配置文件給調(diào)度器配置各種參數(shù),值得注意的是,下面幾個配置參數(shù)及其默認值, 參數(shù)配置的作用描述如下:
| 參數(shù)名稱 | 默認值 | 解釋 |
|---|---|---|
| misfire_grace_time | 1(s) | 在允許作業(yè)執(zhí)行被延遲之前的最長時間(以秒為單位),比如這個值設(shè)置了 30s, 一個任務(wù)設(shè)置了 10:00:00運行,但10:00:00 由于一些原因沒有執(zhí)行,錯過了運行時間,但在 10:00:20 時調(diào)度器檢查這個任務(wù)還在這個設(shè)置誤差時間內(nèi),就可以繼續(xù)執(zhí)行,具體見解釋 |
| coalesce | True | 將幾個待執(zhí)行的作業(yè)合并為一個,比如一個任務(wù)由于某個原因堆積了10次沒有執(zhí)行,該值為 True,只執(zhí)行最后一次任務(wù), 為 False 時,則測試執(zhí)行 10次相同的任務(wù),具體見解釋 |
| daemonic | True | 控制調(diào)度器線程是否為守護程序。當standalone為True時,此選項無效。如果設(shè)置為 False, 當程序即將完成時必須顯式關(guān)閉調(diào)度器, 否則調(diào)度器會阻止程序終止,設(shè)置為 True, 調(diào)度器自動隨主程序的結(jié)束一起結(jié)束,但可能會在退出時引發(fā)異常 |
| standalone | False | 設(shè)置為 True, start 函數(shù)會運行主循環(huán),它將直接在調(diào)用線程中運行,并且將阻塞直到?jīng)]有其他pending 狀態(tài)的作業(yè)。 設(shè)置為False, 當調(diào)用start() 時, 將會生成自己的線程, 具體見解釋 |
def configure(self, gconfig={}, **options):
"""
Reconfigures the scheduler with the given options. Can only be done
when the scheduler isn't running.
"""
# 判斷當前調(diào)度器是否運行狀態(tài),運行狀態(tài)直接報錯,因為這是配置調(diào)度器的全局配置,不允許運行期間動態(tài)修改配置
if self.running:
raise SchedulerAlreadyRunningError
# Set general options(設(shè)置通用選項)
config = combine_opts(gconfig, 'apscheduler.', options)
# 這幾個參數(shù)都是存在默認值的,下面表格會詳細解釋參數(shù)作用
self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
self.coalesce = asbool(config.pop('coalesce', True))
self.daemonic = asbool(config.pop('daemonic', True))
self.standalone = asbool(config.pop('standalone', False))
# Configure the thread pool(配置線程池, 這個線程池也就是queue 加鎖實現(xiàn)的)
if 'threadpool' in config:
self._threadpool = maybe_ref(config['threadpool'])
else:
threadpool_opts = combine_opts(config, 'threadpool.')
self._threadpool = ThreadPool(**threadpool_opts)
# Configure job stores(配置作業(yè)存儲)
jobstore_opts = combine_opts(config, 'jobstore.')
jobstores = {}
for key, value in jobstore_opts.items():
store_name, option = key.split('.', 1)
opts_dict = jobstores.setdefault(store_name, {})
opts_dict[option] = value
for alias, opts in jobstores.items():
classname = opts.pop('class')
cls = maybe_ref(classname)
jobstore = cls(**opts)
self.add_jobstore(jobstore, alias, True)
configure 函數(shù)的作用已經(jīng)很明顯了,
- Scheduler 調(diào)度器的通用配置項,如 misfire_grace_time 等
- 配置線程池
- 配置作業(yè)存儲并添加給 Scheduler, 本質(zhì)上就是將添加到 _jobstores 字典里,并從各種作業(yè)存儲中加載出所有的任務(wù),最后發(fā)送作業(yè)存儲添加的事件通知
添加作業(yè)到調(diào)度器
從剛才的官方示例中,實例化完成調(diào)度器后,緊跟著就是使用 add_interval_job 為調(diào)度器添加一個任務(wù)作業(yè),不同的任務(wù)使用不同的函數(shù)添加,比如這里是為了定時執(zhí)行作業(yè),使用了 add_interval_job 并設(shè)置了 3秒執(zhí)行一次的策略。如果是指定某個日期執(zhí)行的作業(yè),那就使用 add_date_job 添加到調(diào)度器中?,F(xiàn)在進入 add_interval_job ,看看這個函數(shù)做了什么
def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
seconds=0, start_date=None, args=None, kwargs=None,
**options):
"""
相關(guān)注釋
"""
# 使用Python datetime.timedelta 函數(shù)包裝相關(guān)的參數(shù)
interval = timedelta(weeks=weeks, days=days, hours=hours,
minutes=minutes, seconds=seconds)
# 實例化一個觸發(fā)器類,根據(jù)任務(wù)的不同,實例化不同觸發(fā)器。然后通過 add_job 添加作業(yè)
trigger = IntervalTrigger(interval, start_date)
return self.add_job(trigger, func, args, kwargs, **options)
終于看到架構(gòu)圖中觸發(fā)器模塊的相關(guān)邏輯,在沒看代碼之前,大致可以猜到這個觸發(fā)器只要負責(zé)根據(jù)用戶設(shè)置的時間規(guī)則來計算出下次觸發(fā)作業(yè)執(zhí)行的時間。比如官方示例中 add_interval_job(tick, seconds=3), 用戶設(shè)置了 seconds 為 3秒,這個觸發(fā)器就負責(zé)根據(jù)當前時間計算出下次作業(yè)執(zhí)行的時間,接下來看看觸發(fā)器的初始化函數(shù)做了什么操作
IntervalTrigger
class IntervalTrigger(object):
def __init__(self, interval, start_date=None):
# interval 為上面 datetime.timedelta 對象,start_date 根據(jù)上面默認為 None
if not isinstance(interval, timedelta):
raise TypeError('interval must be a timedelta')
# 如果有 start_date,則先轉(zhuǎn)換成 datetime 類型
if start_date:
# convert_to_datetime 可以接收 datetime、date、字符串格式三種類型并轉(zhuǎn)換
# 為 datetime類型,這個通用方法使用正則表達式對字符串格式進行匹配,有點用
start_date = convert_to_datetime(start_date)
self.interval = interval
# 將 timedelta 轉(zhuǎn)換成秒數(shù), 這里按照官方示例就是 3s 了
self.interval_length = timedelta_seconds(self.interval)
# 如果沒有設(shè)置任何的時間,那么默認就是 1秒執(zhí)行一次作業(yè)
if self.interval_length == 0:
self.interval = timedelta(seconds=1)
self.interval_length = 1
# 這里設(shè)置了作業(yè)的起始執(zhí)行時間,根據(jù)當前時間 + 用戶設(shè)置的時間點(如上面設(shè)置了3秒)后執(zhí)行
if start_date is None:
self.start_date = datetime.now() + self.interval
else:
self.start_date = convert_to_datetime(start_date)
接下來回到添加 Job 作業(yè)的地方(self.add_job),這個將觸發(fā)器實例化的對象及相關(guān)要執(zhí)行的任務(wù)的函數(shù)及參數(shù)傳入,在里面
實例化了 Job 類,并根據(jù) Scheduler 的運行情況 將 job 實例添加到 Scheduler 類的 _pending_jobs 等待隊列中或者直接添加到 jobstore 中
def add_job(self, trigger, func, args, kwargs, jobstore='default',
**options):
# 這時候 misfire_grace_time、coalesce 兩次參數(shù)才會有用途
job = Job(trigger, func, args or [], kwargs or {},
options.pop('misfire_grace_time', self.misfire_grace_time),
options.pop('coalesce', self.coalesce), **options)
# 如果 Scheduler 還沒有運行,直接添加到 self._pending_jobs 隊列,因為這時候 jobstore 還沒實例化
# 也就無法直接添加到 jobstore 中
if not self.running:
self._pending_jobs.append((job, jobstore))
logger.info('Adding job tentatively -- it will be properly '
'scheduled when the scheduler starts')
# 添加 job 到 jobstore 中
else:
self._real_add_job(job, jobstore, True)
return job
這里做個小總結(jié),可以看出 Scheduler 聯(lián)系起所有的模塊,包括觸發(fā)器模塊的實例化、作業(yè)存儲的相關(guān)配置、Job類的實例化,所有子模塊的初始化都委托給 Scheduler 執(zhí)行(這才對得起這個命名吧),并且都是通過 add_interval_job、add_job 這種簡而易懂的方式來將任務(wù)的所有環(huán)節(jié)串聯(lián)起來,值得思考并應(yīng)用。
接著進入 _real_add_job 看看做了什么操作
def _real_add_job(self, job, jobstore, wakeup):
# 計算下一次運行時間,實際上調(diào)用了 Trigger(觸發(fā)器提供的計算下一次時間)的get_next_fire_time 計算
job.compute_next_run_time(datetime.now())
if not job.next_run_time:
raise ValueError('Not adding job since it would never be run')
self._jobstores_lock.acquire()
try:
try:
store = self._jobstores[jobstore]
except KeyError:
raise KeyError('No such job store: %s' % jobstore)
# 把執(zhí)行任務(wù)添加到存儲中
store.add_job(job)
finally:
self._jobstores_lock.release()
# Notify listeners that a new job has been added,(新增任務(wù)事件發(fā)送)
event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
self._notify_listeners(event)
logger.info('Added job "%s" to job store "%s"', job, jobstore)
# Notify the scheduler about the new job, 這個很關(guān)鍵,用Python的事件機制來喚醒scheduler(下面會詳細解釋)
if wakeup:
self._wakeup.set()
class IntervalTrigger(object):
def __init__(self, interval, start_date=None):
"""省略,具體見上面"""
def get_next_fire_time(self, start_date):
# start_date 當前時間, self.start_date 任務(wù)啟動的時間點(初始化IntervalTrigger類的時間)
if start_date < self.start_date:
return self.start_date
timediff_seconds = timedelta_seconds(start_date - self.start_date)
next_interval_num = int(ceil(timediff_seconds / self.interval_length))
return self.start_date + self.interval * next_interval_num
start 函數(shù)
前面分析了這么多,其實我們只跑了下面這兩行代碼的相關(guān)邏輯,也就是通過 Scheduler 來構(gòu)建作業(yè),并設(shè)置全局配置包括作業(yè)存儲的配置、觸發(fā)器相關(guān)信息、最小的執(zhí)行單元 Job 等操作, 這時候任務(wù)還沒有真正執(zhí)行起來,想要執(zhí)行作業(yè)任務(wù),還得運行 Scheduler 的 start 來啟用調(diào)度器
scheduler = Scheduler(standalone=True)
scheduler.add_interval_job(tick, seconds=3)
下面我們就深入 start 函數(shù),了解這個函數(shù)是如何開始任務(wù)的調(diào)度的
def start(self):
"""
(在一個新的線程中開啟一個調(diào)度器)
Starts the scheduler in a new thread.
線程模式, 在 scheduler 線程啟動后立即返回
In threaded mode (the default), this method will return immediately
after starting the scheduler thread.
標準模式, 這個函數(shù)會阻塞直到?jīng)]有需要調(diào)度的作業(yè)
In standalone mode, this method will block until there are no more
scheduled jobs.
"""
if self.running:
raise SchedulerAlreadyRunningError
# Create a RAMJobStore as the default if there is no default job store
# 這個地方在沒有配置任何的作用存儲情況下,默認使用的內(nèi)存存儲
if not 'default' in self._jobstores:
self.add_jobstore(RAMJobStore(), 'default', True)
# Schedule all pending jobs
# 將所有的作業(yè)添加到作業(yè)存儲中
for job, jobstore in self._pending_jobs:
# 上面已經(jīng)解釋過這個函數(shù)
self._real_add_job(job, jobstore, False)
del self._pending_jobs[:]
self._stopped = False
if self.standalone:
self._main_loop()
else:
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.setDaemon(self.daemonic)
self._thread.start()
從上面代碼來來,start 函數(shù)根據(jù) standalone 的配置不同啟用不同的模式來運行。
使用 standalone 模式,則調(diào)用 _main_loop 函數(shù)運行一個死循環(huán),直到調(diào)用 shutdown 函數(shù)關(guān)閉
-
線程模式則是啟用一個后臺守護線程進行任務(wù)的執(zhí)行,如果上面示例的 standalone 設(shè)置為 False,daemonic 默認情況下為 True,這時候就會設(shè)置線程為守護線程
當程序中所有的非守護線程都完成執(zhí)行時,任何剩余的守護線程將在 Python程序退出時被放棄,因此示例中的任務(wù)也就沒有運行就已經(jīng)結(jié)束,看不到任何的輸出
if __name__ == '__main__':
# 設(shè)置為線程模式
scheduler = Scheduler(standalone=False)
scheduler.add_interval_job(tick, seconds=3)
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
# 加這一步讓主程序(也就是所謂的非守護線程一直運行中)
import time
while True:
time.sleep(1)
# 運行輸出結(jié)果:主程序運行完成后直接退出
Press Ctrl+C to exit
主循環(huán) main_loop
def _main_loop(self):
"""Executes jobs on schedule."""
logger.info('Scheduler started')
# 事件通知 Scheduler 啟用
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
# 清理 threading.Event 的設(shè)置
self._wakeup.clear()
while not self._stopped:
logger.debug('Looking for jobs to run')
now = datetime.now()
# 獲取下一次醒來的時間
next_wakeup_time = self._process_jobs(now)
# Sleep until the next job is scheduled to be run,
# a new job is added or the scheduler is stopped
if next_wakeup_time is not None:
# 計算等待時間,時間不到就一直阻塞著
wait_seconds = time_difference(next_wakeup_time, now)
logger.debug('Next wakeup is due at %s (in %f seconds)',
next_wakeup_time, wait_seconds)
# 通過 threading.Event 的 wait 設(shè)置線程等待 wait_seconds 長時間
self._wakeup.wait(wait_seconds)
# 將標志設(shè)置為 False
self._wakeup.clear()
elif self.standalone:
logger.debug('No jobs left; shutting down scheduler')
self.shutdown()
break
else:
logger.debug('No jobs; waiting until a job is added')
self._wakeup.wait()
self._wakeup.clear()
logger.info('Scheduler has been shut down')
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
主線程本質(zhì)上是一個死循環(huán),不斷獲取作業(yè)任務(wù),并獲取作業(yè)的下一次執(zhí)行時間,然后使用 Python threading.Event 模塊讓線程阻塞一段時間(一次循環(huán)結(jié)束之前會計算任務(wù)下次執(zhí)行事件與當前時間之差),這樣就不用在死循環(huán)中不斷從 jobstore 存儲中取出任務(wù),然后計算執(zhí)行時間,這樣會浪費 Scheduler 的資源,也加重了 jobstore 取作業(yè)的負擔(dān)。
現(xiàn)在來回顧一下Python threading Event 模塊的官方描述:
未完待續(xù)
下一篇接著分析 Event 的具體使用及后續(xù)代碼
