一、使用celery的原因
分布式任務(wù)調(diào)度框架celery及其監(jiān)控工具flower,Linux進(jìn)程管理工具supervisor
項(xiàng)目痛點(diǎn):
1、代碼上線及運(yùn)維困難,新代碼上線必須保證系統(tǒng)中沒有正在運(yùn)行的異步任務(wù),等待任務(wù)結(jié)束期間無法保證系統(tǒng)不在接收新任務(wù)。(項(xiàng)目中進(jìn)程多是以multiprocessing方式啟動)
2、重啟困難,重啟后不知道是否啟動成功,必須手動curl測試接口保證系統(tǒng)重啟成功,缺少重啟監(jiān)控機(jī)制。
痛點(diǎn)解決:
1、celery解決中斷任務(wù)痛點(diǎn),所有異步任務(wù)均由celery下發(fā)??蓡为?dú)重啟一個worker或所有worker。重啟worker時保證當(dāng)前worker正在消費(fèi)的任務(wù)重新回到隊(duì)列,等待處于工作狀態(tài)的worker消費(fèi)。不同worker可運(yùn)行不同版本的代碼。
2、supervisor解決重啟痛點(diǎn),新架構(gòu)中一個節(jié)點(diǎn)會啟多worker以及flower和后端服務(wù),具有大量進(jìn)程需要管理,手動管理已然不現(xiàn)實(shí)。supervisor可對啟動異常的進(jìn)程自動重啟也可對異常退出的進(jìn)程進(jìn)行拉起,并且提供客戶端和web界面。
二、架構(gòu)圖

三、調(diào)度框架celery
celery中的幾個概念
1、broker 消息傳輸中間件,可以簡單理解為隊(duì)列,支持RabbitMQ,Redis,SQS(某些博客說支持sqlalchemy,官網(wǎng)未找到,實(shí)驗(yàn)也未成功)。celery對Redis Cluster類型的redis集群支持不是很好,目前正在尋找解決方案。
2、exchange 路由,可將特定任務(wù)路由到指定隊(duì)列。
3、worker 消費(fèi)者。會在多節(jié)點(diǎn)啟多worker
4、task 異步任務(wù)。某些任務(wù)需要指定消費(fèi)節(jié)點(diǎn)。所以觸發(fā)任務(wù)時需要顯式指定該任務(wù)的存放的隊(duì)列,task.apply_async(queue='q1')。未指定的將會放到default隊(duì)列,由三個節(jié)點(diǎn)競爭。
5、backend 結(jié)果存儲??墒褂胢q,redis,nosql、mysql等。存放任務(wù)執(zhí)行的結(jié)果。
1、使用方案
一、異步任務(wù)
設(shè)置default,q1,q2,q3四個隊(duì)列,各節(jié)點(diǎn)會監(jiān)聽各自的隊(duì)列,并且所有節(jié)點(diǎn)都監(jiān)聽default隊(duì)列。
編寫異步任務(wù)和正常寫函數(shù)是一樣的,最后只需要對該函數(shù)使用裝飾器@celery.task將該任務(wù)注冊為異步任務(wù)。如果有多個裝飾器進(jìn)行組合使用時,必須確保 task() 裝飾器被放置在首位:
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
觸發(fā)任務(wù)
簡單觸發(fā)時可使用 delay ,但是該方法無法指定存放的隊(duì)列,因此該任務(wù)會被放到默認(rèn)隊(duì)列
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
如果需要設(shè)置額外的行參數(shù),必須用 apply_async
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x'}, queue='q1')
啟動worker
celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-1@%%h
二、beat任務(wù)(定時任務(wù))
對于celery產(chǎn)生的定時任務(wù)如果放到一個隊(duì)列里,該任務(wù)被一個worker拿到后其他worker將獲取不到該任務(wù)。這樣會產(chǎn)生一個現(xiàn)象即該任務(wù)只在一個節(jié)點(diǎn)執(zhí)行了,但業(yè)務(wù)上需要的是該任務(wù)在各個節(jié)點(diǎn)都執(zhí)行。
對此現(xiàn)象的解決方案是各節(jié)點(diǎn)需要定制各節(jié)點(diǎn)的定時任務(wù)并放到各自的隊(duì)列里。對于任務(wù)a生成者會將其產(chǎn)生三份對下發(fā)到三個節(jié)點(diǎn)。對于任務(wù)b,node1并不需要執(zhí)行,因此會產(chǎn)生兩份并將其下發(fā)到node2和node3上。
這并不意味同一個任務(wù)需要編寫三份,任務(wù)編寫完后只需要將其注冊各節(jié)點(diǎn)對應(yīng)的配置里(需要自己實(shí)現(xiàn))。
為不影響其他異步任務(wù)執(zhí)行,beat將會由各節(jié)點(diǎn)單獨(dú)的worker進(jìn)行消費(fèi)。
產(chǎn)生beat任務(wù)
celery -A app.celery beat -l info
消費(fèi)beat任務(wù)
celery -A app.celery worker -l info -Q node1-crontab --concurrency=10 -n node1-worker-crontab@%%h
這里的node1-crontab為新的隊(duì)列,專門存放node1節(jié)點(diǎn)需要消費(fèi)的定時任務(wù)。
三、task是如何工作的
這里會說明為什么不同worker可以運(yùn)行不同版本的代碼,甚至生產(chǎn)者和消費(fèi)者之間也可以運(yùn)行不同版本的代碼。
celery的任務(wù)是注冊在注冊表中,該表中注冊了任務(wù)名和任務(wù)類。說人話就是celery會在隊(duì)列中傳遞任務(wù)的模塊,例如proj模塊中有一個task.py,該文件中編寫了一個叫add的異步任務(wù)(函數(shù)),那么celery傳遞的就是proj.task.add,只要保證消費(fèi)該任務(wù)的worker中有該模塊該文件該函數(shù)就行,worker并不關(guān)心該函數(shù)里是怎樣執(zhí)行的,是否和生產(chǎn)者一致。
任務(wù)狀態(tài)
- PENDING 任務(wù)正在等待執(zhí)行或未知。任何未知的任務(wù) ID 都默認(rèn)處于掛起狀態(tài)。
- STARTED 任務(wù)開始執(zhí)行
- SUCCESS 任務(wù)執(zhí)行成功
- FAILURE 任務(wù)執(zhí)行失敗
- RETRY 任務(wù)處于重試狀態(tài),這里指在task中捕獲到異常并顯式調(diào)用celery使其重試
- REVOKED 任務(wù)被撤銷
@celery.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
2、celery監(jiān)控工具flower
Flower是基于web的監(jiān)控和管理celery工具
flower可以
- 用Celery事件實(shí)時監(jiān)控,顯示任務(wù)的詳細(xì)信息,圖形化和統(tǒng)計
- 查看worker狀態(tài)和統(tǒng)計,查看當(dāng)前正在運(yùn)行的tasks
- Broker monitoring(中間人監(jiān)控),查看所有Celery 隊(duì)列的統(tǒng)計,隊(duì)列長度圖
flower只需啟動在生產(chǎn)者端即可
截圖展示

