python之gevent(2)

在之前,我已經(jīng)在兩篇文章中分別介紹了gevent的使用以及gevent的底層greenlet的使用,可以閱讀文章回顧一下:python之gevent(1)python之greenlet。本文將結(jié)合gevent的源碼對其調(diào)度過程進(jìn)行解析。

需要了解的背景知識(shí)

在了解gevent的調(diào)度過程前,最好是對greenlet有一定了解,可查看文章python之greenlet,而最主要的知識(shí)點(diǎn)我認(rèn)為在于:
1、每一個(gè)greenlet.greenlet實(shí)例都有一個(gè)parent(可指定,默認(rèn)為創(chuàng)生新的greenlet.greenlet所在環(huán)境),當(dāng)greenlet.greenlet實(shí)例執(zhí)行完邏輯正常結(jié)束、或者拋出異常結(jié)束時(shí),執(zhí)行邏輯切回到其parent
2、可以繼承g(shù)reenlet.greenlet,子類需要實(shí)現(xiàn)run方法,當(dāng)調(diào)用greenlet.switch方法時(shí)會(huì)調(diào)用到這個(gè)run方法

在gevent中,有兩個(gè)類繼承了greenlet.greenlet,分別是gevent.hub.Hub和gevent.greenlet.Greenlet。后文中,如果是greenlet.greenlet這種寫法,那么指的是原生的類庫greentlet,如果是greenlet(或者Greenlet)那么指gevent封裝后的greenlet。

gevent調(diào)度流程

每個(gè)gevent線程都有一個(gè)hub,上面提到hub是greenlet.greenlet的實(shí)例。hub實(shí)例在需要的時(shí)候創(chuàng)生,那么其parent是main greenlet。之后任何的Greenlet(greenlet.greenlet的子類)實(shí)例的parent都設(shè)置成hub。hub調(diào)用libev提供的事件循環(huán)來處理Greenlet代表的任務(wù),當(dāng)Greenlet實(shí)例結(jié)束(正常或者異常)之后,執(zhí)行邏輯又切換到hub。

gevent調(diào)度實(shí)例1

最基礎(chǔ)的代碼是這樣的:


image.png

雖然這兩行代碼很簡單,但是gevent的核心都包含在其中,下面來看具體的源碼:
首先是在上圖中調(diào)用的sleep函數(shù)(gevent.hub.sleep):

def sleep(seconds=0, ref=True):
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

進(jìn)入函數(shù)之后,首先是獲取hub(get_hub()),然后在hub上wait這個(gè)定時(shí)器事件(上面代碼最后一行)。get_hub源碼如下(gevent.hub.get_hub):

