Python定時任務(wù)之APScheduler源碼分析(一)

前言

前面有一篇文章簡單介紹了Python的一些任務(wù)調(diào)度庫,并描述了 APScheduler 的工作原理及架構(gòu),這里再回顧一下 APScheduler 的架構(gòu), 這次主要以 v2.1.0 版本為分析目標,因為 2.x 版本與 3.x 版本之間加了很多功能,如異步任務(wù),代碼也重構(gòu)了不少,但是基本功能及概念沒有變化

APScheduler 的架構(gòu)

1、APScheduler 基本概念

APScheduler 由四個組件構(gòu)成(注:該部分翻譯至官方文檔):

  • triggers 觸發(fā)器

    觸發(fā)器包含調(diào)度邏輯。每個作業(yè)(job)都有自己的觸發(fā)器,用于確定下一個作業(yè)何時運行。除了最初的配置,觸發(fā)器是完全無狀態(tài)的

  • job stores 作業(yè)存儲

    job stores 是存放作業(yè)的地方,默認保存在內(nèi)存中。作業(yè)數(shù)據(jù)序列化后保存至持久性數(shù)據(jù)庫,從持久性數(shù)據(jù)庫加載回來時會反序列化。作業(yè)存儲(job stores)不將作業(yè)數(shù)據(jù)保存在內(nèi)存中(默認存儲除外),相反,內(nèi)存只是充當后端存儲在保存、加載、更新、查找作業(yè)時的中間人角色。作業(yè)存儲不能在調(diào)度器(schedulers) 之間共享

  • executors 執(zhí)行器

    執(zhí)行器處理作業(yè)的運行。它們通常通過將作業(yè)中的指定可調(diào)用部分提交給線程或進程池來實現(xiàn)這一點。 當作業(yè)完成后,執(zhí)行器通知調(diào)度器,然后調(diào)度器發(fā)出一個適當?shù)氖录?/p>

  • schedulers 調(diào)度器

    調(diào)度器是將其余部分綁定在一起的工具。通常只有一個調(diào)度器(scheduler)在應(yīng)用程序中運行。應(yīng)用程序開發(fā)者通常不直接處理作業(yè)存儲(job stores)、執(zhí)行器(executors)或者觸發(fā)器(triggers)。相反,調(diào)度器提供了適當?shù)慕涌趤硖幚硭鼈?。配置作業(yè)存儲(job stores)和執(zhí)行器(executors)是通過調(diào)度器(scheduler)來完成的,就像添加、修改和刪除 job(作業(yè))一樣

2、APScheduler 架構(gòu)圖

apscheduler架構(gòu)圖

APScheduler 代碼結(jié)構(gòu)

主要代碼邏輯都是由下面的文件組成,可以看出代碼結(jié)構(gòu)也根據(jù)功能不同而歸檔, 例如作業(yè)存儲模塊及觸發(fā)器模塊都單獨作為一個包管理,后面擴展其它作業(yè)存儲或觸發(fā)器也很容易管理,不會出現(xiàn)代碼結(jié)構(gòu)混亂,簡單而言,代碼層次分明

├── apscheduler
│   ├── __init__.py
│   ├── events.py
│   ├── job.py
│   ├── jobstores
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── mongodb_store.py
│   │   ├── ram_store.py
│   │   ├── redis_store.py
│   │   ├── shelve_store.py
│   │   └── sqlalchemy_store.py
│   ├── scheduler.py
│   ├── threadpool.py
│   ├── triggers
│   │   ├── __init__.py
│   │   ├── cron
│   │   │   ├── __init__.py
│   │   │   ├── expressions.py
│   │   │   └── fields.py
│   │   ├── interval.py
│   │   └── simple.py
│   └── util.py

官方示例

閱讀源碼,我一般第一步先閱讀該庫的官方文檔,先了解這個功能是什么用途,怎么使用,然后查看相關(guān)的單元測試代碼,這樣更有助于理解里面某個模塊或者某個類的用途及效果,最后以官方示例入手,一步一步深入到源碼內(nèi)部了解。那現(xiàn)在假設(shè)你已經(jīng)閱讀過官方文檔,知道這個庫的用途了,開始探索之旅吧!

"""
設(shè)置一個每隔 3 秒就運行一次的任務(wù)
"""
from datetime import datetime

from apscheduler.scheduler import Scheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


