可以統(tǒng)一配置任務(wù)和隊(duì)列的關(guān)系
也可以啟動(dòng)worker的時(shí)候指定worker綁定的隊(duì)列
也可以調(diào)用task的時(shí)候指定到哪個(gè)隊(duì)列
也可以直接聲明task到達(dá)的隊(duì)列
#: Task state is unknown (assumed pending since you know the id).
PENDING = 'PENDING'
#: Task was received by a worker (only used in events).
RECEIVED = 'RECEIVED'
#: Task was started by a worker (:setting:`task_track_started`).
STARTED = 'STARTED'
#: Task succeeded
SUCCESS = 'SUCCESS'
#: Task failed
FAILURE = 'FAILURE'
#: Task was revoked.
REVOKED = 'REVOKED'
#: Task was rejected (only used in events).
REJECTED = 'REJECTED'
#: Task is waiting for retry.
RETRY = 'RETRY'
IGNORED = 'IGNORED'
READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})
UNREADY_STATES = frozenset({PENDING, RECEIVED, STARTED, REJECTED, RETRY})
EXCEPTION_STATES = frozenset({RETRY, FAILURE, REVOKED})
PROPAGATE_STATES = frozenset({FAILURE, REVOKED})
from celery import Celery
from config.config_util import get_yaml_cfg_by_key
from tools.logger_util import logger
celery_redis = get_yaml_cfg_by_key("celery_redis")
def app_sentinel_config(app):
service_name = celery_redis.get("service_name")
sentinels = celery_redis.get("sentinels")
password = celery_redis.get("password")
logger.info(f"{sentinels=},{service_name=},{password=}")
if service_name and sentinels and password:
sentinels = [(i.split(':')[0], i.split(':')[1]) for i in sentinels.split(',')]
transport_options = {'master_name': service_name, 'sentinels': sentinels,
'socket_timeout': 0.1, 'password': password}
app.conf.broker_transport_options = transport_options
app.conf.result_backend_transport_options = transport_options
app = Celery("celery_check",
broker=celery_redis["celery_broker"],
backend=celery_redis["celery_backend"],
include=['celery_check.tasks', 'celery_check.tasks2'])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = True
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ["json"]
app.conf.broker_connection_retry_on_startup = True
app.conf.task_default_queue = 'default'
# app.conf.worker_pool = 'threads'
# app.conf.worker_max_tasks_per_child = 1
# app.conf.worker_concurrency = 5
# app_sentinel_config(app)
app.conf.task_queues = {
'default': {'exchange': 'default', 'routing_key': 'default'},
'check_queue1': {'exchange': 'check_queue1', 'routing_key': 'check_queue1'},
'check_queue2': {'exchange': 'check_queue2', 'routing_key': 'check_queue2'},
}
# 隊(duì)列路由
app.conf.task_routes = {
'celery_check.tasks.celery_check': {'queue': 'check_queue1', 'routing_key': 'check_queue1'},
'celery_check.tasks2.task_default': {'queue': 'default', 'routing_key': 'default'},
'celery_check.tasks2.task_check_queue1': {'queue': 'check_queue1', 'routing_key': 'check_queue1'},
'celery_check.tasks2.task_check_queue2': {'queue': 'check_queue2', 'routing_key': 'check_queue2'}
}
# app.conf.beat_schedule = {
# 'pdb_period_chck': {
# 'task': 'celery_check.tasks.pdb_period_task',
# 'schedule': 10,
# },
# }
import time
from celery_check.celery_app import app
from tools.logger_util import logger
@app.task(bind=True, queue='default', max_retries=1)
def test_retry(self, t):
logger.info(f"test_celery:{t}")
try:
from pathlib import Path
f_read = Path(__file__).parent.joinpath("a.txt").read_text()
if not f_read:
raise RuntimeError("retry test")
time.sleep(t)
except Exception as e:
"""
retry的參數(shù)可以有:
exc:指定拋出的異常
throw:重試時(shí)是否通知worker是重試任務(wù)
eta:指定重試的時(shí)間/日期
countdown:在多久之后重試(每多少秒重試一次,默認(rèn)3分鐘)
max_retries:最大重試次數(shù)(默認(rèn)3次)
"""
logger.info("開始重試")
self.retry(exc=e, countdown=10)
return t
@app.task(bind=True, queue='default', autoretry_for=(Exception,), max_retries=5, default_retry_delay=5)
def test_retry2(self, t):
logger.info(f"test_celery:{t}")
from pathlib import Path
f_read = Path(__file__).parent.joinpath("a.txt").read_text()
if not f_read:
raise RuntimeError("retry test")
time.sleep(t)
print(123)
@app.task(bind=True, queue='default')
def test_not_retry(self, t):
logger.info(f"test_celery:{t}")
if t == 1:
raise RuntimeError("retry test")
time.sleep(t)
print(123)
@app.task()
def task_check_default(msg):
print(msg * 10)
@app.task(queue='check_queue11')
def task_check_queue1(msg):
print(msg * 10)
@app.task(queue='check_queue22')
def task_check_queue2(msg):
print(msg * 10)
@app.task()
def task_check_queue11(msg):
print(msg * 10)
@app.task()
def task_check_queue22(msg):
print(msg * 10)
import time
from celery import group
from celery.result import AsyncResult
from celery.result import ResultSet
from celery_check import tasks2
from tools.logger_util import logger
from celery_check.tasks2 import test_not_retry
from celery.result import result_from_tuple
if __name__ == '__main__':
# res = tasks2.test_retry2.apply_async(args=(1,), queue='check_queue2')
# # res = tasks.test_not_retry.apply_async(args=(1,), queue='check_queue2')
# while True:
# print(res.status)
# time.sleep(1)
# tasks2.task_check_queue1()
# try:
# # res = tasks.test_celery.apply_async(args=(5.1,), time_limit=5)
# res = tasks.test_celery.apply_async(args=(3,), time_limit=4)
# res2 = AsyncResult(id=res.id)
# while True:
# print(res2.ready(), res2.status)
# # while True:
# # print(res.ready())
# # if res.ready():
# # print(res.status)
# # print(res.ready())
# # print(res.get())
# # # print(234)
# # time.sleep(0.1)
# except Exception as e:
# logger.exception(e)
# from celery.result import AsyncResult
#
#
# result = AsyncResult(id="a20c1d7b-552b-4e83-b2a7-5ee1fd8cde81")
# result.failed()
# result.revoke(terminate=True, signal='SIGKILL')
# print(f"Task {result.id} revoked.")
# # 按照task_routes
# tasks2.task_check_default.apply_async(args=("task_check_default",))
# tasks2.task_check_queue11.apply_async(args=("task_check_queue1",))
# tasks2.task_check_queue22.apply_async(args=("task_check_queue2",))
# # 運(yùn)行的時(shí)候動(dòng)態(tài)指定queue
# tasks2.task_check_default.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_default",), queue='check_queue22')
# tasks2.task_check_queue11.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue11",), queue='check_queue22')
# tasks2.task_check_queue22.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue22",), queue='check_queue22')
# 不定義任務(wù)queue以及task_routes,靠啟動(dòng)worker的時(shí)候指定
# celery -A celery_check.celery_app worker --loglevel=info -Q check_queue11
# celery -A celery_check.celery_app worker --loglevel=info -Q check_queue22
tasks2.task_check_default.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_default",))
tasks2.task_check_queue1.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue11",))
tasks2.task_check_queue2.apply_async(args=("運(yùn)行的時(shí)候動(dòng)態(tài)指定task_check_queue22",))