def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.

    """
    hub = _threadlocal.hub
    if hub is None:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
    return hub

可以看到,hub是線程內(nèi)唯一的,之前也提到過greenlet是線程獨(dú)立的,每個(gè)線程有各自的greenlet棧。hubtype默認(rèn)就是gevent.hub.Hub,在hub的初始化函數(shù)(init)中,會(huì)創(chuàng)建loop屬性,默認(rèn)也就是libev的python封裝。

回到sleep函數(shù)定義,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函數(shù)非常關(guān)鍵,對于任何阻塞性操作,比如timer、io都會(huì)調(diào)用這個(gè)函數(shù),其作用一句話概括:從當(dāng)前協(xié)程切換到hub,直到watcher對應(yīng)的事件就緒再從hub切換回來。wait函數(shù)源碼如下(gevent.hub.Hub.wait):

def wait(self, watcher):
        """
        Wait until the *watcher* (which should not be started) is ready.

        """
        waiter = Waiter()
        unique = object()
        watcher.start(waiter.switch, unique)
        try:
            result = waiter.get()
            if result is not unique:
                raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
        finally:
            watcher.stop()

形參watcher就是loop.timer實(shí)例,其cython描述在corecext.pyx,我們簡單理解成是一個(gè)定時(shí)器事件就行了。上面的代碼中,創(chuàng)建了一個(gè)Waiter(gevent.hub.Waiter)對象,這個(gè)對象起什么作用在這個(gè)類的doc中寫得非常清楚:

Waiter.__doc__  
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's switch() and throw() calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:get method currently;
* any error raised in the greenlet is handled inside :meth:switch and :meth:throw
* if :meth:switch/:meth:throw is called before the receiver calls :meth:get, then :class:Waiter
will store the value/exception. The following :meth:get will return the value/raise the exception

總的來說,是對greenlet.greenlet類switch 和 throw函數(shù)的分裝,用來存儲(chǔ)返回值greenlet的返回值或者捕獲在greenlet中拋出的異常。我們知道,在原生的greenlet中,如果一個(gè)greenlet拋出了異常,那么該異常將會(huì)展開至其parent greenlet。

回到Hub.wait函數(shù),watcher.start(waiter.switch, unique) 注冊了一個(gè)回調(diào),在一定時(shí)間(1s)之后調(diào)用回調(diào)函數(shù)waiter.switch。注意,waiter.switch此時(shí)并沒有執(zhí)行。然后后面調(diào)用waiter.get??纯催@個(gè)get函數(shù)(gevent.hub.Waiter.get):

def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent() # 存儲(chǔ)當(dāng)前協(xié)程,之后從hub switch回來的時(shí)候使用
            try:
                return self.hub.switch() # switch到hub
            finally:
                self.greenlet = None

核心的邏輯在這段代碼最后幾行,也就是第11到15行,11行中,getcurrent獲取當(dāng)前的greenlet(在這個(gè)測試代碼中,是main greenlet,即最原始的greenlet),將其復(fù)制給waiter.greenlet。然后13行switch到hub,在greenlet回顧章節(jié)的第二條提到,greenlet.greenlet的子類需要重寫run方法,當(dāng)調(diào)用子類的switch時(shí)會(huì)調(diào)用到該run方法。Hub的run方法實(shí)現(xiàn)如下:

def run(self):
       """
       Entry-point to running the loop. This method is called automatically
       when the hub greenlet is scheduled; do not call it directly.

       :raises LoopExit: If the loop finishes running. This means
          that there are no other scheduled greenlets, and no active
          watchers or servers. In some situations, this indicates a
          programming error.
       """
       assert self is getcurrent(), 'Do not call Hub.run() directly'
       while True:
           loop = self.loop
           loop.error_handler = self
           try:
               loop.run()
           finally:
               loop.error_handler = None  # break the refcount cycle
           self.parent.throw(LoopExit('This operation would block forever', self))

loop自然是libev的事件循環(huán)。doc中提到,這個(gè)loop理論上會(huì)一直循環(huán),如果結(jié)束,那么表明沒有任何監(jiān)聽的事件(包括IO 定時(shí)等)。之前在Hub.wait函數(shù)中注冊了定時(shí)器,那么在這個(gè)run中,如果時(shí)間到了,那么會(huì)調(diào)用定時(shí)器的callback,也就是之前的waiter.switch, 我們再來看看這個(gè)函數(shù)(gevent.hub.Waiter.switch):

def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except:
                self.hub.handle_error(switch, *sys.exc_info())

這段代碼的主要內(nèi)容在第8到13行,第8行保證調(diào)用到該函數(shù)的時(shí)候一定在hub這個(gè)協(xié)程中,這是很自然的,因?yàn)檫@個(gè)函數(shù)一定是在Hub.run中被調(diào)用。第11行switch到waiter.greenlet這個(gè)協(xié)程,在講解waiter.get的時(shí)候就提到了waiter.greenlet是main greenlet。注意,這里得switch會(huì)回到main greenlet被切出的地方(也就是main greenlet掛起的地方),那就是在waiter.get的第10行,整個(gè)邏輯也就恢復(fù)到main greenlet繼續(xù)執(zhí)行。

總結(jié):sleep的作用很簡單,觸發(fā)一個(gè)阻塞的操作,導(dǎo)致調(diào)用hub.wait,從當(dāng)前greenlet.greenlet切換至Hub,超時(shí)之后再從hub切換到之前的greenlet繼續(xù)執(zhí)行。通過這個(gè)例子可以知道,gevent將任何阻塞性的操作封裝成一個(gè)Watcher,然后從調(diào)用阻塞操作的協(xié)程切換到Hub,等到阻塞操作完成之后,再從Hub切換到之前的協(xié)程。

gevent調(diào)度實(shí)例2

上面的例子中,雖然能夠理順gevent的調(diào)度流程,但事實(shí)上并沒有體現(xiàn)出gevent 協(xié)作的優(yōu)勢。接下來再看一個(gè)例子:

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

上述代碼運(yùn)行后輸出如下:


image.png

從輸出可以看到, foo和bar依次輸出,顯然是在gevent.sleep的時(shí)候發(fā)生了執(zhí)行流程切換,gevent.sleep再前面已經(jīng)介紹了,那么這里主要關(guān)注spawn和joinall函數(shù)。

gevent.spawn本質(zhì)調(diào)用了gevent.greenlet.Greenlet的類方法spawn:

@classmethod
def spawn(cls, *args, **kwargs):
    g = cls(*args, **kwargs)
    g.start()
    return g

這個(gè)類方法調(diào)用了Greenlet的兩個(gè)函數(shù),init 和 start. init函數(shù)中最為關(guān)鍵的是這段代碼:

    def __init__(self, run=None, *args, **kwargs):
        greenlet.__init__(self, None, get_hub()) # 將新創(chuàng)生的greenlet實(shí)例的parent一律設(shè)置成hub
        if run is not None:
            self._run = run

start函數(shù)的定義(gevent.greenlet.Greenlet.start):

def start(self):
    """Schedule the greenlet to run in this loop iteration"""
    if self._start_event is None:
        self._start_event = self.parent.loop.run_callback(self.switch)

注冊回調(diào)事件self.switch到hub.loop,注意Greenlet.switch最終會(huì)調(diào)用到Greenlet._run, 也就是spawn函數(shù)傳入的callable對象(foo、bar)。這里僅僅是注冊,但還沒有開始事件輪詢,gevent.joinall就是用來啟動(dòng)事件輪詢并等待運(yùn)行結(jié)果的。

joinall函數(shù)會(huì)一路調(diào)用到gevent.hub.iwait函數(shù):

def iwait(objects, timeout=None, count=None):
    """
    Iteratively yield *objects* as they are ready, until all (or *count*) are ready
    or *timeout* expired.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    if objects is None:
        yield get_hub().join(timeout=timeout)
        return

    count = len(objects) if count is None else min(count, len(objects))
    waiter = _MultipleWaiter() # _MultipleWaiter是Waiter的子類
    switch = waiter.switch

    if timeout is not None:
        timer = get_hub().loop.timer(timeout, priority=-1)
        timer.start(switch, _NONE)

    try:
        for obj in objects:
            obj.rawlink(switch) # 這里往hub.loop注冊了回調(diào)
 
        for idx in xrange(count):    #第23行
            print 'for in iwait', idx
            item = waiter.get() # 這里會(huì)切換到hub
            print 'come here ', item, getcurrent()
            waiter.clear()
            if item is _NONE:
                return
            yield item
    finally:
        if timeout is not None:
            timer.stop()
        for obj in objects:
            unlink = getattr(obj, 'unlink', None)
            if unlink:
                try:
                    unlink(switch)
                except:
                    traceback.print_exc()