if __name__ == '__main__':
    scheduler = Scheduler(standalone=True)
    scheduler.add_interval_job(tick, seconds=3)
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

從示例代碼看,第一步要實例化一下 Scheduler 類,這個類根據(jù)名稱就可以猜出它的功能了,它對應(yīng)著架構(gòu)圖里的調(diào)度器模塊,后續(xù)添加任務(wù)調(diào)度、啟動運行任務(wù)、觸發(fā)任務(wù)執(zhí)行,都要通過它來操作。仔細查看 Scheduler 實例化時還傳了個 standalone 參數(shù), 這個參數(shù)做什么用呢?接下來進入 Scheduler 類里面,查看它實例化時做了什么操作,看看 standalone 是什么作用

Scheduler 類

可以看出 __init__ 初始化時, 實例化了事件類(Event),并獲取了一些鎖,主要是 configure 函數(shù)進行任務(wù)開始前的配置,

可以看出實例化傳的 standalone 參數(shù)是裝載在 options 傳遞給 configure 進行配置使用

class Scheduler(object):
    """
    This class is responsible for scheduling jobs and triggering
    their execution.
    """

    _stopped = False
    _thread = None

    def __init__(self, gconfig={}, **options):
        self._wakeup = Event()
        self._jobstores = {}
        self._jobstores_lock = Lock()
        self._listeners = []
        self._listeners_lock = Lock()
        self._pending_jobs = []
        self.configure(gconfig, **options)

接下來看看 configure 做了什么工作, 可以看出,它的作用就是根據(jù)配置文件給調(diào)度器配置各種參數(shù),值得注意的是,下面幾個配置參數(shù)及其默認值, 參數(shù)配置的作用描述如下:

參數(shù)名稱 默認值 解釋
misfire_grace_time 1(s) 在允許作業(yè)執(zhí)行被延遲之前的最長時間(以秒為單位),比如這個值設(shè)置了 30s, 一個任務(wù)設(shè)置了 10:00:00運行,但10:00:00 由于一些原因沒有執(zhí)行,錯過了運行時間,但在 10:00:20 時調(diào)度器檢查這個任務(wù)還在這個設(shè)置誤差時間內(nèi),就可以繼續(xù)執(zhí)行,具體見解釋
coalesce True 將幾個待執(zhí)行的作業(yè)合并為一個,比如一個任務(wù)由于某個原因堆積了10次沒有執(zhí)行,該值為 True,只執(zhí)行最后一次任務(wù), 為 False 時,則測試執(zhí)行 10次相同的任務(wù),具體見解釋
daemonic True 控制調(diào)度器線程是否為守護程序。當standalone為True時,此選項無效。如果設(shè)置為 False, 當程序即將完成時必須顯式關(guān)閉調(diào)度器, 否則調(diào)度器會阻止程序終止,設(shè)置為 True, 調(diào)度器自動隨主程序的結(jié)束一起結(jié)束,但可能會在退出時引發(fā)異常
standalone False 設(shè)置為 True, start 函數(shù)會運行主循環(huán),它將直接在調(diào)用線程中運行,并且將阻塞直到?jīng)]有其他pending 狀態(tài)的作業(yè)。 設(shè)置為False, 當調(diào)用start() 時, 將會生成自己的線程, 具體見解釋
    def configure(self, gconfig={}, **options):
        """
        Reconfigures the scheduler with the given options. Can only be done
        when the scheduler isn't running.
        """
        # 判斷當前調(diào)度器是否運行狀態(tài),運行狀態(tài)直接報錯,因為這是配置調(diào)度器的全局配置,不允許運行期間動態(tài)修改配置
        if self.running:
            raise SchedulerAlreadyRunningError

        # Set general options(設(shè)置通用選項)
        config = combine_opts(gconfig, 'apscheduler.', options)
        # 這幾個參數(shù)都是存在默認值的,下面表格會詳細解釋參數(shù)作用
        self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
        self.coalesce = asbool(config.pop('coalesce', True))
        self.daemonic = asbool(config.pop('daemonic', True))
        self.standalone = asbool(config.pop('standalone', False))

        # Configure the thread pool(配置線程池, 這個線程池也就是queue 加鎖實現(xiàn)的)
        if 'threadpool' in config:
            self._threadpool = maybe_ref(config['threadpool'])
        else:
            threadpool_opts = combine_opts(config, 'threadpool.')
            self._threadpool = ThreadPool(**threadpool_opts)

        # Configure job stores(配置作業(yè)存儲)
        jobstore_opts = combine_opts(config, 'jobstore.')
        jobstores = {}
        for key, value in jobstore_opts.items():
            store_name, option = key.split('.', 1)
            opts_dict = jobstores.setdefault(store_name, {})
            opts_dict[option] = value

        for alias, opts in jobstores.items():
            classname = opts.pop('class')
            cls = maybe_ref(classname)
            jobstore = cls(**opts)
            self.add_jobstore(jobstore, alias, True)

