目前我接觸的distributed task queue比較有名的是python的celery和go的nsq, 本文是我在學(xué)習(xí)celery的一些總結(jié)
Celery 是什么?
Celery是一個(gè)簡(jiǎn)單, 靈活, 可靠的分布式系統(tǒng), 用于處理大量消息, 同時(shí)為操作提供維護(hù)此類系統(tǒng)所需的工具. 它是一個(gè)任務(wù)隊(duì)列, 專注于實(shí)時(shí)處理, 同時(shí)還支持任務(wù)調(diào)度
Celery 優(yōu)點(diǎn)
- 簡(jiǎn)單: celery使用很簡(jiǎn)單, 你可以不用配置就可以啟動(dòng)一個(gè)任務(wù)
- 高度可用: worker和clients會(huì)自動(dòng)處理失敗或丟失的消息
- 快: 一個(gè)celery每分鐘可以處理數(shù)百萬(wàn)的任務(wù)(使用RabbitMQ并做好優(yōu)化)
- 靈活: 幾乎Celery的每個(gè)部分都可以自行擴(kuò)展或使用, 自定義池實(shí)現(xiàn), 序列化器, 壓縮方案, 日志記錄, 調(diào)度程序, 消費(fèi)者, 生產(chǎn)者, 代理傳輸?shù)鹊?/li>
Celery 架構(gòu)

