Celery文檔總結(jié)(一)

目前我接觸的distributed task queue比較有名的是python的celery和go的nsq, 本文是我在學(xué)習(xí)celery的一些總結(jié)

Celery 是什么?

Celery是一個(gè)簡(jiǎn)單, 靈活, 可靠的分布式系統(tǒng), 用于處理大量消息, 同時(shí)為操作提供維護(hù)此類系統(tǒng)所需的工具. 它是一個(gè)任務(wù)隊(duì)列, 專注于實(shí)時(shí)處理, 同時(shí)還支持任務(wù)調(diào)度

Celery 優(yōu)點(diǎn)

  • 簡(jiǎn)單: celery使用很簡(jiǎn)單, 你可以不用配置就可以啟動(dòng)一個(gè)任務(wù)
  • 高度可用: worker和clients會(huì)自動(dòng)處理失敗或丟失的消息
  • 快: 一個(gè)celery每分鐘可以處理數(shù)百萬(wàn)的任務(wù)(使用RabbitMQ并做好優(yōu)化)
  • 靈活: 幾乎Celery的每個(gè)部分都可以自行擴(kuò)展或使用, 自定義池實(shí)現(xiàn), 序列化器, 壓縮方案, 日志記錄, 調(diào)度程序, 消費(fèi)者, 生產(chǎn)者, 代理傳輸?shù)鹊?/li>

Celery 架構(gòu)

celery架構(gòu)

Broker: RabbitMQ, Redis
Backend: Redis, Memcached, SQLAlchemy, Cassandra, Elasticsearch
Concurrency: Prefork, Eventlet, Gevent
Serialization: Pickle, Json, Yaml, Zlib, Bzip2

Celery 特性

Monitoring 自檢

  • Celery命令行(可以通過(guò)celery <command> --help了解celery的各種命令)
  • Flower是一個(gè)Django搭建的celery實(shí)時(shí)監(jiān)測(cè)拓展
  • RabbitMQ(rabbitmqctl list_queue)和Redis, 查看各種broker和backend數(shù)據(jù)
  • 通過(guò)代碼使用celery events跟蹤任務(wù)
from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,  # 任務(wù)狀態(tài):處理函數(shù)
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Scheduling 調(diào)度

celery beat是一個(gè)調(diào)度器, 它定期啟動(dòng)任務(wù), 然后由群集中的可用工作節(jié)點(diǎn)執(zhí)行

  1. 將任務(wù)添加到節(jié)拍調(diào)度器 beat shedule
from celery import Celery
from celery.schedules import crontab

app = Celery()
app.conf.timezone =  'Asian/Shanghai'  # 需要設(shè)置時(shí)區(qū), 默認(rèn)是UTC,
app.conf.beat_schedule = {  # 配置中設(shè)置定時(shí)任務(wù)
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,  
        'schedule': crontab(hour=7, minute=30, day_of_week=1), 
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16)
    },
}

@app.task
def add(a, b):
    return a + b

# 使用定時(shí)函數(shù)設(shè)置定時(shí)任務(wù)
@app.on_after_configure.connect  # 這個(gè)裝飾器確保了配置完成后才調(diào)用函數(shù)
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
  1. 啟動(dòng)定時(shí)任務(wù)celery -A proj beat

Work-Flows 任務(wù)流

Signatures : 包裝單個(gè)任務(wù)調(diào)用的參數(shù), 關(guān)鍵字參數(shù)和執(zhí)行選項(xiàng), 以便可以將其傳遞給函數(shù), 甚至可以通過(guò)線路進(jìn)行序列化和發(fā)送

s = add.s(2, 2, {'debug': True}).set(countdown=1)  # 星形參數(shù)的快捷方式, 通過(guò)set()函數(shù)定義options
s.args  # (2, 2)
s.kwargs  # {'debug': True}
s.options  # {'countdown': 10}

# Partial可以實(shí)現(xiàn)部分用于回調(diào)
s()  # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add(2, 2)
s.delay()  # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add.delay(2, 2)
s.apply_async()  # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add.apply_asybc(2, 2)
p = add.s(2)  # 所有參數(shù)都可以在后續(xù)流程中傳入
p.apply_async(args=(4,),kwargs={'debug': True}, countdown=1)

# Immutable不希望帶上上一個(gè)任務(wù)的結(jié)果
add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
add.apply_async((2, 2), link=reset_buffers.si())
add.si(2, 3)  # 最好使用這個(gè)方式, 簡(jiǎn)單

