celery 隊(duì)列

可以統(tǒng)一配置任務(wù)和隊(duì)列的關(guān)系
也可以啟動(dòng)worker的時(shí)候指定worker綁定的隊(duì)列
也可以調(diào)用task的時(shí)候指定到哪個(gè)隊(duì)列
也可以直接聲明task到達(dá)的隊(duì)列

#: Task state is unknown (assumed pending since you know the id).
PENDING = 'PENDING'
#: Task was received by a worker (only used in events).
RECEIVED = 'RECEIVED'
#: Task was started by a worker (:setting:`task_track_started`).
STARTED = 'STARTED'
#: Task succeeded
SUCCESS = 'SUCCESS'
#: Task failed
FAILURE = 'FAILURE'
#: Task was revoked.
REVOKED = 'REVOKED'
#: Task was rejected (only used in events).
REJECTED = 'REJECTED'
#: Task is waiting for retry.
RETRY = 'RETRY'
IGNORED = 'IGNORED'

READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})
UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY})
EXCEPTION_STATES = frozenset({RETRY, FAILURE, REVOKED})
PROPAGATE_STATES = frozenset({FAILURE, REVOKED})
from celery import Celery
from config.config_util import get_yaml_cfg_by_key
from tools.logger_util import logger

celery_redis = get_yaml_cfg_by_key("celery_redis")


def app_sentinel_config(app):
    service_name = celery_redis.get("service_name")
    sentinels = celery_redis.get("sentinels")
    password = celery_redis.get("password")
    logger.info(f"{sentinels=},{service_name=},{password=}")
    if service_name and sentinels and password:
        sentinels = [(i.split(':')[0], i.split(':')[1]) for i in sentinels.split(',')]
        transport_options = {'master_name': service_name, 'sentinels': sentinels,
                             'socket_timeout': 0.1, 'password': password}
        app.conf.broker_transport_options = transport_options
        app.conf.result_backend_transport_options = transport_options


app = Celery("celery_check",
             broker=celery_redis["celery_broker"],
             backend=celery_redis["celery_backend"],
             include=['celery_check.tasks', 'celery_check.tasks2'])

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = True
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ["json"]
app.conf.broker_connection_retry_on_startup = True
app.conf.task_default_queue = 'default'
# app.conf.worker_pool = 'threads'
# app.conf.worker_max_tasks_per_child = 1
# app.conf.worker_concurrency = 5
# app_sentinel_config(app)

app.conf.task_queues = {
    'default': {'exchange': 'default', 'routing_key': 'default'},
    'check_queue1': {'exchange': 'check_queue1', 'routing_key': 'check_queue1'},
    'check_queue2': {'exchange': 'check_queue2', 'routing_key': 'check_queue2'},
}
# 隊(duì)列路由
app.conf.task_routes = {
    'celery_check.tasks.celery_check': {'queue': 'check_queue1', 'routing_key': 'check_queue1'},
    'celery_check.tasks2.task_default': {'queue': 'default', 'routing_key': 'default'},
    'celery_check.tasks2.task_check_queue1': {'queue': 'check_queue1', 'routing_key': 'check_queue1'},
    'celery_check.tasks2.task_check_queue2': {'queue': 'check_queue2', 'routing_key': 'check_queue2'}
}

# app.conf.beat_schedule = {
#     'pdb_period_chck': {
#         'task': 'celery_check.tasks.pdb_period_task',
#         'schedule': 10,
#     },
# }

import time

from celery_check.celery_app import app
from tools.logger_util import logger