configure 函數(shù)的作用已經(jīng)很明顯了,

  • Scheduler 調(diào)度器的通用配置項,如 misfire_grace_time 等
  • 配置線程池
  • 配置作業(yè)存儲并添加給 Scheduler, 本質(zhì)上就是將添加到 _jobstores 字典里,并從各種作業(yè)存儲中加載出所有的任務(wù),最后發(fā)送作業(yè)存儲添加的事件通知

添加作業(yè)到調(diào)度器

從剛才的官方示例中,實例化完成調(diào)度器后,緊跟著就是使用 add_interval_job 為調(diào)度器添加一個任務(wù)作業(yè),不同的任務(wù)使用不同的函數(shù)添加,比如這里是為了定時執(zhí)行作業(yè),使用了 add_interval_job 并設(shè)置了 3秒執(zhí)行一次的策略。如果是指定某個日期執(zhí)行的作業(yè),那就使用 add_date_job 添加到調(diào)度器中?,F(xiàn)在進入 add_interval_job ,看看這個函數(shù)做了什么

    def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0,
                         seconds=0, start_date=None, args=None, kwargs=None,
                         **options):
        """
                相關(guān)注釋
        """
        # 使用Python datetime.timedelta 函數(shù)包裝相關(guān)的參數(shù)
        interval = timedelta(weeks=weeks, days=days, hours=hours,
                             minutes=minutes, seconds=seconds)
        # 實例化一個觸發(fā)器類,根據(jù)任務(wù)的不同,實例化不同觸發(fā)器。然后通過 add_job 添加作業(yè)
        trigger = IntervalTrigger(interval, start_date)
        return self.add_job(trigger, func, args, kwargs, **options)

終于看到架構(gòu)圖中觸發(fā)器模塊的相關(guān)邏輯,在沒看代碼之前,大致可以猜到這個觸發(fā)器只要負責(zé)根據(jù)用戶設(shè)置的時間規(guī)則來計算出下次觸發(fā)作業(yè)執(zhí)行的時間。比如官方示例中 add_interval_job(tick, seconds=3), 用戶設(shè)置了 seconds 為 3秒,這個觸發(fā)器就負責(zé)根據(jù)當前時間計算出下次作業(yè)執(zhí)行的時間,接下來看看觸發(fā)器的初始化函數(shù)做了什么操作

IntervalTrigger
class IntervalTrigger(object):
    def __init__(self, interval, start_date=None):
        # interval 為上面 datetime.timedelta 對象,start_date 根據(jù)上面默認為 None
        if not isinstance(interval, timedelta):
            raise TypeError('interval must be a timedelta')
        # 如果有 start_date,則先轉(zhuǎn)換成 datetime 類型
        if start_date:
            # convert_to_datetime 可以接收 datetime、date、字符串格式三種類型并轉(zhuǎn)換
            # 為 datetime類型,這個通用方法使用正則表達式對字符串格式進行匹配,有點用
            start_date = convert_to_datetime(start_date)

        self.interval = interval
        # 將 timedelta 轉(zhuǎn)換成秒數(shù), 這里按照官方示例就是 3s 了
        self.interval_length = timedelta_seconds(self.interval)
        # 如果沒有設(shè)置任何的時間,那么默認就是 1秒執(zhí)行一次作業(yè)
        if self.interval_length == 0:
            self.interval = timedelta(seconds=1)
            self.interval_length = 1
                
        # 這里設(shè)置了作業(yè)的起始執(zhí)行時間,根據(jù)當前時間 + 用戶設(shè)置的時間點(如上面設(shè)置了3秒)后執(zhí)行
        if start_date is None:
            self.start_date = datetime.now() + self.interval
        else:
            self.start_date = convert_to_datetime(start_date)