Broker: RabbitMQ, Redis
Backend: Redis, Memcached, SQLAlchemy, Cassandra, Elasticsearch
Concurrency: Prefork, Eventlet, Gevent
Serialization: Pickle, Json, Yaml, Zlib, Bzip2
Celery 特性
Monitoring 自檢
- Celery命令行(可以通過(guò)
celery <command> --help了解celery的各種命令) - Flower是一個(gè)Django搭建的celery實(shí)時(shí)監(jiān)測(cè)拓展
- RabbitMQ(
rabbitmqctl list_queue)和Redis, 查看各種broker和backend數(shù)據(jù) - 通過(guò)代碼使用celery events跟蹤任務(wù)
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks, # 任務(wù)狀態(tài):處理函數(shù)
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
Scheduling 調(diào)度
celery beat是一個(gè)調(diào)度器, 它定期啟動(dòng)任務(wù), 然后由群集中的可用工作節(jié)點(diǎn)執(zhí)行
- 將任務(wù)添加到節(jié)拍調(diào)度器 beat shedule
from celery import Celery
from celery.schedules import crontab
app = Celery()
app.conf.timezone = 'Asian/Shanghai' # 需要設(shè)置時(shí)區(qū), 默認(rèn)是UTC,
app.conf.beat_schedule = { # 配置中設(shè)置定時(shí)任務(wù)
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'schedule': solar('sunset', -37.81753, 144.96715),
'args': (16, 16)
},
}
@app.task
def add(a, b):
return a + b
# 使用定時(shí)函數(shù)設(shè)置定時(shí)任務(wù)
@app.on_after_configure.connect # 這個(gè)裝飾器確保了配置完成后才調(diào)用函數(shù)
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
- 啟動(dòng)定時(shí)任務(wù)
celery -A proj beat
Work-Flows 任務(wù)流
Signatures : 包裝單個(gè)任務(wù)調(diào)用的參數(shù), 關(guān)鍵字參數(shù)和執(zhí)行選項(xiàng), 以便可以將其傳遞給函數(shù), 甚至可以通過(guò)線路進(jìn)行序列化和發(fā)送
s = add.s(2, 2, {'debug': True}).set(countdown=1) # 星形參數(shù)的快捷方式, 通過(guò)set()函數(shù)定義options
s.args # (2, 2)
s.kwargs # {'debug': True}
s.options # {'countdown': 10}
# Partial可以實(shí)現(xiàn)部分用于回調(diào)
s() # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add(2, 2)
s.delay() # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add.delay(2, 2)
s.apply_async() # 相當(dāng)在當(dāng)前進(jìn)程執(zhí)行add.apply_asybc(2, 2)
p = add.s(2) # 所有參數(shù)都可以在后續(xù)流程中傳入
p.apply_async(args=(4,),kwargs={'debug': True}, countdown=1)
# Immutable不希望帶上上一個(gè)任務(wù)的結(jié)果
add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
add.apply_async((2, 2), link=reset_buffers.si())
add.si(2, 3) # 最好使用這個(gè)方式, 簡(jiǎn)單
# Callbacks可以用于任務(wù)回調(diào)
add.apply_async((2, 2), link=other_task.s())
# Chain 任務(wù)鏈?zhǔn)酵瓿? 上一個(gè)結(jié)果可以用于后續(xù)任務(wù)
from celery import chain
res = chain(add.s(2, 2), add.s(4), add.s(8))()
res.get() # 16
res1 = (add.s(2, 2) | add.s(4) | add.s(8))().get() # 鏈?zhǔn)絺鬟f
res1.get() # 16
res1.parent.get() # 8
res1.parent.parent.get() # 4
res2 = (add.si(2, 2) | add.si(4, 8) | add.si(10, 10))() # 結(jié)果不傳遞
res2.get() # 20
res2.parent.get() # 12
res2.parent.parent.get() # 4
# Group 創(chuàng)建一組要并行執(zhí)行的任務(wù)
from celery import group
res = group(add.s(i, i) for i in xrange(10))()
res.get(timeout=1) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# Chord 所有任務(wù)完成執(zhí)行時(shí)添加要調(diào)用的回調(diào)
from celery import chord
res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
res.get() # 90
# Map he Chunk
xsum.map([range(10), range(100)])
res = add.chunks(zip(range(100), range(100)), 10)()
Time & Rate Limits 時(shí)間評(píng)率限制
可以控制單個(gè)任務(wù)的請(qǐng)求評(píng)率和執(zhí)行時(shí)間限制
# 時(shí)間限制有soft和hard之分, 運(yùn)行超過(guò)soft會(huì)拋出SoftTimeLimitExceeded異常, 超過(guò)hard會(huì)直接終止任務(wù), 可以在configuration中設(shè)置
task_soft_time_limit = 60
task_time_limit = 120
task_default_rate_limit = '200/m'
# 可以在任務(wù)中設(shè)置
@app.task(time_soft_limit=60 ,time_limit=120, rate_limit='200/m')
def test():
pass
# 遠(yuǎn)程調(diào)用下面可以在運(yùn)行中修改任務(wù)時(shí)間和評(píng)率限制
app.control.time_limit('tasks.crawl_the_web', soft=60, hard=120, reply=True)
app.control.rate_limit('myapp.mytask', '200/m', destination=['celery@worker1.example.com']) # destination可以指定相應(yīng)節(jié)點(diǎn)限制評(píng)率
Resource Leak Protection 資源泄露保護(hù)
可以通過(guò)設(shè)置來(lái)保護(hù)工作節(jié)點(diǎn)的資源不被泄露, 使用此選項(xiàng), 您可以配置工作程序在被新進(jìn)程替換之前可以執(zhí)行的最大資源配置, 如果您無(wú)法控制資源使用, 例如來(lái)自閉源C擴(kuò)展, 則此功能非常有用
-
Max tasks per child setting 最大任務(wù)數(shù), 可以使用workers [
--max-tasks-per-child] -
Max memory per child 最大內(nèi)存, 可以使用workers [
--max-memory-per-child]
# 池工作進(jìn)程在用新工作進(jìn)程替換之前可以執(zhí)行的最大任務(wù)數(shù), 默認(rèn)是沒(méi)有限制的
worker_max_tasks_per_child = 10
# 在新worker替換之前,worker可能消耗的最大駐留內(nèi)存量, 如果單個(gè)任務(wù)導(dǎo)致worker超過(guò)此限制, 則任務(wù)將完成,worker將被替換
worker_max_memory_per_child = 12000 # 12MB
User Components 用戶組件
可以定制每個(gè)工作組件, 并且可以由用戶定義其他組件. 工作人員使用“bootsteps”構(gòu)建 - 一個(gè)依賴關(guān)系圖, 可以對(duì)工人的內(nèi)部進(jìn)行細(xì)粒度控制