CELERY 集群管理實(shí)現(xiàn)

這篇需要結(jié)合[CELERY 測試多服務(wù)器]來看。

主要功能實(shí)現(xiàn): 當(dāng)創(chuàng)建新任務(wù)時(shí),可以指定虛擬環(huán)境(服務(wù)器)執(zhí)行任務(wù)。

1.代碼

1.1 __init__.py

from celery import Celery

app = Celery('celery_app')
# 加載配置
app.config_from_object('celery_app.celeryconfig')

1.2 celeryconfig.py

from datetime import timedelta
from kombu import Queue, Exchange

result_serializer = 'json'

# 中間件
broker_url = 'redis://127.0.0.1:6379/7'
# 結(jié)果處理
result_backend = 'redis://127.0.0.1:6379/8'
# 時(shí)區(qū)
timezone = 'Asia/Shanghai'

# 導(dǎo)入任務(wù)模塊
imports = (
    'celery_app.task1',
    'celery_app.task2'
)
# 定時(shí)任務(wù)
beat_schedule = {
    'add-every-20-seconds': {
        'task': 'celery_app.task1.multiply',
        'schedule': timedelta(seconds=20),
        'args': (5, 7)
    },
    'add-every-10-seconds': {
        'task': 'celery_app.task2.add',
        'schedule': timedelta(seconds=10),
        'args': (100, 200)
    }
}
# 任務(wù)隊(duì)列
# 保持三個數(shù)據(jù)一致
# exchange 對應(yīng) 一個消息隊(duì)列(queue),即:通過"消息路由"的機(jī)制使exchange對應(yīng)queue,每個queue對應(yīng)每個worker
task_queues = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('priority_high', exchange=Exchange('priority_high'), routing_key='priority_high'),
    Queue('priority_low', exchange=Exchange('priority_low'), routing_key='priority_low')
)

task_routes = {
    'celery_app.task1.multiply': {'queue': 'priority_high', 'routing_key': 'priority_high'},
    'celery_app.task2.add': {'queue': 'priority_low', 'routing_key': 'priority_low'}
}

1.3 task1.py

import time
from . import app


@app.task
def multiply(x, y):
    print('multiply')
    time.sleep(4)
    return x * y

1.4 task2.py

import time
from . import app


@app.task
def add(x, y):
    print('add')
    time.sleep(2)
    return x + y

2. 執(zhí)行

2.1 創(chuàng)建虛擬環(huán)境

創(chuàng)建使用pipenv( pipenv install )創(chuàng)建兩個虛擬環(huán)境,并將上面的代碼文件分別復(fù)制到相應(yīng)環(huán)境下。

2.2 啟動celery

使用pipenv shell打開虛擬環(huán)境
回到celery_app的上一層,使用一下命令
在虛擬環(huán)境(env1)中,
celery -A celery_app worker -l info -Q priority_high -P eventlet
在虛擬環(huán)境(env2)中
celery -A celery_app worker -l info -Q priority_low -P eventlet
作用是,queue=priority_high, routing_key=priority_high的任務(wù)都將在env1中執(zhí)行,
queue=priority_low, routing_key=priority_low的任務(wù)都將在env2中執(zhí)行。

2.3 創(chuàng)建任務(wù)

使用apply_async函數(shù)創(chuàng)建任務(wù),args表示傳參,queue結(jié)合routing_key確定使用的虛擬環(huán)境(服務(wù)器)

>>> for _ in range(30):
...     re = task1.multiply.apply_async(args=[20,20],queue='priority_high',routing_key='priority_high')
...     re2 = task2.add.apply_async(args=[20,20],queue='priority_low',routing_key='priority_low')
...     print(re.get())
...     print(re2.get())

在虛擬環(huán)境1中,只執(zhí)行了queue=priority_high, routing_key=priority_high的任務(wù)

[2018-09-06 16:48:01,572: INFO/MainProcess] Received task: celery_app.task1.mult
iply[3c058487-eee8-4505-9238-f549726680fb]
[2018-09-06 16:48:01,574: WARNING/MainProcess] multiply
[2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task1.mult
iply[ead3df3d-a23c-4a57-825a-8e85d5033fd1]
[2018-09-06 16:48:05,581: INFO/MainProcess] Task celery_app.task1.multiply[ead3d
f3d-a23c-4a57-825a-8e85d5033fd1] succeeded in 4.0090000000018335s: 400
[2018-09-06 16:48:05,583: INFO/MainProcess] Received task: celery_app.task1.mult
iply[954575ef-fe0a-4752-907b-ded4e4a4173b]
[2018-09-06 16:48:05,584: WARNING/MainProcess] multiply
[2018-09-06 16:48:05,586: INFO/MainProcess] Received task: celery_app.task1.mult
iply[864f666f-8a5c-490b-950f-5ac5b37a83b5]
[2018-09-06 16:48:09,591: INFO/MainProcess] Task celery_app.task1.multiply[864f6
66f-8a5c-490b-950f-5ac5b37a83b5] succeeded in 4.008999999998196s: 400

在虛擬環(huán)境2中,執(zhí)行了queue=priority_low, routing_key=priority_low的任務(wù)

[2018-09-06 16:47:49,519: WARNING/MainProcess] add
[2018-09-06 16:47:51,520: INFO/MainProcess] Task celery_app.task2.add[9ab04f96-d
1c5-4d3c-87c4-6e5707ef90c1] succeeded in 2.0119999999988067s: 40
[2018-09-06 16:47:53,542: INFO/MainProcess] Received task: celery_app.task2.add[
dcf76311-e251-4f8e-bf48-b3c422f63eec]
[2018-09-06 16:47:53,543: WARNING/MainProcess] add
[2018-09-06 16:47:55,545: INFO/MainProcess] Task celery_app.task2.add[dcf76311-e
251-4f8e-bf48-b3c422f63eec] succeeded in 2.0120000000024447s: 40
[2018-09-06 16:47:57,553: INFO/MainProcess] Received task: celery_app.task2.add[
60c201fa-bad4-4af4-9d02-bb68e718fb51]
[2018-09-06 16:47:57,554: WARNING/MainProcess] add
[2018-09-06 16:47:59,556: INFO/MainProcess] Task celery_app.task2.add[60c201fa-b
ad4-4af4-9d02-bb68e718fb51] succeeded in 2.0119999999988067s: 40
[2018-09-06 16:48:01,575: INFO/MainProcess] Received task: celery_app.task2.add[
1640912d-536e-43e2-9960-ebd58af5cb3f]
[2018-09-06 16:48:01,576: WARNING/MainProcess] add
[2018-09-06 16:48:03,580: INFO/MainProcess] Task celery_app.task2.add[1640912d-5
36e-43e2-9960-ebd58af5cb3f] succeeded in 2.0120000000024447s: 40

3.參考

https://my.oschina.net/hochikong/blog/518587
https://www.213.name/archives/1105

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容