Distribute Tasks with Celery and RabbitMQ

Celery is an asynchronous task queue.

RabbitMQ is a message broker widely used with Celery.

Basic Concepts

Broker

The broker (RabbitMQ) dispatches tasks to task queues according to routing rules, and then delivers tasks from those queues to workers.

Consumer (Celery Workers)

The consumer is one or more Celery workers that execute the tasks. You can start as many workers as your use case requires.

Result Backend

The result backend stores the results of your tasks. It is optional, but if you do not include one in your settings, you cannot access the results of your tasks.

Install Celery

pip install celery

Choose a Broker

Why do we need another component called a broker?

It's because Celery does not implement a message queue itself, so it needs a separate message transport (a broker) to do that work.

In fact, you can choose from a few different brokers, like RabbitMQ, Redis or a database.

We are using RabbitMQ as our broker because it is feature-complete, stable and recommended by Celery.

Start RabbitMQ

If the RabbitMQ server starts successfully, its startup log will show that the node is running and listening for connections.
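How the server is started depends on the installation; the commands below are a sketch for a typical Linux setup (the systemd service name is an assumption):

```shell
# Run the server in the foreground; the startup banner appears here:
rabbitmq-server

# Or, on a systemd-based Linux, run it as a background service:
sudo systemctl start rabbitmq-server

# Verify that the node is up:
sudo rabbitmqctl status
```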


Configure RabbitMQ

To use Celery, we need to create a RabbitMQ user and a virtual host, and allow that user to access the virtual host:

rabbitmqctl add_user myuser mypassword

rabbitmqctl add_vhost myvhost

rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

There are three kinds of operations in RabbitMQ: configure, write and read.

The ".*" ".*" ".*" means that the user "myuser" will have all configure, write and read permissions.
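To double-check the grant, rabbitmqctl can list the permissions configured on the vhost (a quick sanity check, assuming the commands above succeeded):

```shell
# Show every user's configure/write/read patterns on myvhost:
rabbitmqctl list_permissions -p myvhost
```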

A Simple Demo Project

Project Structure
celery_demo
    __init__.py
    celery.py
    tasks.py
    run_tasks.py
celery.py
# -*-coding:utf-8-*-
from __future__ import absolute_import
from celery import Celery


app = Celery('celery_demo',
             broker='amqp://myuser:mypassword@localhost/myvhost',
             backend='rpc://',
             include=['celery_demo.tasks'])

The first argument of Celery is just the name of the project package, which is “celery_demo”.
The broker argument specifies the broker URL, which points to the RabbitMQ server we started earlier. Note that a broker URL has the format: transport://userid:password@hostname:port/virtual_host

For RabbitMQ, the transport is amqp.
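As a sanity check, the URL for the user and vhost created above can be assembled from its parts (5672 is RabbitMQ's default AMQP port):

```python
# Assemble the broker URL from its components.
user, password = "myuser", "mypassword"
host, port, vhost = "localhost", 5672, "myvhost"

broker_url = f"amqp://{user}:{password}@{host}:{port}/{vhost}"
print(broker_url)  # amqp://myuser:mypassword@localhost:5672/myvhost
```

If the port is omitted, as in the celery.py above, the transport's default port is used.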

The backend argument specifies a backend URL. A backend in Celery is used for storing the task results. So if you need to access the results of your task when it is finished, you should set a backend for Celery.
rpc means sending the results back as AMQP messages, which is an acceptable format for our demo. More backend choices can be found in the Celery documentation.
The include argument specifies a list of modules that you want to import when Celery worker starts. We add the tasks module here so that the worker can find our task.

tasks.py
# -*-coding:utf-8-*-
from .celery import app
import time


@app.task
def longtime_add(x, y):
    print('long time task begins')
    time.sleep(5)
    print('long time task finished')
    return x + y
run_tasks.py
# -*-coding:utf-8-*-
from .tasks import longtime_add
import time

if __name__ == '__main__':
    result = longtime_add.delay(1, 2)
    # at this point the task is not finished, so ready() returns False
    print('Task finished?', result.ready())
    print('Task result:', result.result)
    # sleep 10 seconds to ensure the task has been finished
    time.sleep(10)
    print('Task finished?', result.ready())
    print('Task result:', result.result)

Here, we call the task longtime_add using the delay method, which is needed if we want to process the task asynchronously.
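The delay/ready/result pattern mirrors Python's own futures; the stdlib sketch below imitates it with concurrent.futures (this is an analogy, not Celery's API, and runs the work in a local thread rather than a separate worker process):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def longtime_add(x, y):
    """Stand-in for the Celery task: sleeps, then adds."""
    time.sleep(1)
    return x + y

with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() is analogous to delay(): it returns immediately
    # with a handle to the still-running computation.
    future = pool.submit(longtime_add, 1, 2)
    print('Task finished?', future.done())  # usually still False here
    result = future.result()                # blocks until complete
    print('Task finished?', future.done())  # True
    print('Task result:', result)           # 3
```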

Start Celery Worker

Now, we can start Celery worker using the command below (run in the parent folder of our project folder celery_demo):
celery -A celery_demo worker --loglevel=info

If Celery connects to RabbitMQ successfully, the worker's startup banner will show the broker URL and the list of registered tasks.


Run Tasks

In another console, input the following (run in the parent folder of our project folder celery_demo):
python -m celery_demo.run_tasks
Now if you look at the Celery console, you will see that our worker has received the task, and run_tasks.py prints the task state twice in the current console.

This is the expected behavior. At first, the task was not ready and the result was None. After 10 seconds, the task has finished and the result is 3.

Monitor Celery in Real Time

Flower is a real-time, web-based monitor for Celery. With Flower, you can easily monitor your task progress and history.

To start the Flower web console, we need to run the following command (run in the parent folder of our project folder celery_demo):
celery -A celery_demo flower
Flower will run a server with default port 5555, and you can access the web console at http://localhost:5555.
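Flower is distributed as a separate Python package, so it may need to be installed first:

```shell
pip install flower
```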