然后iwait函數(shù)第23行開始的循環(huán),逐個(gè)調(diào)用waiter.get。這里的waiter是_MultipleWaiter(Waiter)的實(shí)例,其get函數(shù)最終調(diào)用到Waiter.get。前面已經(jīng)詳細(xì)介紹了Waiter.get,簡而言之,就是switch到hub。我們利用greenlet的tracing功能可以看到整個(gè)greenlet.greenlet的switch流程,修改后的代碼如下:

import gevent
import greenlet
def callback(event, args):
    print event, args[0], '===:>>>>', args[1]

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

print 'main greenlet info: ', greenlet.greenlet.getcurrent()
print 'hub info', gevent.get_hub()
oldtrace = greenlet.settrace(callback)
        
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
greenlet.settrace(oldtrace)

總結(jié):gevent.spawn創(chuàng)建一個(gè)新的Greenlet,并注冊到hub的loop上,調(diào)用gevent.joinall或者Greenlet.join的時(shí)候開始切換到hub。

本文通過兩個(gè)簡單的例子并結(jié)合源碼分析了gevent的協(xié)程調(diào)度流程。gevent的使用非常方便,尤其是在web server中,基本上應(yīng)用App什么都不用做就能享受gevent帶來的好處。本文寫出來的主要目的在于想了解gevent對greenlet的封裝和使用,greenlet很強(qiáng)大,強(qiáng)大到容易出錯(cuò),而gevent保證在兩層協(xié)程之間切換,值得學(xué)習(xí)并加以使用。

推薦閱讀:
http://www.gevent.org/
https://pypi.org/project/greenlet/
gevent源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 協(xié)程 閱讀目錄 一 引子 二 協(xié)程介紹 三 Greenlet模塊 四 Gevent模塊 引子 之前我們學(xué)習(xí)了線程、...
    go以恒閱讀 789評論 0 1
  • //Clojure入門教程: Clojure – Functional Programming for the J...
    葡萄喃喃囈語閱讀 4,055評論 0 7
  • 在python之gevent(1)一文中我們簡單的介紹了gevent的使用。python由于GIL的原因,導(dǎo)致線程...
    WolfLC閱讀 12,597評論 0 14
  • 前述 進(jìn)程 線程 協(xié)程 異步 并發(fā)編程(不是并行)目前有四種方式:多進(jìn)程、多線程、協(xié)程和異步。 多進(jìn)程編程在pyt...
    softlns閱讀 6,400評論 2 24
  • 暑假的尾巴還剩下一點(diǎn),我們進(jìn)入了學(xué)期開始的序曲,暑期備課開始了!大校長開始做工作總結(jié),歷數(shù)潁上五中去年取得...
    打燈籠的小星星閱讀 312評論 0 0

友情鏈接更多精彩內(nèi)容