在Flask中使用Celery的最佳實踐

寫在前面

本最佳實踐是基于作者有限的經(jīng)驗,歡迎大家共同討論,可以持續(xù)維護此最佳實踐。另本文中所使用的環(huán)境為Mac&Ubuntu環(huán)境,軟件版本如下:

  • Celery (4.1.0)
  • Flask (0.12.1)
  • RabbitMQ(3.6.9)
  • librabbitmq (1.6.1)

介紹

簡單來說Celery是一個異步的任務(wù)隊列,當(dāng)我們需要將一些任務(wù)(比如一些需要長時間操作的任務(wù))異步操作的時候,這時候Celery就可以幫到我們,另外Celery還支持定時任務(wù)(類似Crontab)。詳細(xì)的介紹可以參考官網(wǎng)

使用RabbitMQ作為Broker

RabbitMQ是官方推薦使用的Broker,它實際是一個消息中間件,負(fù)責(zé)消息的路由分發(fā),安裝RabbitMQ如下:

# install on Ubuntu
apt-get update
apt-get install rabbitmq-server -yq

需要注意的是,線上環(huán)境我們需要創(chuàng)建新的賬號,并將guest賬號刪除,操作如下:

rabbitmqctl add_user myuser mypassword  # 新增用戶
rabbitmqctl add_vhost myvhost  # 新增vhost,以使用不同的命名空間
rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"  # 設(shè)置權(quán)限
rabbitmqctl  delete_user guest  # 安全原因,刪除guest

注意:vhost是一個虛擬空間,用于區(qū)分不同類型的消息
然后,在Celery的配置中配置broker URL

CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'

注意:當(dāng)使用amqp協(xié)議頭時,如果安裝有librabbitmq則使用librabbitmq,否則使用pyamqp

Celery的日志輸出

在task中想要輸出日志,最好的方法是通過如下方式

from celery.utils.log import get_task_logger

lg = get_task_logger(__name__)

@celery.task
def log_test():
    lg.debug("in log_test()")

但是僅如此會發(fā)現(xiàn)所有的日志最后都跑到shell窗口的stdout當(dāng)中,原來必須得在啟動celery的時候使用-f option來指定輸出文件,如下:

celery -A main.celery worker -l debug -f log/celery/celery_task.log &

-A:指定celery實例
worker: 啟動worker進程
-l:指定log level,這里指定log level為debug level
-f:指定輸出的日志文件

使用Redis作為backend

當(dāng)使用Redis作為存儲后端的時候,我們可以通過設(shè)置DB number來使得Celery的結(jié)果存儲與其它數(shù)據(jù)存儲隔離開來,比如在筆者的項目中,redis還用作緩存的存儲后端,因此為了區(qū)分,Celery在使用Redis的時候使用的DB number是1(默認(rèn)是0),關(guān)于Redis DB number可以參考這里.
因此我們的backend設(shè)置如下:

CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' # 最后的數(shù)字1代表DB number

查看Celery任務(wù)的結(jié)果可以通過Redis-cli連接Redis數(shù)據(jù)庫進行查看

> redis-cli
> select 1 # 這里選擇DB 1, 也可以在使用redis-cli -n 1來進入指定的DB
> get key # 獲取指定key對應(yīng)的結(jié)果
redis-cli.jpg

調(diào)試代碼

我認(rèn)為此處是非常重要的一個技巧,即在調(diào)試代碼的時候,我們可以將delay或者apply_async先去掉,直接調(diào)用worker的函數(shù)進行同步調(diào)試,調(diào)試成功后再加上delay或者apply_async method

Celery可能會遇到的坑

Celery4.x版本使用librabbitmq的問題

Celery 4.x版本在使用librabbitmq時,會出現(xiàn)類似這樣的錯誤

Received and deleted unknown message.  Wrong destination?!?

完整錯誤如圖:


celeryerror

解決這個問題有兩個方式:

  1. 推薦方式,更改配置項task_protocol為1。
    Github上Robert Kopaczewski詳細(xì)解釋了這個問題,原文如下:

Apparently librabbitmq issue is related to new default protocol in celery 4.x. You can switch to previous protocol version by either putting CELERY_TASK_PROTOCOL = 1 in your settings if you're using Django or settings app.conf.task_protocol = 1 in celeryconf.py.

  1. 另一種方式是不使用librabbitmq, 通過pip uninstall librabbitmq, 并且更改broker配置的協(xié)議頭為'pyamqp',如下,也可以解決這個問題。
BROKER_URL = 'pyamqp://guest:guest@localhost:5672/%2F'

由于librabbitmq的性能優(yōu)勢,我們還是推薦方式1來解決該問題。

RabbitMQ遠(yuǎn)程連接問題

如果RabbitMQ與Celery不在同一臺機器上,除在Celery配置的時候要將BROKER_URL設(shè)置為正確的IP地址外,還需要將Rabbitmq的配置文件/usr/local/etc/rabbitmq/rabbitmq-env.conf中的NODE_IP_ADDRESS更改為0.0.0.0

NODE_IP_ADDRESS=0.0.0.0