接下來回到添加 Job 作業(yè)的地方(self.add_job),這個將觸發(fā)器實例化的對象及相關(guān)要執(zhí)行的任務(wù)的函數(shù)及參數(shù)傳入,在里面

實例化了 Job 類,并根據(jù) Scheduler 的運行情況 將 job 實例添加到 Scheduler 類的 _pending_jobs 等待隊列中或者直接添加到 jobstore 中

    def add_job(self, trigger, func, args, kwargs, jobstore='default',
                **options):
        # 這時候 misfire_grace_time、coalesce 兩次參數(shù)才會有用途
        job = Job(trigger, func, args or [], kwargs or {},
                  options.pop('misfire_grace_time', self.misfire_grace_time),
                  options.pop('coalesce', self.coalesce), **options)
        # 如果 Scheduler 還沒有運行,直接添加到 self._pending_jobs 隊列,因為這時候 jobstore 還沒實例化
        # 也就無法直接添加到 jobstore 中
        if not self.running:
            self._pending_jobs.append((job, jobstore))
            logger.info('Adding job tentatively -- it will be properly '
                        'scheduled when the scheduler starts')
        # 添加 job 到 jobstore 中
        else:
            self._real_add_job(job, jobstore, True)
        return job

這里做個小總結(jié),可以看出 Scheduler 聯(lián)系起所有的模塊,包括觸發(fā)器模塊的實例化、作業(yè)存儲的相關(guān)配置、Job類的實例化,所有子模塊的初始化都委托給 Scheduler 執(zhí)行(這才對得起這個命名吧),并且都是通過 add_interval_job、add_job 這種簡而易懂的方式來將任務(wù)的所有環(huán)節(jié)串聯(lián)起來,值得思考并應(yīng)用。

接著進入 _real_add_job 看看做了什么操作

    def _real_add_job(self, job, jobstore, wakeup):
        # 計算下一次運行時間,實際上調(diào)用了 Trigger(觸發(fā)器提供的計算下一次時間)的get_next_fire_time 計算
        job.compute_next_run_time(datetime.now())
        if not job.next_run_time:
            raise ValueError('Not adding job since it would never be run')

        self._jobstores_lock.acquire()
        try:
            try:
                store = self._jobstores[jobstore]
            except KeyError:
                raise KeyError('No such job store: %s' % jobstore)
            # 把執(zhí)行任務(wù)添加到存儲中
            store.add_job(job)
        finally:
            self._jobstores_lock.release()

        # Notify listeners that a new job has been added,(新增任務(wù)事件發(fā)送)
        event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
        self._notify_listeners(event)

        logger.info('Added job "%s" to job store "%s"', job, jobstore)

        # Notify the scheduler about the new job, 這個很關(guān)鍵,用Python的事件機制來喚醒scheduler(下面會詳細解釋)
        if wakeup:
            self._wakeup.set()
            
class IntervalTrigger(object):
    def __init__(self, interval, start_date=None):
                """省略,具體見上面"""

    def get_next_fire_time(self, start_date):
        # start_date 當前時間, self.start_date 任務(wù)啟動的時間點(初始化IntervalTrigger類的時間)
        if start_date < self.start_date:
            return self.start_date

        timediff_seconds = timedelta_seconds(start_date - self.start_date)
        next_interval_num = int(ceil(timediff_seconds / self.interval_length))
        return self.start_date + self.interval * next_interval_num        

start 函數(shù)

前面分析了這么多,其實我們只跑了下面這兩行代碼的相關(guān)邏輯,也就是通過 Scheduler 來構(gòu)建作業(yè),并設(shè)置全局配置包括作業(yè)存儲的配置、觸發(fā)器相關(guān)信息、最小的執(zhí)行單元 Job 等操作, 這時候任務(wù)還沒有真正執(zhí)行起來,想要執(zhí)行作業(yè)任務(wù),還得運行 Scheduler 的 start 來啟用調(diào)度器

    scheduler = Scheduler(standalone=True)
    scheduler.add_interval_job(tick, seconds=3)

