Celery作為一個(gè)分布式任務(wù)框架,提供了events對(duì)外進(jìn)行狀態(tài)和信息傳遞,而程序運(yùn)行過程的中的數(shù)據(jù)是較為關(guān)鍵的。
事件關(guān)注點(diǎn)
對(duì)于事件,有以下關(guān)注點(diǎn)
- 事件是如何產(chǎn)生的
- 事件是如何傳遞的
- 事件是如何捕獲的
對(duì)于項(xiàng)目結(jié)構(gòu),有以下關(guān)注點(diǎn)
- 對(duì)象之間的關(guān)系
- 實(shí)現(xiàn)的是否耦合,是否可插拔
evnet sender
Celery內(nèi)置的事件的類型根據(jù)使用者的不同大致可分為Worker/Task的事件。從worker的事件看起。
先看個(gè)自己寫的demo:
def event_sender():
# use to send events
with app.events.default_dispatcher() as d:
d.send('task-result', msg='i am cute result {}'.format(datetime.datetime.now()), name='gin')
d.flush()
dispatcher作為事件的分發(fā)器,并且由于dispatcher內(nèi)部實(shí)現(xiàn)了上下文管理器,所以這里直接就用with來進(jìn)行初始化了(上下文管理器非常方便~實(shí)名推薦一波)。這里發(fā)送的事件task-result是我們自定義的一個(gè)類型,用于和內(nèi)置類型區(qū)分開,方便觀察結(jié)果。在這里先實(shí)例化了一個(gè)dispatcher,然后調(diào)用了dispatcher.send方法。這里我們的broker使用redis,所以我們看一下redis的dispatcher的send方法。
def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
retry_policy=None, Event=Event, **fields):
if self.enabled:
groups, group = self.groups, group_from(type)
if groups and group not in groups:
return
if group in self.buffer_group:
clock = self.clock.forward()
event = Event(type, hostname=self.hostname,
utcoffset=utcoffset(),
pid=self.pid, clock=clock, **fields)
buf = self._group_buffer[group]
buf.append(event)
if len(buf) >= self.buffer_limit:
self.flush()
elif self.on_send_buffered:
self.on_send_buffered()
else:
return self.publish(type, fields, self.producer, blind=blind,
Event=Event, retry=retry,
retry_policy=retry_policy)
前面根據(jù)type分組然后判斷flush or store in the buffer,然后用參數(shù)實(shí)例化events,這里做的比較人性化的一點(diǎn)是可傳入自定義的fields來初始化Events,因?yàn)樽鳛槭录碚f,通常有自定義的數(shù)據(jù)的需求,而這樣處理就比較優(yōu)雅了。然后發(fā)送者發(fā)送整個(gè)事件,也就意味著所有發(fā)送者希望發(fā)送的消息能夠被完整的傳遞。with mutex,用了一個(gè)線程的互斥鎖,然后進(jìn)行了publish, 然后是這里最為關(guān)鍵是publish,看看publish的底層實(shí)現(xiàn)
from kombu import Producer
def _publish(self, event, producer, routing_key, retry=False,
retry_policy=None, utcoffset=utcoffset):
exchange = self.exchange
try:
res = producer.publish(
event,
routing_key=routing_key,
exchange=exchange.name,
retry=retry,
retry_policy=retry_policy,
declare=[exchange],
serializer=self.serializer,
headers=self.headers,
delivery_mode=self.delivery_mode,
)
except Exception as exc: # pylint: disable=broad-except
if not self.buffer_while_offline:
raise
self._outbound_buffer.append((event, routing_key, exc))
def enable(self):
self.producer = Producer(self.channel or self.connection,
exchange=self.exchange,
serializer=self.serializer,
auto_declare=False)
self.enabled = True
for callback in self.on_enabled:
callback()
主要邏輯是調(diào)用了self.producer.publish,而self.producer是在dispatcher的enable里創(chuàng)建的,可以看到enable用了kombu的producer,kombu是celery’內(nèi)部對(duì)于amqp的封裝,用于實(shí)現(xiàn)消息傳遞,其支持了各種的broker,比如說redis,rabbitmq。鑒于咱們使用的是redis的broker,所以也很明顯這里使用redis為broker的pub/sub模式。所以就很清楚啦,celery events的實(shí)現(xiàn)是基于redis的pub/sub模式,這也解釋了為什么在筆者測(cè)試的時(shí)候,沒有消息的存儲(chǔ),以及events在開啟監(jiān)聽以后才能夠收到。
event receiver
最后貼一個(gè)自己寫的事件監(jiān)聽。
def monitor_events():
def on_event(event):
# this is the callback when events come in
print('[recv] {} '.format(event))
with app.connection() as conn:
recv = app.events.Receiver(conn, handlers={'task-result':
on_event})
recv.capture(limit=None, timeout=None, wakeup=True)
events使用場(chǎng)景
基于celery內(nèi)置的事件,可以對(duì)于task的執(zhí)行狀態(tài)信息,和執(zhí)行結(jié)果信息進(jìn)行實(shí)時(shí)處理,例如可以將所有的事件執(zhí)行結(jié)果進(jìn)行暫存然后結(jié)合時(shí)間序列數(shù)據(jù)庫類似influxdb和展示平臺(tái)類似grafana進(jìn)行展示,這樣就有了比較完善的一套結(jié)果數(shù)據(jù)的流程。
也可以自定義事件,對(duì)一些關(guān)注的信息進(jìn)行實(shí)時(shí)處理。例如在task執(zhí)行過程中的信息收集,可以通過events來完成。
總結(jié)
所以對(duì)于上面我們提出的問題,有以下總結(jié)
- 事件是如何產(chǎn)生的:事件由事件的發(fā)送者的初始化,并且發(fā)送,在celery里事件的發(fā)送往往伴隨著狀態(tài)的改變,例如對(duì)于 task的事件中包括 發(fā)送,被接收,被執(zhí)行,執(zhí)行成功,執(zhí)行失敗,重試。
- 事件是如何傳遞的:如果采用redis作為broker,那么事件是基于redis的pub/sub模式而傳遞的。
- 事件是如何捕獲的:同上,基于訂閱模式,事件得以被接收。
對(duì)于對(duì)象的關(guān)系呢
在這一部分我們接觸到的對(duì)象有
app,dispatcher,events,publisher,receiver
其中app作為拉起整個(gè)celery項(xiàng)目的核心對(duì)象,events對(duì)象被掛載到app上,而dispatcher和receiver則被掛載到了events上。dispatcher和receiver借由kombu的底層實(shí)現(xiàn)無關(guān)性,直接傳入不同的connection uri實(shí)例化kombu的message queue。整個(gè)結(jié)構(gòu)還是比較清晰的,并且面向接口編程也少了很多的耦合??偟膩碚f,還是非常值得借鑒的一種寫法。
參考