Celery: Comprehensive Study Notes

Source

https://blog.51cto.com/steed/2292346?source=dra

Introduction

The instructor's blog: https://www.cnblogs.com/alex3714/articles/6351797.html
Documentation (the getting-started part is in Chinese): http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
More material online: https://blog.csdn.net/freeking101/article/details/74707619

Celery is a Distributed Task Queue. "Distributed" means there can be multiple workers; "queue" means operations are asynchronous.

Celery Core Modules

Celery has the following five core roles:
Task
A task, either an asynchronous task or a scheduled (periodic) task.
Broker
The middleman: it receives messages, i.e. Tasks, from producers and puts them into a queue. Workers consume the tasks. Celery itself provides no queue service; Redis or RabbitMQ is recommended for that.
Worker
The unit that executes tasks: it watches the message queue and, whenever a task arrives, fetches and runs it.
Beat
The scheduler for periodic tasks: it sends tasks to the Broker at the configured times.
Backend
Stores the results of task execution.
The figure below shows how these roles relate to one another:

[Figure: the relationships between Task, Broker, Worker, Beat and Backend]
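To tie the five roles to concrete code, here is a minimal hedged sketch (all names and addresses are illustrative only, not from the original):

# demo.py -- hypothetical; maps each role to a line of code or a command
from celery import Celery

app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',   # Broker: queues the task messages
             backend='redis://127.0.0.1:6379/1')  # Backend: stores the results

@app.task                                         # Task: the unit of work
def ping():
    return 'pong'

# Worker (executes tasks):  celery -A demo worker -l info
# Beat (schedules tasks):   celery -A demo beat -l info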

Installation

Celery 4.x no longer supports Windows. 3.1.26 is the latest, and final, 3.x release; 3.1.25 is what gets installed below.

pip install celery
pip install celery==3.1.25

The only recommended Brokers are RabbitMQ and redis. For RabbitMQ you only need the service itself; no extra module is required.
For redis, you need both the redis service and the redis module:

pip install redis

Alternatively, the following command installs celery together with the redis dependency:

pip install -U 'celery[redis]'

Verification

Check the version with celery --version, which also verifies the install:

>celery --version
'celery' is not recognized as an internal or external command,
operable program or batch file.

這里報(bào)錯(cuò)是因?yàn)闆]有把celery加到環(huán)境變量里,所以找不到程序。不過我也不想加,所以把路徑打全也好了:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery --version
3.1.25 (Cipater)

Basic Operations

Here we run a simple task and, at the end, fetch its execution result.

Creating a Task

Start by writing the following code:

# task1.py

from celery import Celery

# Create the Celery instance
app = Celery('tasks',
             broker='redis://192.168.246.11:6379/0',
             )

# Define a task
@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

If you use RabbitMQ, change broker to broker='amqp://192.168.3.108'.

Starting the Worker

Start a Celery Worker to begin listening for and executing tasks:

$ celery -A task1 worker --loglevel=info
$ celery -A task1 worker -l debug  # or start it this way

The -A parameter takes the Celery instance; the instance name can be omitted, the full form being task1.app. You must cd into the directory containing task1.py before running the command, or find a way to put the file's directory on python's path, because whatever follows -A is imported as a python module. Which is how, as below, I also got a Worker running:

G:\>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A Steed.Documents.PycharmProjects.Celery.task1 worker --loglevel=info
[2018-09-28 17:55:10,715: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1fb5056fda0
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . Steed.Documents.PycharmProjects.Celery.task1.add

[2018-09-28 17:55:10,864: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-09-28 17:55:10,922: INFO/MainProcess] mingle: searching for neighbors
[2018-09-28 17:55:11,961: INFO/MainProcess] mingle: all alone
[2018-09-28 17:55:11,980: WARNING/MainProcess] celery@IDX-xujf ready.
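An alternative to cd'ing into the right directory (an assumption about your shell setup, not something tried in the original): put the directory containing task1.py on PYTHONPATH, since -A imports it as an ordinary module. The paths below are hypothetical:

# Linux/macOS
export PYTHONPATH=/path/containing/task1
celery -A task1 worker -l info

# Windows cmd
set PYTHONPATH=G:\path\containing\task1
celery -A task1 worker -l info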

Calling a Task

To send a task to the Worker, call its delay() method; the following was done in IDLE:

>>> import sys
>>> dir = r"G:\Steed\Documents\PycharmProjects\Celery"
>>> sys.path.append(dir)  # my task file isn't on the path, so IDLE can't find it
>>> from task1 import add
>>> add.delay(1, 2)
<AsyncResult: 4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a>
>>> 

The Worker printed the following:

[2018-09-29 11:10:33,103: INFO/MainProcess] Received task: task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a]
[2018-09-29 11:10:33,107: WARNING/Worker-1] Computing the sum of two values: 1 2
[2018-09-29 11:10:33,109: INFO/MainProcess] Task task1.add[4f6613cb-3d2c-4a5e-ae58-bf9f28c3ec0a] succeeded in 0s: 3

The call above only sends the task; its result cannot be fetched, and the return value wasn't kept either. This time, save the return value:

>>> t = add.delay(3, 4)
>>> type(t)  # check the type of the return value
<class 'celery.result.AsyncResult'>
>>> t.get()  # this line raises an error
Traceback (most recent call last):
  File "<pyshell#16>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\Lib\site-packages\celery\backends\base.py", line 616, in _is_disabled
    'No result backend configured.  '
NotImplementedError: No result backend configured.  Please see the documentation for more information.

The problem is that no backend was defined when instantiating, i.e. the place where task results are stored.

Getting the Result

Modify the original task code and add the backend parameter when instantiating, specifying where task results are stored. Here results go into the same redis:

from celery import Celery

app = Celery('tasks',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',  # port and db number omitted this time
             )

@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

Then restart the Worker and restart IDLE; the task's return value can now be retrieved:

>>> t = add.delay(1, 1)
>>> t.get()
2
>>> 

For RabbitMQ, the app initialization looks like this:

app = Celery('tasks',
             broker='amqp://192.168.3.108',
             backend='rpc://192.168.3.108',  # in newer versions rpc is gradually replacing amqp; it still runs on RabbitMQ
             # backend='amqp://192.168.3.108',  # on older versions without rpc, amqp is the only option
             )

Other Operations

get blocks
The tasks above finish too quickly; prepare one that takes a while to run:

import time

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Calling get blocks until the task returns a result, which defeats the asynchronous behavior:

>>> t = upper.delay("abc")
>>> t.get()
'ABC'

ready: check completion without blocking
ready() returns whether the task has finished; wait until it returns True, then get fetches the result immediately:

>>> t = upper.delay("abcd")
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
'ABCD'
>>>

Setting a timeout on get
You can also give get a timeout; when it expires, an exception is raised:

>>> t = upper.delay("abcde")
>>> t.get(timeout=11)
'ABCDE'
>>> t = upper.delay("abcde")
>>> t.get(timeout=1)
Traceback (most recent call last):
  File "<pyshell#17>", line 1, in <module>
    t.get(timeout=1)
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 169, in get
    no_ack=no_ack,
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\backends\base.py", line 238, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
>>> 

Task errors
If the task itself raises an error, for example when running this:

>>> t = upper.delay(123)
>>> 

then the Worker shows the error:

[2018-09-29 12:57:07,077: ERROR/MainProcess] Task task1.upper[11820ee6-6936-4680-93c2-462487ec927e] raised unexpected: AttributeError("'int' object has no attribute 'upper'",)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\app\trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "G:\Steed\Documents\PycharmProjects\Celery\task1.py", line 25, in upper
    return v.upper()
AttributeError: 'int' object has no attribute 'upper'

and when you then get the result, that error is re-raised as an exception, which is unfriendly:

>>> t = upper.delay(123)
>>> t.get()
Traceback (most recent call last):
  File "<pyshell#27>", line 1, in <module>
    t.get()
  File "G:\Steed\Documents\PycharmProjects\venv\Celery\lib\site-packages\celery\result.py", line 175, in get
    raise meta['result']
AttributeError: 'int' object has no attribute 'upper'
>>>

Using get to fetch only the error result, without raising

>>> t.get(propagate=False)
AttributeError("'int' object has no attribute 'upper'",)
>>>

The traceback attribute stores the error details

>>> t.traceback
'Traceback (most recent call last):\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 240, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "g:\\steed\\documents\\pycharmprojects\\venv\\celery\\lib\\site-packages\\celery\\app\\trace.py", line 438, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "G:\\Steed\\Documents\\PycharmProjects\\Celery\\task1.py", line 25, in upper\n    return v.upper()\nAttributeError: \'int\' object has no attribute \'upper\'\n'
>>> 

Summary

Start a Celery Worker to listen for and execute tasks

$ celery -A tasks worker --loglevel=info 

Call a task

>>> from tasks import add
>>> t = add.delay(4, 4)

Fetch the result synchronously

>>> t.get()
>>> t.get(timeout=1)

Check whether the task has finished

>>> t.ready()

On error, fetch the error result without raising

>>> t.get(propagate=False)
>>> t.traceback  # print the detailed traceback

Using Celery in a Project

Celery can be packaged as an application. Say the application is named CeleryPro, with the following layout:

CeleryPro
├─__init__.py
├─celery.py
├─tasks.py

The glue file here must be named celery.py; the other names are up to you.

The celery.py file

This file must be named celery.py:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',
             include=['CeleryPro.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

The first line is from __future__ import absolute_import, unicode_literals. unicode_literals makes all string literals unicode by default (a Python 2 compatibility feature); absolute_import makes imports absolute. Because this file itself is named celery, a plain from celery would by default refer to this file, while what we actually need is the installed celery module, hence the absolute import. To import this file instead, write from .celery with a leading dot, which tasks.py below does.
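A small sketch of the import distinction just described (illustrative, not part of the project code):

# inside CeleryPro/celery.py
from __future__ import absolute_import
from celery import Celery    # absolute import: the installed celery package
# from .celery import app    # relative import: this very file (as tasks.py does)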

The tasks.py file

This file's first two lines differ only by one dot: it imports the celery.py file above. After that, just write the tasks with the decorator applied:

from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Starting the worker

When starting, the -A parameter takes the application name CeleryPro. You also need to cd to CeleryPro's parent directory first, or it won't be found:

>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery -A CeleryPro worker -l info
[2018-09-29 15:06:20,818: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x21deadaf470
- ** ---------- .> transport:   redis://192.168.246.11:6379//
- ** ---------- .> results:     redis://192.168.246.11/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-09-29 15:06:20,953: INFO/MainProcess] Connected to redis://192.168.246.11:6379//
[2018-09-29 15:06:20,983: INFO/MainProcess] mingle: searching for neighbors
[2018-09-29 15:06:21,994: INFO/MainProcess] mingle: all alone
[2018-09-29 15:06:22,055: WARNING/MainProcess] celery@IDX-xujf ready.

Various ways to start it
Note that all of them use CeleryPro:

celery -A CeleryPro worker --loglevel=info  # foreground start, not recommended
celery -A CeleryPro worker -l info  # foreground start, short form
celery multi start w1 -A CeleryPro -l info  # background start, recommended

Calling tasks

Call tasks from CeleryPro's parent directory as well; everything works the same as before.
All operations must run from CeleryPro's parent directory; in other words, just make sure CeleryPro's parent directory is on the path, or add it with sys.path.append().
Think of this as packaging celery into an application of your project: everything lives inside the CeleryPro folder, and CeleryPro becomes a module of your project. Since the project's root directory is put on the path at startup anyway, using celery packaged this way inside a project should be quite convenient.
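For instance, from the parent directory of CeleryPro the calls look just like before (a quick sketch):

>>> from CeleryPro.tasks import add, upper
>>> t = add.delay(3, 4)
>>> t.get()
7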

Starting Multiple Workers in the Background

Startup commands:

  • celery -A projectname worker --loglevel=info : start in the foreground
  • celery multi start w1 -A projectname -l info : start in the background
  • celery multi restart w1 -A projectname -l info : restart in the background
  • celery multi stop w1 -A projectname -l info : stop the background worker

The foreground/background difference: the background is started via multi.
w1 is the worker's name; multiple background workers can be started, each with its own name.
Even when all workers are down, tasks that users submit are all kept until some worker comes up to execute them and return results (they wait in the broker; see the sketch below).
"If a foreground worker disconnects, its tasks disappear; if a background worker disconnects, its tasks remain." I never quite understood that claim.
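One way to watch the backlog yourself (assuming the redis broker and the default queue name, celery, which the startup banner above confirms): pending messages sit in a redis list, so their count is visible even with no worker running:

redis-cli -h 192.168.246.11 llen celery    # number of tasks waiting in the default queue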
Checking how many Celery workers are currently running
ps seems to be the only way. Below, three background Workers are started and listed with ps; then one Worker is stopped and ps is run again:

[root@Python3 ~]# celery multi start w1 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w1@Python3: OK
[root@Python3 ~]# celery multi start w2 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w2@Python3: OK
[root@Python3 ~]# celery multi start w3 -A CeleryPro -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
    > w3@Python3: OK
[root@Python3 ~]# ps -ef | grep celery
root       1346      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root       1350   1346  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w1%I.log --pidfile=w1.pid --hostname=w1@Python3
root       1360      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1364   1360  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1374      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1378   1374  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1391   1251  0 20:55 pts/0    00:00:00 grep --color=auto celery
[root@Python3 ~]# celery multi stop w1
celery multi v4.2.1 (windowlicker)
> Stopping nodes...
    > w1@Python3: TERM -> 1346
[root@Python3 ~]# ps -ef | grep celery
root       1360      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1364   1360  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w2%I.log --pidfile=w2.pid --hostname=w2@Python3
root       1374      1  0 20:49 ?        00:00:01 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1378   1374  0 20:49 ?        00:00:00 /usr/bin/python3.6 -m celery worker -A CeleryPro -l info --logfile=w3%I.log --pidfile=w3.pid --hostname=w3@Python3
root       1398   1251  0 20:57 pts/0    00:00:00 grep --color=auto celery
[root@Python3 ~]# 

Not Supported on Windows

The error is:

  File "g:\steed\documents\pycharmprojects\venv\celery\lib\site-packages\celery\platforms.py", line 429, in detached
    raise RuntimeError('This platform does not support detach.')
RuntimeError: This platform does not support detach.
        > w1@IDX-xujf: * Child terminated with errorcode 1
FAILED

Per the error message, look at line 429 of the code:

    if not resource:
        raise RuntimeError('This platform does not support detach.')

It checks resource and immediately raises. To see what exactly resource is, search the file for the variable name (resource):

# resource is assigned at the top of the file
resource = try_import('resource')

# the try_import method used above lives in another file
def try_import(module, default=None):
    """Try to import and return module, or return
    None if the module does not exist."""
    try:
        return importlib.import_module(module)
    except ImportError:
        return default

# a method below notes in its comment that resource being None means Windows
def get_fdmax(default=None):
    """Return the maximum number of open file descriptors
    on this system.

    :keyword default: Value returned if there's no file
                      descriptor limit.

    """
    try:
        return os.sysconf('SC_OPEN_MAX')
    except:
        pass
    if resource is None:  # Windows
        return default
    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fdmax == resource.RLIM_INFINITY:
        return default
    return fdmax

So all it does is try to import a module named "resource", which exists only on Unix.

Periodic Tasks

Periodic tasks differ a lot between versions 3 and 4; version 4 also offers more scheduling options.

Celery3

Keep using the two earlier tasks; only some configuration (conf) needs to be added to celery to set schedules for them.
Settings on app.conf are all uppercase, and case matters here; lowercase won't work:

# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

app = Celery('CeleryPro',
             broker='redis://192.168.246.11',
             backend='redis://192.168.246.11',
             include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {
    'add every 10 seconds': {
        'task': 'CeleryPro.tasks.add',
        'schedule': timedelta(seconds=10),  # a timedelta object works
        # 'schedule': 10,  # a plain number of seconds is also supported
        'args': (1, 2)
    },
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),
        'args': ('abc', ),
    },
}

# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

Task result expiry is set with 'CELERY_TASK_RESULT_EXPIRES=3600'. The default is 1 day; the docs say a built-in periodic task removes results older than this limit.

A built-in periodic task will delete the results after this time (celery.task.backend_cleanup).

With that configured, start the Worker and start Beat and you're done:

G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro worker -l info
G:\Steed\Documents\PycharmProjects\Celery>G:\Steed\Documents\PycharmProjects\venv\Celery\Scripts\celery.exe -A CeleryPro beat -l info

Version 3 settings are all uppercase; starting with version 4 they are lowercase, and many names changed as well. The old-to-new mapping is here:
http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration
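As a hedged sample of that mapping (limited to the settings actually used in these notes):

# Celery 3.x (uppercase)          Celery 4.x (lowercase)
# CELERYBEAT_SCHEDULE         ->  beat_schedule
# CELERY_TIMEZONE             ->  timezone
# CELERY_TASK_RESULT_EXPIRES  ->  result_expires
# CELERY_ACCEPT_CONTENT       ->  accept_content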

Celery4

The nice part of the new version is that periodic tasks can be defined individually, just like ordinary tasks, via the new @app.on_after_configure.connect decorator, which version 3 does not have.
The code
Create a separate py file to hold the periodic tasks:

# CeleryPro/periodic4.py
from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run every 10 seconds
    sender.add_periodic_task(10.0, hello.s(), name='hello every 10')  # give the task a name

    # run every 30 seconds
    sender.add_periodic_task(30, upper.s('abcdefg'), expires=10)  # the task expires after 10 seconds

    # the schedule is set just like a Linux crontab entry
    sender.add_periodic_task(
        crontab(hour='*', minute='*/2', day_of_week='*'),
        add.s(11, 22),
    )

@app.task
def hello():
    print('Hello World')

@app.task
def upper(arg):
    return arg.upper()

@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

Three schedules are defined above.
The name parameter names a schedule, and task reports then use that value, e.g. hello every 10; otherwise the default display is the call signature, e.g. CeleryPro.periodic4.upper('abcdefg').
The expires parameter sets a task expiry time; if the task hasn't run by then it is presumably dropped (untested).
Now edit the earlier celery.py file to add the new task file to the include list. While at it, I switched to RabbitMQ here for fun:

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://192.168.3.108',
             backend='rpc',
             include=['CeleryPro.tasks', 'CeleryPro.periodic4'])

app.conf.timezone = 'UTC'  # periodic tasks use UTC time by default
# app.conf.timezone = 'Asia/Shanghai'  # can also be changed to Beijing time

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

Starting the worker
Start it the same way as before:

[root@Python3 ~]# celery -A CeleryPro worker -l info
/usr/local/lib/python3.6/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@Python3 v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-10-01 12:46:35
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x7ffb0c8b2908
- ** ---------- .> transport:   amqp://guest:**@192.168.3.108:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.periodic4.add
  . CeleryPro.periodic4.hello
  . CeleryPro.periodic4.upper
  . CeleryPro.tasks.add
  . CeleryPro.tasks.upper

[2018-10-01 12:46:35,187: INFO/MainProcess] Connected to amqp://guest:**@192.168.3.108:5672//
[2018-10-01 12:46:35,216: INFO/MainProcess] mingle: searching for neighbors
[2018-10-01 12:46:36,266: INFO/MainProcess] mingle: all alone
[2018-10-01 12:46:36,307: INFO/MainProcess] celery@Python3 ready.

After startup, check [tasks]: the newly added periodic tasks are listed, and the earlier tasks are all still there.
Starting Beat
Here -A must be spelled out in full as CeleryPro.periodic4, slightly different from the Worker's parameter:

[root@Python3 ~]# celery -A CeleryPro.periodic4 beat -l info
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-10-01 12:45:04
Configuration ->
    . broker -> amqp://guest:**@192.168.3.108:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2018-10-01 12:45:04,934: INFO/MainProcess] beat: Starting...
[2018-10-01 12:45:05,006: INFO/MainProcess] Scheduler: Sending due task hello every 10 (CeleryPro.periodic4.hello)
[2018-10-01 12:45:05,356: INFO/MainProcess] Scheduler: Sending due task CeleryPro.periodic4.upper('abcdefg') (CeleryPro.periodic4.upper)

Right after startup, the two interval tasks are sent to the Worker immediately, and then resent at their defined intervals.
The crontab-based task is only sent once the time matches: it was 45 past when starting, so it ran at 46 past.

The old approach still works
As said above, the new version mainly adds a decorator. Without the new decorator, periodic tasks can still be written into the configuration:

# CeleryPro/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('CeleryPro',
             broker='amqp://192.168.3.108',
             backend='rpc',
             include=['CeleryPro.tasks'])

app.conf.beat_schedule = {
    'every 5 seconds': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': 5,
        'args': ('xyz',)
    }
}

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

This simply schedules an ordinary task from the configuration. CeleryPro.periodic4 was removed from include here; leaving it in would do no harm.
The tasks.py file is the same one as before:

# CeleryPro/tasks.py
from __future__ import absolute_import, unicode_literals
from .celery import app
import time

@app.task
def add(x, y):
    print("計(jì)算2個(gè)值的和: %s %s" % (x, y))
    return x+y

@app.task
def upper(v):
    for i in range(10):
        time.sleep(1)
        print(i)
    return v.upper()

Finally, start the Worker and then Beat to try it:

[root@Python3 ~]# celery -A CeleryPro beat -l info

Here Beat's -A parameter can be CeleryPro and these periodic tasks still start; CeleryPro.tasks behaves the same. Also, if periodic4.py is added back to the include list and Beat is started with CeleryPro.periodic4, the periodic tasks defined there start too.
crontab is supported here as well, used exactly as before: set the schedule value to a crontab(...) call.
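For example, a minimal sketch reusing the upper task above:

# CeleryPro/celery.py (fragment; assumes the app defined above)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'upper every 2 minutes': {
        'task': 'CeleryPro.tasks.upper',
        'schedule': crontab(minute='*/2'),  # crontab instead of a number of seconds
        'args': ('xyz',),
    },
}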

Summary

The two approaches to periodic tasks above each have their use cases.
If the function a task executes must change, you have to change code and restart the Worker either way.
The point here is changing the schedule (adding, removing, or altering periods) while the task functions stay the same. With the @app.on_after_configure.connect decorator, the schedule is hard-coded inside a function, so it seems impossible to add new tasks dynamically, though the structure is clearer.
With the latter approach, updating the app.conf.beat_schedule dictionary and restarting Beat is enough for changes to take effect.

crontab Examples

Some crontab examples:

Example                                                       | Meaning
crontab()                                                     | Execute every minute.
crontab(minute=0, hour=0)                                     | Execute daily at midnight.
crontab(minute=0, hour='*/3')                                 | Execute every three hours: 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')                 | Same as previous.
crontab(minute='*/15')                                        | Execute every 15 minutes.
crontab(day_of_week='sunday')                                 | Execute every minute (!) on Sundays.
crontab(minute='*', hour='*', day_of_week='sun')              | Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3')                             | Execute every even hour, and every hour divisible by three. This means: every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5')                                 | Execute every hour divisible by 5. This means it triggers at 3pm, not 5pm (since 3pm equals the 24-hour clock value of 15, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17')                            | Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(day_of_month='2')                                     | Execute on the second day of every month.
crontab(day_of_month='2-30/3')                                | Execute on every even-numbered day.
crontab(day_of_month='1-7,15-21')                             | Execute in the first and third weeks of the month.
crontab(day_of_month='11', month_of_year='5')                 | Execute on the 11th of May every year.
crontab(month_of_year='*/3')                                  | Execute in the first month of every quarter.

Solar Schedules

Version 4 also provides this way of specifying a schedule:

If you have a task that should be executed according to sunrise, sunset, dawn or dusk, you can use the solar schedule type.

All events are computed using UTC, so the timezone setting has no effect. The official example:

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

The solar function takes three arguments: the event, the latitude, and the longitude. The sign conventions for latitude and longitude are in this table:

Sign | Argument  | Meaning
+    | latitude  | North
-    | latitude  | South
+    | longitude | East
-    | longitude | West

The supported event types are:

Event             | Meaning
dawn_astronomical | Execute at the moment after which the sky is no longer completely dark; formally, when the sun is 18 degrees below the horizon.
dawn_nautical     | Execute when there's enough sunlight for the horizon and some objects to be distinguishable; formally, when the sun is 12 degrees below the horizon.
dawn_civil        | Execute when there's enough light for objects to be distinguishable so that outdoor activities can commence; formally, when the sun is 6 degrees below the horizon.
sunrise           | Execute when the upper edge of the sun appears over the eastern horizon in the morning.
solar_noon        | Execute when the sun is highest above the horizon on that day.
sunset            | Execute when the trailing edge of the sun disappears over the western horizon in the evening.
dusk_civil        | Execute at the end of civil twilight, when objects are still distinguishable and some stars and planets are visible; formally, when the sun is 6 degrees below the horizon.
dusk_nautical     | Execute when the sun is 12 degrees below the horizon; objects are no longer distinguishable, and the horizon is no longer visible to the naked eye.
dusk_astronomical | Execute at the moment after which the sky becomes completely dark; formally, when the sun is 18 degrees below the horizon.

Best Practices for Using Celery in Django

In django, celery's configuration can be written directly in django's settings.py, and task functions go in a tasks.py file inside each app's directory. Every app can have its own tasks.py, and all the tasks are shared.

Creating the Directory Structure

Create a django project named UsingCeleryWithDjango with an app named app01. After creating the project, create a CeleryPro directory in the project root and a celery.py file inside it. The directory layout:

UsingCeleryWithDjango
│
├─manage.py
│
├─app01
│  │  admin.py
│  │  apps.py
│  │  models.py
│  │  tests.py
│  │  views.py
│  └  __init__.py
│
├─CeleryPro
│  │  celery.py
│  └  __init__.py
│
├─templates
│
└─UsingCeleryWithDjango
    │  settings.py
    │  urls.py
    │  wsgi.py
    └  __init__.py

Above, only the structure and location of CeleryPro matter; everything else is the default content of a new django project.
The CeleryPro/celery.py file is where the celery instance is created.
The CeleryPro/__init__.py file makes sure celery is loaded when Django starts; later the apps will use the @shared_task decorator from the celery module.

CeleryPro Example Code

The official example files are on github; versions 3 and 4 differ somewhat.
Latest: https://github.com/celery/celery/tree/master/examples/django
3.1: https://github.com/celery/celery/tree/3.1/examples/django
You can also switch branches on Github to view each version.
Below I use version 3.1.25 on Windows.

# UsingCeleryWithDjango/CeleryPro/__init__.py
from __future__ import absolute_import, unicode_literals
__author__ = '749B'

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

# UsingCeleryWithDjango/CeleryPro/celery.py
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'UsingCeleryWithDjango.settings')

from django.conf import settings  # noqa

app = Celery('CeleryPro')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# Auto-discover tasks under every app
# However, entries written in the new django INSTALLED_APPS style are not discovered
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # this is how the official example writes it
'''
# This is the INSTALLED_APPS section of settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',  # with this style, autodiscovery cannot find the tasks
    # 'app01',  # with this style, autodiscovery works
]
'''
# Or, if you don't want to change settings.INSTALLED_APPS, pass the app list yourself
app.autodiscover_tasks(['app01'])  # that's what I do here

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

There's a pitfall here; the comment block I wrote above should explain it clearly enough.

The tasks file

Create a tasks.py file under the app (at the same level as models.py) and define the tasks there.

- app01/
    - app01/tasks.py
    - app01/models.py

Functions created in tasks.py use the @shared_task decorator. These tasks are shared by all apps.

# UsingCeleryWithDjango/app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

Configuring settings.py

This is django's configuration file, but celery's settings can now all live here too:

# UsingCeleryWithDjango/UsingCeleryWithDjango/settings.py

# everything else is regular django configuration and omitted here
# Celery settings

BROKER_URL = 'redis://192.168.246.11/0'
CELERY_RESULT_BACKEND = 'redis://192.168.246.11/0'

Only the most basic settings here: redis receives the tasks and stores the results; everything else is left at the defaults.
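A side note beyond this 3.1.25 setup: on Celery 4.x the usual Django wiring prefixes these settings with CELERY_ and loads them under a namespace, roughly like this (hedged, per the celery 4 Django docs):

# settings.py (Celery 4.x style)
CELERY_BROKER_URL = 'redis://192.168.246.11/0'
CELERY_RESULT_BACKEND = 'redis://192.168.246.11/0'

# CeleryPro/celery.py (Celery 4.x style)
app.config_from_object('django.conf:settings', namespace='CELERY')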

Starting the Worker

The startup command is the same; the key is the parameter after -A:

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info
[2018-10-02 20:55:56,411: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@IDX-xujf v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x27f5e4dbe80
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     redis://192.168.246.11/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-02 20:55:56,548: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-10-02 20:55:56,576: INFO/MainProcess] mingle: searching for neighbors
[2018-10-02 20:55:57,596: INFO/MainProcess] mingle: all alone
[2018-10-02 20:55:57,647: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-02 20:55:57,653: WARNING/MainProcess] celery@IDX-xujf ready.

That's a successful start. Just confirm that all the tasks appear under [tasks].
The entries under [tasks] are the names of all our custom tasks; below I dug into how to get these task names yourself.

Getting all the tasks

All tasks are available through app.tasks, where app is the instance created by app = Celery('CeleryPro') in CeleryPro/celery.py. In CeleryPro/__init__.py it is aliased via from .celery import app as celery_app, so within this project it should be celery_app.tasks.
Printing celery_app.tasks gives:

{'celery.chord_unlock': <@task: celery.chord_unlock of CeleryPro:0x2360b07fc88>, 'celery.group': <@task: celery.group of CeleryPro:0x2360b07fc88>, 'app01.tasks.xsum': <@task: app01.tasks.xsum of CeleryPro:0x2360b07fc88>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of CeleryPro:0x2360b07fc88>, 'app01.tasks.add': <@task: app01.tasks.add of CeleryPro:0x2360b07fc88>, 'celery.map': <@task: celery.map of CeleryPro:0x2360b07fc88>, 'app01.tasks.mul': <@task: app01.tasks.mul of CeleryPro:0x2360b07fc88>, 'celery.chain': <@task: celery.chain of CeleryPro:0x2360b07fc88>, 'CeleryPro.celery.debug_task': <@task: CeleryPro.celery.debug_task of CeleryPro:0x2360b07fc88>, 'celery.starmap': <@task: celery.starmap of CeleryPro:0x2360b07fc88>, 'celery.chord': <@task: celery.chord of CeleryPro:0x2360b07fc88>, 'celery.chunks': <@task: celery.chunks of CeleryPro:0x2360b07fc88>}

All our tasks are in there, plus many extra ones (all starting with celery.). Workers were started with -l info before; with -l debug these extra tasks are also shown. In other words, celery filters at Worker startup: debug mode prints everything, info mode prints only the user-defined tasks. Next, look through the source to see how the filtering is done.
I extracted the following from the source for analysis:

# celery/apps/worker.py

# First, some of the strings printed to the console at startup
# This is the LOGO; not the point here
ARTLINES = [
    ' --------------',
    '---- **** -----',
    '--- * ***  * --',
    '-- * - **** ---',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- ** ----------',
    '- *** --- * ---',
    '-- ******* ----',
    '--- ***** -----',
    ' --------------',
]

# This string is the template for printing the task list
# It is formatted with format() before being written to the console, so the task list is rendered dynamically
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""

# The class has many methods; here we only look at the ones that build the task list dynamically
class Worker(WorkController):

    # This is the method that generates the task list
    # The logic is simple: check whether the name starts with 'celery'
    # include_builtins True outputs every task; False filters out those starting with 'celery'
    # see the extra_info method below for how include_builtins is determined
    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
        return sep.join(
            '  . {0}'.format(task) for task in sorted(self.app.tasks)
            if (not task.startswith(int_) if not include_builtins else task)
        )

    # This method calls the tasklist method above
    # It first checks the log level: include_builtins is True only at DEBUG or below
    # Finally, EXTRA_INFO_FMT is formatted with tasklist's result
    def extra_info(self):
        if self.loglevel <= logging.INFO:
            include_builtins = self.loglevel <= logging.DEBUG
            tasklist = self.tasklist(include_builtins=include_builtins)
            return EXTRA_INFO_FMT.format(tasks=tasklist)

The filtering method is simple: use startswith to drop keys beginning with celery. Before filtering, sorted orders the keys, which incidentally turns the dict into a list of its keys.
So the task list can be obtained like this:

from CeleryPro import celery_app

def celery_list(request):
    task_list = []
    for task in sorted(celery_app.tasks):
        if not task.startswith('celery.'):
            task_list.append(task)
    print(task_list)
    return HttpResponse('OK')

The code above ultimately builds a list, which a single list comprehension can produce:

task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]

This only yields the tasks' keys. To invoke a task, use its key to look up the corresponding value in the celery_app.tasks dict and call that value's methods:

    task_name = task_list[1]
    t = celery_app.tasks[task_name].delay(1, 2)

Calling tasks in views

The previous section ended with how to invoke a task, but fetching the execution result still has a problem.
Previously, the return value of delay() was captured when calling: that's the task object, and with it you can check whether the task finished and fetch its result.
But when a views function submits a task, the function returns and ends, so the task object is gone, and it can't be sent to the browser directly anyway. Instead, return the task's id (the uuid generated for each task); on a later request, use that uuid to recover the task object.

# To get the object back from a uuid, use the following method
from celery.result import AsyncResult

task_obj = AsyncResult(uuid)  # fetch the task object by its uuid
# Once you have the object, everything works as before
task_obj.ready()  # check whether the task has finished
task_obj.get()  # fetch the result, blocking
task_obj.result  # after the task finishes the result is stored here; no need to call get again

Below is the test code I wrote.
The front-end page
This page lets you pick a task, fill in the arguments, and submit it to the backend for execution; after submitting, it redirects to the task-result page:

# UsingCeleryWithDjango/templates/celery_list.html

<body>
<form method="post">
    {% csrf_token %}
    <select name="task_name">
        {% for task in task_list %}
            <option value="{{ task }}">{{ task }}</option>
        {% endfor %}
    </select>
    <input name="args" type="text" placeholder="參數(shù)" />
    <input type="submit" value="提交任務(wù)" />
</form>
<h3>Tips: 后臺會(huì)用json.loads把input提交的參數(shù)做一次反序列化,然后用*args傳參</h3>
<p>debug_task方法,參數(shù)不填</p>
<p>add和mul方法,參數(shù)填個(gè)2個(gè)元素的列表。比如:[1, 2]</p>
<p>xsum方法,參數(shù)接收一個(gè)列表,所以要再包一層[]。比如:[[1, 2, 3, 4, 5]]</p>
</body>

The URL routes
There are two urls: one for the task-submission page, and one that fetches results by uuid; that view has no html and returns via HttpResponse directly:

# UsingCeleryWithDjango/UsingCeleryWithDjango/urls.py

from django.contrib import admin
from django.urls import path
from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('celery_list/', views.celery_list),
    path('celery_result/<uuid:uuid>/', views.celery_result),
]

The view functions

# UsingCeleryWithDjango/app01/views.py

from django.shortcuts import render, redirect, HttpResponse
# Create your views here.
from CeleryPro import celery_app
from celery.result import AsyncResult
import json

def celery_list(request):

    if request.method == 'POST':
        task_name = request.POST.get('task_name')
        args = request.POST.get('args')
        if args:
            t = celery_app.tasks[task_name].delay(*json.loads(args))
            return redirect('/celery_result/%s/' % t.id)
        else:
            celery_app.tasks[task_name]()

    # following the source code's approach, get the list of all task names
    task_list = [task for task in sorted(celery_app.tasks) if not task.startswith('celery.')]
    return render(request, 'celery_list.html', {'task_list': task_list})

def celery_result(request, uuid):
    uuid = str(uuid)
    task_obj = AsyncResult(uuid)
    if task_obj.ready():
        return HttpResponse(task_obj.result)
    else:
        ele = "<input type='button' value='再次獲取結(jié)果' onclick='location.reload();'>"
        return HttpResponse('Not Ready %s' % ele)

Everything tested fine, but all the tasks return results immediately. So tweak the tasks in tasks.py: find a task and add a delay with time.sleep(). If the task hasn't finished, the page doesn't hang: it first returns a page that can be refreshed, and once execution completes, the task's result is returned.
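For example, a hypothetical tweak to one of the shared tasks (not in the original code) to make the "Not Ready" branch observable:

# UsingCeleryWithDjango/app01/tasks.py (hypothetical tweak)
import time
from celery import shared_task

@shared_task
def mul(x, y):
    time.sleep(10)  # simulate a slow task so ready() stays False for a while
    return x * y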

Using Periodic Tasks in Django

To use periodic tasks in django, one more module must be installed at this point:

pip install django_celery_beat

The module is django_celery_beat; note the underscores in the name, though the command also accepts hyphens (presumably aliased). It does more than scheduling: it works by storing the tasks in django's database, so they can be conveniently set up and managed through django admin.
Note: installing this module also pulls in some dependencies, and the trap is that it upgrades the existing celery to the latest version, i.e. the 4.x line that supposedly doesn't support windows.

Since it upgraded anyway, first try running in the current environment. I then hit 2 pitfalls.
I'm on win10; some of the problems are caused by the windows operating system, so I don't know how general they are.

App tasks are not auto-discovered

The worker starts normally and the page opens, but none of the tasks defined in the app can be found.
Auto-discovery of every app's tasks is configured in "UsingCeleryWithDjango/CeleryPro/celery.py", specifically by calling this method:

# from django.conf import settings  # noqa
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  # get the app paths from django's settings
app.autodiscover_tasks(['app01'],)  # specify the apps yourself

Looking at this method's source: a big docstring, but the body is very simple:

    def autodiscover_tasks(self, packages=None,
                           related_name='tasks', force=False):
        """Auto-discover task modules.

        Searches a list of packages for a "tasks.py" module (or use
        related_name argument).

        If the name is empty, this will be delegated to fix-ups (e.g., Django).

        For example if you have a directory layout like this:

        .. code-block:: text

            foo/__init__.py
               tasks.py
               models.py

            bar/__init__.py
                tasks.py
                models.py

            baz/__init__.py
                models.py

        Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
        result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

        Arguments:
            packages (List[str]): List of packages to search.
                This argument may also be a callable, in which case the
                value returned is used (for lazy evaluation).
            related_name (str): The name of the module to find.  Defaults
                to "tasks": meaning "look for 'module.tasks' for every
                module in ``packages``."
            force (bool): By default this call is lazy so that the actual
                auto-discovery won't happen until an application imports
                the default modules.  Forcing will cause the auto-discovery
                to happen immediately.
        """
        if force:
            return self._autodiscover_tasks(packages, related_name)
        signals.import_modules.connect(starpromise(
            self._autodiscover_tasks, packages, related_name,
        ), weak=False, sender=self)

The body is just an if and then a return. The key is the if condition: a parameter that defaults to False, so a call with the default arguments triggers no immediate discovery. The fix is then simple: pass the force parameter when calling:

app.autodiscover_tasks(['app01'], force=True)  # version 4 has a force parameter; it defaults to False and must be set to True

Task execution fails

Start the worker (-l info), open the page, submit a task. It fails. The error on the worker side:

[2018-10-08 13:23:28,062: INFO/MainProcess] Received task: app01.tasks.add[ff0f5e76-6474-4f74-a93c-7b2486abe07e]
[2018-10-08 13:23:28,078: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

This problem basically comes down to version 4 not supporting windows.
Fixing it by downgrading celery
Just skim this part; a no-downgrade fix follows.
At this point I couldn't read the error deeply enough to find the real cause, so the fallback was to downgrade celery and see:

pip uninstall celery
pip install celery==3.1.25

So-called downgrading is really just uninstalling and then installing an older version. I didn't go further down this path.

Celery 4 still works
I found another way to solve this problem; it needs one more module:

pip install eventlet

Once installed, start the worker with one extra parameter, "-P eventlet":

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro worker -l info -P eventlet

 -------------- celery@IDX-xujf v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0 2018-10-08 13:33:21
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         CeleryPro:0x16ad81d16a0
- ** ---------- .> transport:   redis://192.168.246.11:6379/0
- ** ---------- .> results:     redis://192.168.246.11/0
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . CeleryPro.celery.debug_task
  . app01.tasks.add
  . app01.tasks.mul
  . app01.tasks.xsum

[2018-10-08 13:33:21,430: INFO/MainProcess] Connected to redis://192.168.246.11:6379/0
[2018-10-08 13:33:21,457: INFO/MainProcess] mingle: searching for neighbors
[2018-10-08 13:33:22,488: INFO/MainProcess] mingle: all alone
[2018-10-08 13:33:22,502: WARNING/MainProcess] g:\steed\documents\pycharmprojects\venv\usingcelerywithdjango\lib\site-packages\celery\fixups\django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-10-08 13:33:22,504: INFO/MainProcess] celery@IDX-xujf ready.
[2018-10-08 13:33:22,519: INFO/MainProcess] pidbox: Connected to redis://192.168.246.11:6379/0.
[2018-10-08 13:34:13,596: INFO/MainProcess] Received task: app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d]
[2018-10-08 13:34:13,611: INFO/MainProcess] Task app01.tasks.add[2b56d6b7-012f-44db-bf4b-2d85d22dcd8d] succeeded in 0.0s: 7

Above is the worker's log; after startup a task was also submitted, and this time it was processed normally.

Using django_celery_beat

First register it in settings' INSTALLED_APPS:

INSTALLED_APPS = [
    ......
    'django_celery_beat',
]

Apply django_celery_beat's database migrations; a few tables are created automatically. Just run migrate directly:

>python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
Running migrations:
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK

After logging into django admin, you can see the following tables:


[Screenshot: the django_celery_beat tables in django admin]

Tasks are configured in the Periodic tasks table; the other tables hold the various execution schedules.

Configuring a task

First open the Intervals table and create a schedule; here, one that fires every 5 seconds.


[Screenshot: creating a 5-second interval in the Intervals table]

Then open the Periodic tasks table, pick the task to execute, and link it to a schedule.
The tasks visible here are the ones registered through auto-discovery:


[Screenshot: selecting a registered task in the Periodic tasks table]

Further down is the section for task arguments: two boxes containing JSON, positional arguments in the upper one and keyword arguments in the lower one:


[Screenshot: the two JSON argument boxes on the task form]

The JSON here is deserialized and passed to the task function as "*args, **kwargs".
That's one task configured; the other schedule types work the same way, so I won't try them all.
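Since these schedules are plain Django models, they can also be created in code rather than through the admin. A minimal sketch using the models django_celery_beat provides (IntervalSchedule, PeriodicTask); treat the field values as illustrative:

# e.g. run inside: python manage.py shell
import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# the row you would otherwise add in the Intervals table
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=5, period=IntervalSchedule.SECONDS)

# the row you would otherwise add in the Periodic tasks table
PeriodicTask.objects.create(
    interval=schedule,
    name='add every 5s (from shell)',  # must be unique
    task='app01.tasks.add',            # the registered task name
    args=json.dumps([1, 2]),           # positional args as JSON, like the admin form
)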

Starting Beat

A Beat still needs to run here to dispatch the scheduled tasks. Start the Worker first, then start Beat with one extra parameter, "-S django":

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>G:\Steed\Documents\PycharmProjects\venv\UsingCeleryWithDjango\Scripts\celery -A CeleryPro beat -l info -S django
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2018-10-08 14:43:43
Configuration ->
    . broker -> redis://192.168.246.11:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler

    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2018-10-08 14:43:43,907: INFO/MainProcess] beat: Starting...
[2018-10-08 14:43:43,908: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,911: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:48,939: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:53,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:58,922: INFO/MainProcess] Scheduler: Sending due task add34 (app01.tasks.add)
[2018-10-08 14:43:59,534: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,717: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,727: INFO/MainProcess] Writing entries...
[2018-10-08 14:43:59,729: INFO/MainProcess] Writing entries...

G:\Steed\Documents\PycharmProjects\UsingCeleryWithDjango>

Note: every time a task is modified, Beat must be restarted for the latest configuration to take effect. This matters most for Intervals tasks (run every N seconds); Crontab tasks seem much less affected.
