celery 源碼筆記(一)

由于工作需要,開一個(gè)celery源碼筆記的坑。

啟動(dòng)

從github上下載源碼打開后,可看到源碼的結(jié)構(gòu)如下:

celery目錄結(jié)構(gòu)

打開setup.py文件,在文件的最后可以看到
1533043004390.png

因此,可以分析出celery的入口是celery/__main__.py文件的main函數(shù),函數(shù)定義如下

def main():
    """Entrypoint to the ``celery`` umbrella command."""
    if 'multi' not in sys.argv:
        maybe_patch_concurrency()
    from celery.bin.celery import main as _main
    _main()

這里可以看到,main函數(shù)會(huì)調(diào)用celery.bin.celery模塊的main()函數(shù),轉(zhuǎn)到定義,在代碼中可以看到這里的主要邏輯為

cmd = CeleryCommand() # 創(chuàng)建CeleryComman對(duì)象
cmd.execute_from_commandline(argv) # 從命令行啟動(dòng)

CeleryCommand對(duì)象在celery/bin/celery.py文件,這里可以看到CeleryCommand繼承自Command類(該類的聲明celery/bin/base.py文件,在很多類都是由該類派生出來,以后會(huì)提到),由于CeleryCommand并沒有實(shí)現(xiàn)自己的__init__函數(shù),因此會(huì)調(diào)用Command類的__init__函數(shù)進(jìn)行初始化

    def __init__(self, app=None, get_app=None, no_color=False,
                 stdout=None, stderr=None, quiet=False, on_error=None,
                 on_usage_error=None):
        self.app = app
        self.get_app = get_app or self._get_default_app
        self.stdout = stdout or sys.stdout
        self.stderr = stderr or sys.stderr
        self._colored = None
        self._no_color = no_color
        self.quiet = quiet
        if not self.description:
            self.description = self._strip_restructeredtext(self.__doc__)
        if on_error:
            self.on_error = on_error
        if on_usage_error:
            self.on_usage_error = on_usage_error

這里我們可以看到__init__執(zhí)行進(jìn)行了一些簡(jiǎn)單的初始化工作。接下來分析execute_from_commandline函數(shù)

    def execute_from_commandline(self, argv=None):
        argv = sys.argv if argv is None else argv
        if 'multi' in argv[1:3]:  # Issue 1008
            self.respects_app_option = False
        try:
            sys.exit(determine_exit_status(
                super(CeleryCommand, self).execute_from_commandline(argv)))
        except KeyboardInterrupt:
            sys.exit(EX_FAILURE)

CeleryCommandexecute_from_commandline函數(shù)中,我們可以看到這里調(diào)用了Command類的execute_from_commandline函數(shù)

    def execute_from_commandline(self, argv=None):
        """Execute application from command-line.

        Arguments:
            argv (List[str]): The list of command-line arguments.
                Defaults to ``sys.argv``.
        """
        if argv is None:
            argv = list(sys.argv)
        # Should we load any special concurrency environment?
        self.maybe_patch_concurrency(argv)
        self.on_concurrency_setup()

        # Dump version and exit if '--version' arg set.
        self.early_version(argv)
        argv = self.setup_app_from_commandline(argv) # 解析命令行參數(shù)并創(chuàng)建Celery實(shí)例
        self.prog_name = os.path.basename(argv[0])
        return self.handle_argv(self.prog_name, argv[1:]) # 調(diào)用當(dāng)前對(duì)象的handle_argv函數(shù)

在該函數(shù)中會(huì)調(diào)用setup_app_from_commandline解析命令行參數(shù)并創(chuàng)建應(yīng)用(用戶的app也是在這一步被加載),之后調(diào)用handle_argv函數(shù)繼續(xù)處理,這里需要注意,代碼中調(diào)用的handle_argv函數(shù)是CeleryCommand中定義的,接下來我們分析handle_argv函數(shù)。

    def handle_argv(self, prog_name, argv, **kwargs):
        self.prog_name = self.prepare_prog_name(prog_name)
        argv = self._relocate_args_from_start(argv)
        _, argv = self.prepare_args(None, argv)
        try:
            command = argv[0]
        except IndexError:
            command, argv = 'help', ['help']
        return self.execute(command, argv)

這里可以看到,在解析了參數(shù)之后,調(diào)用了execute函數(shù),其中第一個(gè)參數(shù)為命令行參數(shù)中解析出來的,按照官網(wǎng)的示例,這里的字符串為"worker",(后面的分析都暫時(shí)認(rèn)為command的值是"worker")。之后進(jìn)入到execute函數(shù)中

    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help'] # 根據(jù)傳入的command字符串獲取對(duì)應(yīng)的類
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0]) # 初始化并啟動(dòng)實(shí)例
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

這里我們看到execute函數(shù)主要做了兩件事,一是根據(jù)傳入的command查找類;二是創(chuàng)建上一步的類的實(shí)例并啟動(dòng)。
轉(zhuǎn)到worker類的定義,在文件celery/bin/worker.py中,可以看到,該類也是繼承自Command類,worker類實(shí)例的初始化也是調(diào)用Command類的__init__,初始化完成后會(huì)調(diào)用run_from_argv啟動(dòng),該函數(shù)只是回調(diào)了一下當(dāng)前對(duì)象的handle_argv函數(shù)。由于worker類沒有重寫handle_argv,因此會(huì)調(diào)用Command類中的該函數(shù)。

    def handle_argv(self, prog_name, argv, command=None):
        """Parse arguments from argv and dispatch to :meth:`run`.

        Warning:
            Exits with an error message if :attr:`supports_args` is disabled
            and ``argv`` contains positional arguments.

        Arguments:
            prog_name (str): The program name (``argv[0]``).
            argv (List[str]): Rest of command-line arguments.
        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

在該函數(shù)中,會(huì)調(diào)用當(dāng)前對(duì)象的__call__函數(shù),同樣地,這里也是調(diào)用Command類中定義的該函數(shù)。該函數(shù)中,會(huì)調(diào)用當(dāng)前對(duì)象的run函數(shù),這里調(diào)用的便是worker類中定義的run函數(shù)。在該函數(shù)中,會(huì)首先進(jìn)行一些配置,之后便是創(chuàng)建真正的Worker類的對(duì)象之后調(diào)用start函數(shù)啟動(dòng)。
本階段的調(diào)用時(shí)序圖可以整理如下:

啟動(dòng)時(shí)序圖
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容