celery源碼解析一: Events的實(shí)現(xiàn)

Celery作為一個(gè)分布式任務(wù)框架,提供了events對(duì)外進(jìn)行狀態(tài)和信息傳遞,而程序運(yùn)行過程的中的數(shù)據(jù)是較為關(guān)鍵的。

事件關(guān)注點(diǎn)

對(duì)于事件,有以下關(guān)注點(diǎn)

  1. 事件是如何產(chǎn)生的
  2. 事件是如何傳遞的
  3. 事件是如何捕獲的

對(duì)于項(xiàng)目結(jié)構(gòu),有以下關(guān)注點(diǎn)

  1. 對(duì)象之間的關(guān)系
  2. 實(shí)現(xiàn)的是否耦合,是否可插拔

evnet sender

Celery內(nèi)置的事件的類型根據(jù)使用者的不同大致可分為Worker/Task的事件。從worker的事件看起。

先看個(gè)自己寫的demo:

def event_sender():
    # use to send events
    with app.events.default_dispatcher() as d:
        d.send('task-result', msg='i am cute result {}'.format(datetime.datetime.now()), name='gin')
        d.flush()

dispatcher作為事件的分發(fā)器,并且由于dispatcher內(nèi)部實(shí)現(xiàn)了上下文管理器,所以這里直接就用with來進(jìn)行初始化了(上下文管理器非常方便~實(shí)名推薦一波)。這里發(fā)送的事件task-result是我們自定義的一個(gè)類型,用于和內(nèi)置類型區(qū)分開,方便觀察結(jié)果。在這里先實(shí)例化了一個(gè)dispatcher,然后調(diào)用了dispatcher.send方法。這里我們的broker使用redis,所以我們看一下redis的dispatcher的send方法。

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
   
        if self.enabled:
            groups, group = self.groups, group_from(type)
            if groups and group not in groups:
                return
            if group in self.buffer_group:
                clock = self.clock.forward()
                event = Event(type, hostname=self.hostname,
                              utcoffset=utcoffset(),
                              pid=self.pid, clock=clock, **fields)
                buf = self._group_buffer[group]
                buf.append(event)
                if len(buf) >= self.buffer_limit:
                    self.flush()
                elif self.on_send_buffered:
                    self.on_send_buffered()
            else:
                return self.publish(type, fields, self.producer, blind=blind,
                                    Event=Event, retry=retry,
                                    retry_policy=retry_policy)

前面根據(jù)type分組然后判斷flush or store in the buffer,然后用參數(shù)實(shí)例化events,這里做的比較人性化的一點(diǎn)是可傳入自定義的fields來初始化Events,因?yàn)樽鳛槭录碚f,通常有自定義的數(shù)據(jù)的需求,而這樣處理就比較優(yōu)雅了。然后發(fā)送者發(fā)送整個(gè)事件,也就意味著所有發(fā)送者希望發(fā)送的消息能夠被完整的傳遞。with mutex,用了一個(gè)線程的互斥鎖,然后進(jìn)行了publish, 然后是這里最為關(guān)鍵是publish,看看publish的底層實(shí)現(xiàn)

from kombu import Producer
    def _publish(self, event, producer, routing_key, retry=False,
                 retry_policy=None, utcoffset=utcoffset):
        exchange = self.exchange
        try:
            res = producer.publish(
                event,
                routing_key=routing_key,
                exchange=exchange.name,
                retry=retry,
                retry_policy=retry_policy,
                declare=[exchange],
                serializer=self.serializer,
                headers=self.headers,
                delivery_mode=self.delivery_mode,
            )
        except Exception as exc:  # pylint: disable=broad-except
            if not self.buffer_while_offline:
                raise
            self._outbound_buffer.append((event, routing_key, exc))

   def enable(self):
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()

主要邏輯是調(diào)用了self.producer.publish,而self.producer是在dispatcher的enable里創(chuàng)建的,可以看到enable用了kombu的producer,kombu是celery’內(nèi)部對(duì)于amqp的封裝,用于實(shí)現(xiàn)消息傳遞,其支持了各種的broker,比如說redis,rabbitmq。鑒于咱們使用的是redis的broker,所以也很明顯這里使用redis為broker的pub/sub模式。所以就很清楚啦,celery events的實(shí)現(xiàn)是基于redis的pub/sub模式,這也解釋了為什么在筆者測(cè)試的時(shí)候,沒有消息的存儲(chǔ),以及events在開啟監(jiān)聽以后才能夠收到。

event receiver

最后貼一個(gè)自己寫的事件監(jiān)聽。

def monitor_events():
    def on_event(event):
        # this is the callback when events come in
        print('[recv] {} '.format(event))

    with app.connection() as conn:
        recv = app.events.Receiver(conn, handlers={'task-result':
        on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)

events使用場(chǎng)景

基于celery內(nèi)置的事件,可以對(duì)于task的執(zhí)行狀態(tài)信息,和執(zhí)行結(jié)果信息進(jìn)行實(shí)時(shí)處理,例如可以將所有的事件執(zhí)行結(jié)果進(jìn)行暫存然后結(jié)合時(shí)間序列數(shù)據(jù)庫類似influxdb和展示平臺(tái)類似grafana進(jìn)行展示,這樣就有了比較完善的一套結(jié)果數(shù)據(jù)的流程。

也可以自定義事件,對(duì)一些關(guān)注的信息進(jìn)行實(shí)時(shí)處理。例如在task執(zhí)行過程中的信息收集,可以通過events來完成。

總結(jié)

所以對(duì)于上面我們提出的問題,有以下總結(jié)

  1. 事件是如何產(chǎn)生的:事件由事件的發(fā)送者的初始化,并且發(fā)送,在celery里事件的發(fā)送往往伴隨著狀態(tài)的改變,例如對(duì)于 task的事件中包括 發(fā)送,被接收,被執(zhí)行,執(zhí)行成功,執(zhí)行失敗,重試。
  2. 事件是如何傳遞的:如果采用redis作為broker,那么事件是基于redis的pub/sub模式而傳遞的。
  3. 事件是如何捕獲的:同上,基于訂閱模式,事件得以被接收。

對(duì)于對(duì)象的關(guān)系呢
在這一部分我們接觸到的對(duì)象有
app,dispatcher,events,publisher,receiver

其中app作為拉起整個(gè)celery項(xiàng)目的核心對(duì)象,events對(duì)象被掛載到app上,而dispatcher和receiver則被掛載到了events上。dispatcher和receiver借由kombu的底層實(shí)現(xiàn)無關(guān)性,直接傳入不同的connection uri實(shí)例化kombu的message queue。整個(gè)結(jié)構(gòu)還是比較清晰的,并且面向接口編程也少了很多的耦合??偟膩碚f,還是非常值得借鑒的一種寫法。

參考

  1. Monitoring and Management Guide — Celery 3.1.7 文檔
  2. Kombu Documentation — Kombu 4.6.0 documentation
  3. 立強(qiáng)的博客
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容