# Callbacks可以用于任務(wù)回調(diào)
add.apply_async((2, 2), link=other_task.s())

# Chain 任務(wù)鏈?zhǔn)酵瓿? 上一個(gè)結(jié)果可以用于后續(xù)任務(wù)
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
res.get()  # 16
res1 = (add.s(2, 2) | add.s(4) | add.s(8))().get()  # 鏈?zhǔn)絺鬟f
res1.get()  # 16
res1.parent.get() # 8
res1.parent.parent.get()  # 4
res2 = (add.si(2, 2) | add.si(4, 8) | add.si(10, 10))()  # 結(jié)果不傳遞
res2.get()  # 20
res2.parent.get() # 12
res2.parent.parent.get()  # 4

# Group 創(chuàng)建一組要并行執(zhí)行的任務(wù)
from celery import group
res = group(add.s(i, i) for i in xrange(10))()
res.get(timeout=1)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# Chord 所有任務(wù)完成執(zhí)行時(shí)添加要調(diào)用的回調(diào)
from celery import chord
res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
res.get()  # 90

# Map he Chunk
xsum.map([range(10), range(100)])
res = add.chunks(zip(range(100), range(100)), 10)()

Time & Rate Limits 時(shí)間評(píng)率限制

可以控制單個(gè)任務(wù)的請(qǐng)求評(píng)率和執(zhí)行時(shí)間限制

# 時(shí)間限制有soft和hard之分, 運(yùn)行超過(guò)soft會(huì)拋出SoftTimeLimitExceeded異常, 超過(guò)hard會(huì)直接終止任務(wù), 可以在configuration中設(shè)置
task_soft_time_limit = 60
task_time_limit = 120
task_default_rate_limit = '200/m'

# 可以在任務(wù)中設(shè)置
@app.task(time_soft_limit=60 ,time_limit=120, rate_limit='200/m')
def test():
    pass

# 遠(yuǎn)程調(diào)用下面可以在運(yùn)行中修改任務(wù)時(shí)間和評(píng)率限制
app.control.time_limit('tasks.crawl_the_web', soft=60, hard=120, reply=True)  
app.control.rate_limit('myapp.mytask', '200/m', destination=['celery@worker1.example.com'])  # destination可以指定相應(yīng)節(jié)點(diǎn)限制評(píng)率

Resource Leak Protection 資源泄露保護(hù)

可以通過(guò)設(shè)置來(lái)保護(hù)工作節(jié)點(diǎn)的資源不被泄露, 使用此選項(xiàng), 您可以配置工作程序在被新進(jìn)程替換之前可以執(zhí)行的最大資源配置, 如果您無(wú)法控制資源使用, 例如來(lái)自閉源C擴(kuò)展, 則此功能非常有用

  • Max tasks per child setting 最大任務(wù)數(shù), 可以使用workers [--max-tasks-per-child]
  • Max memory per child 最大內(nèi)存, 可以使用workers [--max-memory-per-child]
# 池工作進(jìn)程在用新工作進(jìn)程替換之前可以執(zhí)行的最大任務(wù)數(shù), 默認(rèn)是沒(méi)有限制的
worker_max_tasks_per_child = 10
# 在新worker替換之前,worker可能消耗的最大駐留內(nèi)存量, 如果單個(gè)任務(wù)導(dǎo)致worker超過(guò)此限制, 則任務(wù)將完成,worker將被替換
worker_max_memory_per_child = 12000  # 12MB

User Components 用戶組件

可以定制每個(gè)工作組件, 并且可以由用戶定義其他組件. 工作人員使用“bootsteps”構(gòu)建 - 一個(gè)依賴關(guān)系圖, 可以對(duì)工人的內(nèi)部進(jìn)行細(xì)粒度控制

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,854評(píng)論 0 10
  • 1.定義: Celery是一個(gè)異步的任務(wù)隊(duì)列(也叫做分布式任務(wù)隊(duì)列) 2.工作結(jié)構(gòu) Celery分為3個(gè)部...
    四號(hào)公園_2016閱讀 28,950評(píng)論 5 60
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • =========================================================...
    lavor閱讀 3,647評(píng)論 0 5
  • 早在三月底,我就認(rèn)識(shí)到了我自己的寫文問(wèn)題,我就知道是因?yàn)槲易x書少,思想認(rèn)識(shí)不到位,寫出的文章沒(méi)有深度和廣度。 將近...
    靜心專注閱讀 600評(píng)論 2 1

友情鏈接更多精彩內(nèi)容