Celery,Tornado,Supervisor構(gòu)建和諧的分布式系統(tǒng)

Celery 分布式的任務(wù)隊(duì)列

與rabbitmq消息隊(duì)列的區(qū)別與聯(lián)系:

  • rabbitmq 調(diào)度的是消息,而Celery調(diào)度的是任務(wù).
  • Celery調(diào)度任務(wù)時,需要傳遞參數(shù)信息,傳輸載體可以選擇rabbitmq.
  • 利用rabbitmq的持久化和ack特性,Celery可以保證任務(wù)的可靠性.

優(yōu)點(diǎn):

  • 輕松構(gòu)建分布式的Service Provider,提供服務(wù)。
  • 高可擴(kuò)展性,增加worker也就是增加了隊(duì)列的consumer。
  • 可靠性,利用消息隊(duì)列的durable和ack,可以盡可能降低消息丟失的概率,當(dāng)worker崩潰后,未處理的消息會重新進(jìn)入消費(fèi)隊(duì)列。
  • 用戶友好,利用flower提供的管理工具可以輕松的管理worker。


    flower
    flower
  • 使用tornado-celery,結(jié)合tornado異步非阻塞結(jié)構(gòu),可以提高吞吐量,輕松創(chuàng)建分布式服務(wù)框架。
  • 學(xué)習(xí)成本低,可快速入門

快速入門
定義一個celery實(shí)例main.py:

from celery import Celery
app = Celery('route_check', include=['check_worker_path'], 
        broker='amqp://user:password@rabbitmq_host:port//')
app.config_from_object('celeryconfig')

include指的是需要celery掃描是否有任務(wù)定義的模塊路徑。例如add_task 就是掃描add_task.py中的任務(wù)

celery的配置文件可以從文件、模塊中讀取,這里是從模塊中讀取,celeryconfig.py為:

from multiprocessing import cpu_count

from celery import platforms
from kombu import Exchange, Queue