下面我們就深入 start 函數(shù),了解這個函數(shù)是如何開始任務(wù)的調(diào)度的

    def start(self):
        """
        (在一個新的線程中開啟一個調(diào)度器)
        Starts the scheduler in a new thread.
                線程模式, 在 scheduler 線程啟動后立即返回
        In threaded mode (the default), this method will return immediately
        after starting the scheduler thread.
        標準模式, 這個函數(shù)會阻塞直到?jīng)]有需要調(diào)度的作業(yè)
        In standalone mode, this method will block until there are no more
        scheduled jobs.
        """
        if self.running:
            raise SchedulerAlreadyRunningError

        # Create a RAMJobStore as the default if there is no default job store
        # 這個地方在沒有配置任何的作用存儲情況下,默認使用的內(nèi)存存儲
        if not 'default' in self._jobstores:
            self.add_jobstore(RAMJobStore(), 'default', True)

        # Schedule all pending jobs
        # 將所有的作業(yè)添加到作業(yè)存儲中
        for job, jobstore in self._pending_jobs:
            # 上面已經(jīng)解釋過這個函數(shù)
            self._real_add_job(job, jobstore, False)
        del self._pending_jobs[:]

        self._stopped = False
        if self.standalone:
            self._main_loop()
        else:
            self._thread = Thread(target=self._main_loop, name='APScheduler')
            self._thread.setDaemon(self.daemonic)
            self._thread.start()

從上面代碼來來,start 函數(shù)根據(jù) standalone 的配置不同啟用不同的模式來運行。

  • 使用 standalone 模式,則調(diào)用 _main_loop 函數(shù)運行一個死循環(huán),直到調(diào)用 shutdown 函數(shù)關(guān)閉

  • 線程模式則是啟用一個后臺守護線程進行任務(wù)的執(zhí)行,如果上面示例的 standalone 設(shè)置為 False,daemonic 默認情況下為 True,這時候就會設(shè)置線程為守護線程

    當程序中所有的非守護線程都完成執(zhí)行時,任何剩余的守護線程將在 Python程序退出時被放棄,因此示例中的任務(wù)也就沒有運行就已經(jīng)結(jié)束,看不到任何的輸出

if __name__ == '__main__':
    # 設(shè)置為線程模式
    scheduler = Scheduler(standalone=False)
    scheduler.add_interval_job(tick, seconds=3)
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    
    # 加這一步讓主程序(也就是所謂的非守護線程一直運行中)
    import time
    while True:
      time.sleep(1)
# 運行輸出結(jié)果:主程序運行完成后直接退出
Press Ctrl+C to exit
主循環(huán) main_loop
    def _main_loop(self):
        """Executes jobs on schedule."""

        logger.info('Scheduler started')
        # 事件通知 Scheduler 啟用
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))
                # 清理 threading.Event 的設(shè)置
        self._wakeup.clear()
        while not self._stopped:
            logger.debug('Looking for jobs to run')
            now = datetime.now()
            # 獲取下一次醒來的時間
            next_wakeup_time = self._process_jobs(now)

            # Sleep until the next job is scheduled to be run,
            # a new job is added or the scheduler is stopped
            if next_wakeup_time is not None:
                # 計算等待時間,時間不到就一直阻塞著
                wait_seconds = time_difference(next_wakeup_time, now)
                logger.debug('Next wakeup is due at %s (in %f seconds)',
                             next_wakeup_time, wait_seconds)
                # 通過 threading.Event 的 wait 設(shè)置線程等待 wait_seconds 長時間
                self._wakeup.wait(wait_seconds)
                # 將標志設(shè)置為 False
                self._wakeup.clear()
            elif self.standalone:
                logger.debug('No jobs left; shutting down scheduler')
                self.shutdown()
                break
            else:
                logger.debug('No jobs; waiting until a job is added')
                self._wakeup.wait()
                self._wakeup.clear()

        logger.info('Scheduler has been shut down')
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

主線程本質(zhì)上是一個死循環(huán),不斷獲取作業(yè)任務(wù),并獲取作業(yè)的下一次執(zhí)行時間,然后使用 Python threading.Event 模塊讓線程阻塞一段時間(一次循環(huán)結(jié)束之前會計算任務(wù)下次執(zhí)行事件與當前時間之差),這樣就不用在死循環(huán)中不斷從 jobstore 存儲中取出任務(wù),然后計算執(zhí)行時間,這樣會浪費 Scheduler 的資源,也加重了 jobstore 取作業(yè)的負擔(dān)。

現(xiàn)在來回顧一下Python threading Event 模塊的官方描述:

未完待續(xù)

下一篇接著分析 Event 的具體使用及后續(xù)代碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容