@app.task(bind=True, queue='default', max_retries=1)
def test_retry(self, t):
    logger.info(f"test_celery:{t}")
    try:
        from pathlib import Path
        f_read = Path(__file__).parent.joinpath("a.txt").read_text()
        if not f_read:
            raise RuntimeError("retry test")
        time.sleep(t)
    except Exception as e:
        """
            retry的參數(shù)可以有:
            exc:指定拋出的異常
            throw:重試時(shí)是否通知worker是重試任務(wù)
            eta:指定重試的時(shí)間/日期
            countdown:在多久之后重試(每多少秒重試一次,默認(rèn)3分鐘)
            max_retries:最大重試次數(shù)(默認(rèn)3次)
        """
        logger.info("開始重試")
        self.retry(exc=e, countdown=10)
    return t


@app.task(bind=True, queue='default', autoretry_for=(Exception,), max_retries=5, default_retry_delay=5)
def test_retry2(self, t):
    logger.info(f"test_celery:{t}")
    from pathlib import Path
    f_read = Path(__file__).parent.joinpath("a.txt").read_text()
    if not f_read:
        raise RuntimeError("retry test")
    time.sleep(t)
    print(123)


@app.task(bind=True, queue='default')
def test_not_retry(self, t):
    logger.info(f"test_celery:{t}")
    if t == 1:
        raise RuntimeError("retry test")
    time.sleep(t)
    print(123)


@app.task()
def task_check_default(msg):
    print(msg * 10)


@app.task(queue='check_queue11')
def task_check_queue1(msg):
    print(msg * 10)


@app.task(queue='check_queue22')
def task_check_queue2(msg):
    print(msg * 10)


@app.task()
def task_check_queue11(msg):
    print(msg * 10)


@app.task()
def task_check_queue22(msg):
    print(msg * 10)


import time

from celery import group
from celery.result import AsyncResult
from celery.result import ResultSet

from celery_check import tasks2
from tools.logger_util import logger
from celery_check.tasks2 import test_not_retry
from celery.result import result_from_tuple

if __name__ == '__main__':
    # res = tasks2.test_retry2.apply_async(args=(1,), queue='check_queue2')
    # # res = tasks.test_not_retry.apply_async(args=(1,), queue='check_queue2')
    # while True:
    #     print(res.status)
    #     time.sleep(1)

    # tasks2.task_check_queue1()
    # try:
    #     # res = tasks.test_celery.apply_async(args=(5.1,), time_limit=5)
    #     res = tasks.test_celery.apply_async(args=(3,), time_limit=4)
    #     res2 = AsyncResult(id=res.id)
    #     while True:
    #         print(res2.ready(), res2.status)
    #     # while True:
    #     #     print(res.ready())
    #     #     if res.ready():
    #     #         print(res.status)
    #     #         print(res.ready())
    #     #         print(res.get())
    #     #     # print(234)
    #     #     time.sleep(0.1)
    # except Exception as e:
    #     logger.exception(e)

    # from celery.result import AsyncResult
    #
    #
    # result = AsyncResult(id="a20c1d7b-552b-4e83-b2a7-5ee1fd8cde81")
    # result.failed()
    # result.revoke(terminate=True, signal='SIGKILL')
    # print(f"Task {result.id} revoked.")

    # # 按照task_routes
    # tasks2.task_check_default.apply_async(args=("task_check_default",))
    # tasks2.task_check_queue11.apply_async(args=("task_check_queue1",))
    # tasks2.task_check_queue22.apply_async(args=("task_check_queue2",))

    # # 運(yùn)行的時(shí)候動(dòng)態(tài)指定queue
    # tasks2.task_check_default.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_default",), queue='check_queue22')
    # tasks2.task_check_queue11.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue11",), queue='check_queue22')
    # tasks2.task_check_queue22.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue22",), queue='check_queue22')

    # 不定義任務(wù)queue以及task_routes,靠啟動(dòng)worker的時(shí)候指定
    # celery -A celery_check.celery_app worker --loglevel=info -Q check_queue11
    # celery -A celery_check.celery_app worker --loglevel=info -Q check_queue22
    tasks2.task_check_default.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_default",))
    tasks2.task_check_queue1.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue11",))
    tasks2.task_check_queue2.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue22",))

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容