CELERYD_POOL_RESTARTS = False
CELERY_RESULT_BACKEND = 'redis://:password@redis_host:port/db'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('common_check', Exchange('route_check'), routing_key='common_check'),
    Queue('route_check', Exchange('route_check'), routing_key='route_check', delivery_mode=2),
    Queue('route_check_ignore_result', Exchange('route_check'), routing_key='route_check_ignore_result',
          delivery_mode=2)
)
CELERY_ROUTES = {
    'route_check_task.check_worker.common_check': {'queue': 'common_check'},
    'route_check_task.check_worker.check': {'queue': 'route_check'},
    'route_check_task.check_worker.check_ignore_result': {'queue': 'route_check_ignore_result'}
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# CELERY_MESSAGE_COMPRESSION = 'gzip'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = cpu_count() / 2
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_PUBLISH_RETRY = True
CELERY_TASK_PUBLISH_RETRY_POLICY = {
    'max_retries': 3,
    'interval_start': 10,
    'interval_step': 5,
    'interval_max': 20
}
platforms.C_FORCE_ROOT = True

這里面是一些celery的配置參數(shù)

在上面include的add_task.py定義如下:

#encoding:utf8

from main import app

@app.task
def add(x,y):
    return x+y

啟動celery
celery -A main worker -l info -Ofair

  • -A 后面是包含celery定義的模塊,我們在main.py中定義了app = Celery...
    測試celery:
  • -l 日志打印的級別,這里是info
  • -Ofair 這個參數(shù)可以讓Celery更好的調(diào)度任務(wù)
# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.apply_async((1,2))
print type(result)
print result.ready()
print result.get()
print result.ready()

輸出是

<class 'celery.result.AsyncResult'>
False
3
True

當(dāng)調(diào)用result.get()時,如果還沒有返回結(jié)果,將會阻塞直到結(jié)果返回。這里需要注意的是,如果需要返回worker執(zhí)行的結(jié)果,必須在之前的config中配置CELERY_RESULT_BACKEND這個參數(shù),一般推薦使用Redis來保存執(zhí)行結(jié)果,如果不關(guān)心worker執(zhí)行結(jié)果,設(shè)置CELERY_IGNORE_RESULT=True就可以了,關(guān)閉緩存結(jié)果可以提高程序的執(zhí)行速度。
在上面的測試程序中,如果修改為:

# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.(1,2)
print type(result)
print result

輸出結(jié)果為:

<type 'int'>
3

相當(dāng)于直接本地調(diào)用了add方法,并沒有走Celery的調(diào)度。
通過flower的dashbord可以方便的監(jiān)控任務(wù)的執(zhí)行情況:


task list
task list

task detail
task detail

還可以對worker進(jìn)行重啟,關(guān)閉之類的操作


taks_op
taks_op

使用Celery將一個集中式的系統(tǒng)拆分為分布式的系統(tǒng)大概步驟就是:
  • 根據(jù)功能將耗時的模塊拆分出來,通過注解的形式讓Celery管理
  • 為拆分的模塊設(shè)置獨(dú)立的消息隊(duì)列
  • 調(diào)用者導(dǎo)入需要的模塊或方法,使用apply_async進(jìn)行異步的調(diào)用并根據(jù)需求關(guān)注結(jié)果。
  • 根據(jù)性能需要可以添加機(jī)器或增加worker數(shù)量,方便彈性管理。

需要注意的是:

  • 盡量為不同的task分配不同的queue,避免多個功能的請求堆積在同一個queue中。
  • celery -A main worker -l info -Ofair -Q add_queue啟動Celery時,可以通過參數(shù)Q加queue_name來指定該worker只接受指定queue中的tasks.這樣可以使不同的worker各司其職。
  • CELERY_ACKS_LATE可以讓你的Celery更加可靠,只有當(dāng)worker執(zhí)行完任務(wù)后,才會告訴MQ,消息被消費(fèi)。
  • CELERY_DISABLE_RATE_LIMITS Celery可以對任務(wù)消費(fèi)的速率進(jìn)行限制,如果你沒有這個需求,就關(guān)閉掉它吧,有益于會加速你的程序。

tornado-celery

tornado應(yīng)該是python中最有名的異步非阻塞模型的web框架,它使用的是單進(jìn)程輪詢的方式處理用戶請求,通過epoll來關(guān)注文件狀態(tài)的改變,只掃描文件狀態(tài)符發(fā)生變化的FD(文件描述符)。
由于tornado是單進(jìn)程輪詢模型,那么就不適合在接口請求后進(jìn)行長時間的耗時操作,而是應(yīng)該接收到請求后,將請求交給背后的worker去干,干完活兒后在通過修改FD告訴tornado我干完了,結(jié)果拿走吧。很明顯,Celery與tornado很般配,而tornado-celery是celery官方推薦的結(jié)合兩者的一個模塊。
整合兩者很容易,首先需要安裝:

  • tornado-celery
  • tornado-redis
    tornado代碼如下:
# encoding:utf8
__author__ = 'brianyang'

import tcelery
import tornado.gen
import tornado.web

from main import app
import add_task

tcelery.setup_nonblocking_producer(celery_app=app)


class CheckHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        x = int(self.get_argument('x', '0'))
        y = int(self.get_argument('y', '0'))
        response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y])
        self.write({'results': response.result})
        self.finish


application = tornado.web.Application([
    (r"/add", CheckHandler),
])

if __name__ == "__main__":
    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

在瀏覽器輸入:http://127.0.0.1:8889/add?x=1&y=2
結(jié)果為:

通過tornado+Celery可以顯著的提高系統(tǒng)的吞吐量。

Benchmark

使用Jmeter進(jìn)行壓測,60個進(jìn)程不間斷地的訪問服務(wù)器:
接口單獨(dú)訪問響應(yīng)時間一般在200~400ms

  • uwsgi + Flask方案:
    uwsgi關(guān)鍵配置:
processes       = 10
threads         = 3

Flask負(fù)責(zé)接受并處理請求,壓測結(jié)果:
qps是46,吞吐量大概是2700/min


uwsgi+Flask
uwsgi+Flask
  • tornado+Celery方案:
    Celery配置:
    CELERYD_CONCURRENCY = 10也就是10個worker(進(jìn)程),壓測結(jié)果:
    qps是139,吞吐量大概是8300/min
    tornado+Celery
    tornado+Celery

    從吞吐量和接口相應(yīng)時間各方面來看,使用tornado+Celery都能帶來更好的性能。

Supervisor

  • 什么是supervisor
    supervisor俗稱Linux后臺進(jìn)程管理器
  • 適合場景
    -- 需要長期運(yùn)行程序,除了nohup,我們有更好的supervisor
    -- 程序意外掛掉,需要重啟,讓supervisor來幫忙
    -- 遠(yuǎn)程管理程序,不想登陸服務(wù)器,來來來,supervisor提供了高大上的web操作界面.
    之前啟動Celery命令是celery -A main worker -l info -Ofair -Q common_check,當(dāng)你有10臺機(jī)器的時候,每次更新代碼后,都需要登陸服務(wù)器,然后更新代碼,最后再殺掉Celery進(jìn)程重啟,惡不惡心,簡直惡心死了。
    讓supervisor來,首先需要安裝:
    pip install supervisor
    配置文件示例:
[unix_http_server]
file=/tmp/supervisor.sock   ; path to your socket file
chmod=0777
username=admin
password=admin

[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

[supervisord]
logfile=/var/log/supervisord.log ; supervisord log file
logfile_maxbytes=50MB       ; maximum size of logfile before rotation
logfile_backups=10          ; number of backed up logfiles
loglevel=info               ; info, debug, warn, trace
pidfile=/var/run/supervisord.pid ; pidfile location
nodaemon=false              ; run supervisord as a daemon
minfds=1024                 ; number of startup file descriptors
minprocs=200                ; number of process descriptors
user=root                   ; default user
childlogdir=/var/log/            ; where child log files will live

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets.
username=admin
password=admin
[program:celery]
command=celery -A main worker -l info -Ofair

directory=/home/q/celeryTest
user=root
numprocs=1
stdout_logfile=/var/log/worker.log
stderr_logfile=/var/log/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 10

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; Set Celery priority higher than default (999)
; so, if rabbitmq is supervised, it will start first.
priority=1000

示例文件很長,不要怕,只需要復(fù)制下來,改改就可以
比較關(guān)鍵的幾個地方是:

[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

這個可以讓你通過訪問http://yourhost:2345 ,驗(yàn)證輸入admin/admin的方式遠(yuǎn)程管理supervisor,效果如下:

remote supervisor
remote supervisor

[program:flower]這里就是你要托管給supervisor的程序的一些配置,其中autorestart=true可以在程序崩潰時自動重啟進(jìn)程,不信你用kill試試看。
剩下的部分就是一些日志位置的設(shè)置,當(dāng)前工作目錄設(shè)置等,so esay~

supervisor優(yōu)點(diǎn):

  • 管理進(jìn)程簡單,再也不用nohup & kill了。
  • 再也不用擔(dān)心程序掛掉了
  • web管理很方便

缺點(diǎn):

  • web管理雖然方便,但是每個頁面只能管理本機(jī)的supervisor,如果我有一百臺機(jī)器,那就需要打開100個管理頁面,太麻煩了.

怎么辦~

supervisor-easy閃亮登場

通過rpc調(diào)用獲取配置中的每一個supervisor程序的狀態(tài)并進(jìn)行管理,可以分組,分機(jī)器進(jìn)行批量/單個的管理。方便的不要不要的。來兩張截圖:

  • 分組管理:


    group
    group
  • 分機(jī)器管理:


    server
    server

    通過簡單的配置,可以方便的進(jìn)行管理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容