為什么要使用celery?
Web應(yīng)用程序適用于請(qǐng)求和響應(yīng)周期。當(dāng)用戶訪問您的應(yīng)用程序的某個(gè)URL時(shí),Web瀏覽器向您的服務(wù)器發(fā)送一個(gè)請(qǐng)求。Django收到這個(gè)請(qǐng)求,并用它做一些事情。通常它涉及在數(shù)據(jù)庫中執(zhí)行查詢,處理數(shù)據(jù)。當(dāng)Django做他的事情并處理請(qǐng)求時(shí),用戶必須等待。當(dāng)Django完成處理請(qǐng)求的工作時(shí),它會(huì)發(fā)送一個(gè)響應(yīng)給最終會(huì)看到某些東西的用戶。
理想情況下,這個(gè)請(qǐng)求和響應(yīng)周期應(yīng)該很快,否則我們會(huì)讓用戶等待太久。更糟的是,我們的Web服務(wù)器一次只能服務(wù)一定數(shù)量的用戶。所以,如果這個(gè)過程很慢,它可以限制您的應(yīng)用程序一次可以提供的頁面數(shù)量。
大多數(shù)情況下,我們可以使用緩存,優(yōu)化數(shù)據(jù)庫查詢等來解決此問題。但是有些情況下,沒有其他的選擇:需要做大量的工作。一個(gè)報(bào)告頁面,大量的數(shù)據(jù)輸出,視頻/圖像處理是你可能想要使用celery的幾個(gè)例子。
我們?cè)诓皇窃谡麄€(gè)項(xiàng)目中使用celery,而只是用于耗費(fèi)時(shí)間的特定任務(wù)。這里的想法是盡可能快地響應(yīng)用戶,并將耗時(shí)的任務(wù)傳遞給隊(duì)列,以便在后臺(tái)執(zhí)行,并始終保持服務(wù)器準(zhǔn)備好響應(yīng)新的請(qǐng)求。
安裝
安裝celery最簡(jiǎn)單的方法是使用點(diǎn):
pip install Celery
現(xiàn)在我們必須安裝RabbitMQ。
在Ubuntu 16.04上安裝RabbitMQ
要將其安裝在較新的Ubuntu版本上非常簡(jiǎn)單:
apt-get install -y erlangapt-get install rabbitmq-server
然后啟用并啟動(dòng)RabbitMQ服務(wù):
systemctlenable rabbitmq-serversystemctl start rabbitmq-server
檢查狀態(tài)以確保一切運(yùn)行平穩(wěn):
systemctl status rabbitmq-server
在Mac上安裝RabbitMQ
????brew install rabbitmq
RabbitMQ腳本安裝到/usr/local/sbin。你可以把它添加到您的.bash_profile或.profile。
????vim ~/.bash_profile
然后將其添加到文件的底部:
????export PATH=$PATH:/usr/local/sbin
重新啟動(dòng)終端,確保更改已生效。
現(xiàn)在您可以使用以下命令啟動(dòng)RabbitMQ服務(wù)器:
????rabbitmq-server

celery基本設(shè)置
首先,考慮名為core的應(yīng)用程序名為mysite的以下Django項(xiàng)目:

將CELERY_BROKER_URL配置添加到settings.py文件中:
settings.py
? ?CELERY_BROKER_URL='amqp://localhost'
除了settings.py和urls.py文件之外,我們還要創(chuàng)建一個(gè)名為celery.py的新文件。
celery.py
import ?os
?from ?celery import ? ?Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE','mysite.settings')
app=Celery('mysite')app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks()
現(xiàn)在編輯項(xiàng)目根目錄下的__init__.py文件:
__init__.py
from.celeryimportappascelery_app__all__=['celery_app']
這將確保每次Django啟動(dòng),我們的celery應(yīng)用程序是重要的。
創(chuàng)建我們的第一個(gè)芹菜任務(wù)。
我們可以在Django應(yīng)用程序中創(chuàng)建一個(gè)名為tasks.py的文件,并將所有的Celery任務(wù)放到這個(gè)文件中。我們?cè)陧?xiàng)目根目錄中創(chuàng)建的Celery應(yīng)用程序?qū)⑹占贗NSTALLED_APPS?配置中列出的所有Django應(yīng)用程序中定義的所有任務(wù)。
為了測(cè)試目的,我們創(chuàng)建一個(gè)Celery任務(wù),生成一些隨機(jī)的用戶帳戶。
app/ tasks.py
from __future__import absolute_import, unicode_literals
import string
from celeryimport shared_task
from django.contrib.auth.modelsimport User
from django.utils.cryptoimport get_random_string
@shared_task
def create_random_user_accounts(total):
? ? for i in range(total):
? ? ? ? username = 'user_{}'.format(get_random_string(10, string.ascii_letters))
? ? ? ? email = '{}@example.com'.format(username)
? ? ? ? password = get_random_string(50)
? ? ? ? User.objects.create_user(username=username, email=email, password=password)
? ? return '{} random users created with success!'.format(total)
這里的重要部分是:
from celery import shared_task
@shared_task
def name_of_your_function(optional_param):
? ?pass# do something heavy
然后我定義了一個(gè)表單和一個(gè)視圖來處理我的Celery任務(wù):
forms.py
from djangoimport forms
from django.core.validatorsimport MinValueValidator, MaxValueValidator
class GenerateRandomUserForm(forms.Form):
????total = forms.IntegerField(
????????validators=[
????????MinValueValidator(50),
????????MaxValueValidator(500)
????????]
)
這個(gè)是需要一個(gè)50到500之間的正整數(shù)字段。如圖:

然后在views中:
from django.contrib.auth.modelsimport User
from django.contribimport messages
from django.views.generic.editimport FormView
from django.shortcutsimport redirect
from .formsimport GenerateRandomUserForm
from .tasksimport create_random_user_accounts
class GenerateRandomUserView(FormView):
template_name ='core/generate_random_users.html'
? ? form_class = GenerateRandomUserForm
def form_valid(self, form):
????total = form.cleaned_data.get('total')
????create_random_user_accounts.delay(total) ?# 重點(diǎn) 就是便是這個(gè)任務(wù)在cerely后臺(tái)執(zhí)行。然后Django繼續(xù)處理我的視圖,GenerateRandomUserView并順利返回給用戶。
????messages.success(self.request,'We are generating your random users! Wait a moment and ????refresh this page.')
????return redirect('users_list')
啟動(dòng)工作進(jìn)程
打開一個(gè)新的終端選項(xiàng)卡,然后運(yùn)行以下命令:
celery -A mysite worker -l info
將mysite更改為您的項(xiàng)目名稱。結(jié)果是這樣的:

現(xiàn)在我們可以測(cè)試它。我提交了500個(gè)表單,創(chuàng)建了500個(gè)隨機(jī)用戶。

同時(shí),檢查celery worker過程:
[2017-08-20 19:11:17,485: INFO/MainProcess] Received task:mysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0]
然后幾秒鐘后,如果我們刷新頁面,用戶在那里:

如果我們?cè)俅尾榭辞鄄斯ぷ鬟M(jìn)程,我們可以看到它完成了執(zhí)行:
[2017-08-20 19:11:45,721: INFO/ForkPoolWorker-2] Taskmysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0] succeededin28.225658523035236s:'500 random users created with success!'
用Supervisord管理生產(chǎn)中的工人流程
如果您將應(yīng)用程序部署到DigitalOcean等?VPS?,則需要在后臺(tái)運(yùn)行輔助進(jìn)程。在我的教程中,我喜歡使用Supervisord來管理Gunicorn的工作人員,所以它通常與Celery很合適。
首先安裝它(在Ubuntu上):
sudo apt-get install supervisor
然后創(chuàng)建一個(gè)文件名為mysite的-celery.conf的文件夾中:/etc/supervisor/conf.d/mysite-celery.conf:
[program:mysite-celery]
command=/home/mysite/bin/celery worker -A mysite --loglevel=INFO
directory=/home/mysite/mysite
user=nobody
numprocs=1
stdout_logfile=/home/mysite/logs/celery.log
stderr_logfile=/home/mysite/logs/celery.log
autostart=true
autorestart=true
startsecs=10
; Need to waitfor currently executing tasks to finish at shutdown.
; Increase thisif you have verylong running tasks.
stopwaitsecs =600
stopasgroup=true
; Set Celery priority higher than default (999)
; so,if rabbitmqis supervised, it will start first.
priority=1000
在下面的例子中,Django項(xiàng)目是在虛擬環(huán)境中。路徑為/ home / mysite /。
現(xiàn)在重新讀取配置并添加新的過程:
sudo supervisorctl reread
sudo supervisorctl update
如果您不熟悉將Django部署到生產(chǎn)服務(wù)器并使用Supervisord,可以參考:如何將Django應(yīng)用程序部署到supervisor。