Celery import問題

The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'\x8e\xa7expires\xc0\xa3utc\xc3\xa4args\x91\x85\xa3tid\xb85971a43d47f84bb278f77fc2\xa3sen\xa2A1\xa2tt\xa2ar\xa2co\xc4\x00\xa1t\xa4like\xa5chord\xc0\xa9callbacks\xc0\xa8errbacks\xc0\xa7taskset\xc0\xa2id\xc4$c133dbf8-2c89-4311-b7cf-c377041058ec\xa7retries\x00\xa4task\xd9$tasks.messageTasks.send_like_message\xa5group\xc0\xa9timelimit\x92\xc0\xc0\xa3eta\xc0\xa6kwargs\x80' (239b)
Traceback (most recent call last):
  File "/Users/liufeng/.pyenv/versions/2.7.13/envs/kaopu_backend/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
    strategy = strategies[type_]
KeyError: u'tasks.messageTasks.send_like_message'

出現(xiàn)這條錯誤是由于我們的tasks跟celery并不是在同一個文件中,即不是同一個module,當(dāng)我們通過如下命令啟動task worker時,實際只加載了app module,而沒有加載tasks相關(guān)的module

celery -A app.celery worker -l info

要解決這個問題,必須為celery配置文件添加import參數(shù),如下

app.config['imports'] = ['tasks.messageTasks']

Celery unregistered task問題

在開發(fā)過程中遇到了這樣一個問題

[2017-08-31 15:38:19,605: ERROR/MainProcess] Received unregistered task of type u'app.tasks.messageTasks.send_follow_message'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'\x8e\xa7expires\xc0\xa3utc\xc3\xa4args\x91\x86\xa6sender\xa5Jenny\xa9target_id\xb859a5313847f84be534ad7d46\xabtarget_type\xa4user\xa7content\xc4\x00\xa8receiver\xb859a5313847f84be534ad7d46\xa4type\xa6follow\xa5chord\xc0\xa9callbacks\xc0\xa8errbacks\xc0\xa7taskset\xc0\xa2id\xc4$a4d40c14-1976-41a6-a753-d2a495929920\xa7retries\x00\xa4task\xd9*app.tasks.messageTasks.send_follow_message\xa5group\xc0\xa9timelimit\x92\xc0\xc0\xa3eta\xc0\xa6kwargs\x80' (312b)
Traceback (most recent call last):
  File "/Users/liufeng/.pyenv/versions/2.7.13/envs/kaopu_backend/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
    strategy = strategies[type_]
KeyError: u'app.tasks.messageTasks.send_follow_message'

解決這個問題,最開始是根據(jù)提示,將所有涉及到task的module全部加上from __future__ import absolute_import 之后運行之后還是不行,后來發(fā)現(xiàn)是由于之前啟動時使用的是app module, 但是我的代碼已經(jīng)改成了main.py,所以重新啟動了celery,最后問題解決

使用鏡像遷移系統(tǒng)也依然需要重新添加rabbitmq的用戶

問題最開始是發(fā)現(xiàn)無法點贊,也無法Follow用戶,通過http消息發(fā)現(xiàn)出現(xiàn)502錯誤,于是登錄到服務(wù)器檢查,發(fā)現(xiàn)應(yīng)用服務(wù)本身沒有任何報錯,于是又去查看Celery的日志,結(jié)果發(fā)現(xiàn)出現(xiàn)如下錯誤:

[2017-11-13 16:32:01,243: ERROR/MainProcess] consumer: Cannot connect to amqp://celeryuser:**@loc      alhost:5672/celeryvhost: Couldn't log in: a socket error occurred.

經(jīng)過一番搜索發(fā)現(xiàn)網(wǎng)上的評論主要是說URL不對的情況下會出現(xiàn)這種情況,但是我的URL沒有改過啊,那又會是什么問題呢?繼續(xù)看,發(fā)現(xiàn)有人提到了權(quán)限問題,于是又是一番檢查,發(fā)現(xiàn)RabbitMQ中并沒有原先設(shè)置的用戶(我使用的是原系統(tǒng)的鏡像,原以為用戶也是已經(jīng)設(shè)置好的)

# 查看有哪些用戶
rabbitmqctl  list_users

然后就簡單了,按照步驟創(chuàng)建用戶,vhost,再賦予權(quán)限,刪除guest,然后就終于都連好了

另外,發(fā)現(xiàn)從鏡像復(fù)制系統(tǒng)后,RabbitMQ并不能正常工作,必須殺掉原先的進程,重新啟動

更改task的代碼后,重啟Celery

需要注意的是,在更改task的代碼后,必須重新啟動Celery,否則代碼改動無法生效,可能導(dǎo)致一些意外的問題

使用--pidfile

--pidfile: 可以指定pid,從而避免當(dāng)出現(xiàn)多個進程的時候Celery不知道該將任務(wù)發(fā)送給誰,從而出現(xiàn)無法收到任務(wù)的情況。所以,我們在啟動celery的時候就應(yīng)當(dāng)指定pidfile,如下:

--pidfile /var/pidfile/celery.pid
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容