四、進(jìn)程管理工具supervisor
粗略估計在node1上會啟后端服務(wù),celery worker三個,定時任務(wù)消費(fèi)worker一個,celery beat一個,flower進(jìn)程。這么多進(jìn)程用手工一個個啟動肯定要花費(fèi)大量時間,于是用supervisor管理這些進(jìn)程。
supervisor會已啟動自己子進(jìn)程的方式開啟進(jìn)程,可以對異常退出的進(jìn)程進(jìn)行重啟操作。
supervisor可以分為三個部分
- supervisord 服務(wù)端,主要負(fù)責(zé)啟動與管理進(jìn)程,響應(yīng)客戶端的請求
- supervisorctl 客戶端,提供一個命令行來使用supervisord提供的服務(wù)
- web界面 用來查看與管理子進(jìn)程
1、子進(jìn)程配置
[program:worker]
command=celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-%(process_num)s@%%h ; 啟動命令
process_name=%(program_name)s-%(process_num)d ; 進(jìn)程名
numprocs=3 ; 進(jìn)程數(shù)量
directory=/Users/aaa/PycharmProjects/flask_test ; 工作路徑
;umask=022 ; umask for process (default None)
priority=999 ; 優(yōu)先級。優(yōu)先級低,最先啟動,關(guān)閉的時候最后關(guān)閉
autostart=true ; supervisor啟動后自動啟動
startsecs=1 ; 啟動多少秒后是running認(rèn)為啟動成功
;startretries=3 ; 最大啟動重試次數(shù) (default 3)
autorestart=true ; 子進(jìn)程掛掉自動重啟 (def: unexpected)
;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
stopsignal=TERM ; 進(jìn)程停止信號,停止celery worker時使用TERM, (TERM, HUP, INT, QUIT, KILL, USR1, or USR2)
stopwaitsecs=30 ; 等待停止最大時間,超過此時間會強(qiáng)制kill (default 10)
stopasgroup=true ; 停掉子進(jìn)程的子進(jìn)程(保證不會出現(xiàn)孤兒進(jìn)程)
;killasgroup=true ; kill進(jìn)程及其子進(jìn)程,直接發(fā)送KILL信號不會等待進(jìn)程退出
;user=chrism ; 管理子進(jìn)程的用戶
redirect_stderr=true ; redirect 日志 stderr to stdout
stdout_logfile=/Users/aaa/PycharmProjects/flask_test/log/node1/celery-worker-1.log ; 日志
stdout_logfile_maxbytes=50MB ; 單個日志文件最大大小 (default 50MB)
stdout_logfile_backups=20 ; 日志文件數(shù)量 (default 10)
;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
;stdout_syslog=false ; send stdout to syslog with process name (default false)
;stderr_logfile=/Users/aaa/PycharmProjects/flask_test/log/default/celery_err.log ; 錯誤日志
;stderr_logfile_maxbytes=10MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false ; emit events on stderpidr writes (default false)
;stderr_syslog=false ; send stderr to syslog with process name (default false)
environment=PATH="/Users/aaa/anaconda3/envs/flask_test/bin" ; 環(huán)境變量,子進(jìn)程間不共享
;serverurl=AUTO ; override serverurl computation (childutils)
supervisor配置文件放在supervisord.conf中
啟動supervisord
supervisord -c supervisord.conf
對于celery worker節(jié)點(diǎn)進(jìn)程退出信號使用TERM,TERM信號會使worker進(jìn)行熱關(guān)機(jī),worker會將未消費(fèi)完的任務(wù)放回到隊(duì)列。
發(fā)現(xiàn)丟任務(wù)的情況:
假設(shè)worker1正在消費(fèi)任務(wù)3個,worker2正在消費(fèi)任務(wù)4個。將worker1關(guān)機(jī),3個任務(wù)會進(jìn)入到worker2,再將work2關(guān)機(jī)后打開worker3,這時會發(fā)現(xiàn)少了兩個任務(wù)。
丟任務(wù)的解決方案:
方案一:啟動一個worker然后將其關(guān)機(jī)后未消費(fèi)完的任務(wù)可以全部回到隊(duì)列,需要重啟時可以先將未有消費(fèi)任務(wù)的worker進(jìn)行重啟,然后再停掉正在消費(fèi)的worker?;蛘咧煌5粢粋€worker。
方案二:supervisor進(jìn)程組的概念,直接將進(jìn)程組重啟
代碼更新后只需重啟子進(jìn)程,不需要重啟supervisord
進(jìn)程組配置
[group:celery-worker]
programs=worker ;上面實(shí)例三個進(jìn)程會默認(rèn)分配到名為worker的進(jìn)程組,這里定義進(jìn)程組會覆蓋默認(rèn)的
priority=999 ; the relative start priority (default 999)
對進(jìn)程組進(jìn)行操作等同于對進(jìn)程組下所有的進(jìn)程操作
對于進(jìn)程組操作在進(jìn)程組名后需要加上冒號即 celery-worker:
2、進(jìn)程數(shù)說明
問題:celery啟動命令中已經(jīng)指定了 --concurrency=10 參數(shù)配置worker中開啟的進(jìn)程數(shù)量,為什么在supervisord中還要指定 numprocs=3 進(jìn)程數(shù)呢?
答:這兩個參數(shù)指定的進(jìn)程數(shù)量是不同的意義。在celery中指定進(jìn)程數(shù)即意味著單個worker中可開啟的最大進(jìn)程數(shù)據(jù)量。在supervisord指定的進(jìn)程數(shù)會直接開啟三個worker,相當(dāng)將定義的cmd執(zhí)行了三次。
supervisord中如果指定numprocs的同時也需要指定 process_name=%(program_name)s-%(process_num)d ,原因在于如果多個進(jìn)程使用相同的進(jìn)程名會報錯,所以需要指定不同的進(jìn)程名。program_name為 [program:worker] 中定義的名字,即worker。process_num為進(jìn)程的序號,從1開始,注意它不是pid。
使用numprocs=3 創(chuàng)建的三個worker默認(rèn)會被放到一個名為worker(在哪里定義)的進(jìn)程組里,如果在后面定義一個新的進(jìn)程組并將worker放進(jìn)去則這三個worker會的默認(rèn)進(jìn)程組會被替換為新的進(jìn)程組,同時新的進(jìn)程組里也可以放一個在其它program里定義的進(jìn)程。
[group:node1-celery-worker]
programs=worker,crontab-worker # crontab-worker為在其它program里定義的進(jìn)程
priority=999
3、supervisorctl命令
supervisorctl start ${program} # 啟動進(jìn)程
supervisorctl stop ${program} # 停止進(jìn)程
supervisorctl restart ${program} # 重啟進(jìn)程
supervisorctl status ${program} # 查看進(jìn)程狀態(tài)
supervisorctl update # 重新載入配置文件
supervisorctl shutdown # 關(guān)閉supervisord服務(wù)
supervisorctl reload # 重啟supervisord服務(wù)
supervisorctl stop all # 停止所有進(jìn)程
對于已經(jīng)配置好的supervisor并不需要進(jìn)行supervisord級別的重啟以及重新載入配置。代碼更新后只需重啟子進(jìn)程即可加載最新代碼。

4、supervisor界面
將supervisorctl的命令可視化,可以直接點(diǎn)點(diǎn)點(diǎn),此外還可以展示子進(jìn)程的日志。實(shí)則感覺有supervisorctl就可以了
