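In two earlier articles I covered how to use gevent and how to use greenlet, the library gevent is built on; see "Python: gevent (1)" and "Python: greenlet" for a refresher. This article walks through gevent's scheduling process using its source code.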
Background knowledge
Before looking at how gevent schedules, it helps to know greenlet reasonably well (see "Python: greenlet"). In my view the key points are:
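1. Every greenlet.greenlet instance has a parent. It can be specified explicitly; by default it is the greenlet in which the new greenlet.greenlet was created. When a greenlet.greenlet instance finishes, whether it returns normally or ends with an exception, execution switches back to its parent (an uncaught exception is re-raised in the parent).
2. greenlet.greenlet can be subclassed. The subclass implements a run method, which is called the first time switch is called on the instance.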
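Both points can be checked with a few lines of plain greenlet code. This is a minimal sketch; the class names Doubler and Failing are made up for illustration:

```python
import greenlet


class Doubler(greenlet.greenlet):
    # A subclass only needs run(); the first switch() into the instance calls it
    def run(self, x):
        return x * 2  # returning ends the greenlet; the value goes to its parent


class Failing(greenlet.greenlet):
    def run(self):
        raise ValueError("boom")  # an uncaught exception propagates to the parent


main = greenlet.getcurrent()
d = Doubler()                 # no parent given: defaults to the creating greenlet
parent_at_creation = d.parent
result = d.switch(21)         # arguments of the first switch() are passed to run()
print(result, d.dead)         # 42 True

try:
    Failing().switch()        # control comes back to main together with the exception
except ValueError as exc:
    caught = str(exc)
print("caught in parent:", caught)
```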
In gevent, two classes inherit from greenlet.greenlet: gevent.hub.Hub and gevent.greenlet.Greenlet. In what follows, greenlet.greenlet means the class from the native greenlet library, while greenlet (or Greenlet) means gevent's wrapper around it.
The gevent scheduling flow
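Every thread that uses gevent has one hub, which, as mentioned above, is an instance of a greenlet.greenlet subclass. The hub is created on demand; since it is normally first created from the main greenlet, its parent is the main greenlet. Every Greenlet created afterwards (also a greenlet.greenlet subclass) gets the hub as its parent. The hub drives the event loop provided by libev to process the tasks that Greenlets represent, and when a Greenlet instance finishes (normally or with an exception), execution switches back to the hub.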
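A quick way to confirm these parent relationships, assuming a reasonably recent gevent and greenlet (the lambda and the variable names are only for illustration):

```python
import gevent
import greenlet

main = greenlet.getcurrent()
hub = gevent.get_hub()              # created lazily, here from the main greenlet
g = gevent.spawn(lambda: "done")    # a gevent Greenlet

hub_is_greenlet = isinstance(hub, greenlet.greenlet)
hub_parent_is_main = hub.parent is main
g_parent_is_hub = g.parent is hub
g.join()                            # main switches to the hub, which runs g
print(hub_is_greenlet, hub_parent_is_main, g_parent_is_hub, g.value)
```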
gevent scheduling example 1
The most basic code looks like this:
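The original listing for this example did not survive. Judging from the description that follows (two lines of code, a call to sleep, a timer of about one second), it was presumably equivalent to the following; treat it as a reconstruction, not the author's exact code:

```python
import gevent
gevent.sleep(1)  # main greenlet -> hub; a 1 s timer switches it back
```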

Short as these two lines are, they contain the core of gevent. Let's look at the source.
First, the sleep function called in the example above (gevent.hub.sleep):
```python
def sleep(seconds=0, ref=True):
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))
```
The function first obtains the hub (get_hub()) and then waits on the hub for the timer event (the last line of the code above). Here is get_hub (gevent.hub.get_hub):
```python
def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.
    """
    hub = _threadlocal.hub
    if hub is None:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
    return hub
```
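So the hub is unique per thread. As mentioned before, greenlets are thread-specific: each thread has its own greenlet stack. hubtype defaults to gevent.hub.Hub, and Hub's initializer (__init__) creates the loop attribute, which by default is the Python wrapper around libev.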
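The per-thread behaviour is easy to observe. The sketch below, with a hypothetical helper in_worker_thread, assumes gevent has not monkey-patched threading, so threading.Thread starts a real OS thread; the worker destroys its own hub before exiting to keep things tidy:

```python
import threading

import gevent

main_hub = gevent.get_hub()
same_hub_again = gevent.get_hub() is main_hub  # cached in a thread-local

other_hubs = []


def in_worker_thread():
    # A different OS thread gets its own Hub (with its own loop)
    hub = gevent.get_hub()
    other_hubs.append(hub)
    hub.destroy(destroy_loop=True)  # clean up this thread's hub before the thread exits


t = threading.Thread(target=in_worker_thread)
t.start()
t.join()
print(same_hub_again, other_hubs[0] is not main_hub)
```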
Back in sleep: hub.wait(loop.timer(seconds, ref=ref)). Hub.wait is crucial; every blocking operation, whether a timer or I/O, goes through it. In one sentence: it switches from the current coroutine to the hub and switches back once the event of the watcher is ready. Its source (gevent.hub.Hub.wait):
```python
def wait(self, watcher):
    """
    Wait until the *watcher* (which should not be started) is ready.
    """
    waiter = Waiter()
    unique = object()
    watcher.start(waiter.switch, unique)
    try:
        result = waiter.get()
        if result is not unique:
            raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
    finally:
        watcher.stop()
```
The watcher parameter is the loop.timer instance. It is defined in Cython in corecext.pyx; for our purposes it is simply a timer event. The code creates a Waiter (gevent.hub.Waiter) object, whose purpose the class docstring explains clearly:
```text
Waiter.__doc__:

A low level communication utility for greenlets.

Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:

* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
  will store the value/exception. The following :meth:`get` will return the value/raise the exception.
```
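In short, it wraps the switch and throw methods of greenlet.greenlet, storing the value handed to a greenlet or capturing the exception meant to be raised in it. Recall that in plain greenlet, if a greenlet raises an exception, the exception propagates to its parent greenlet.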
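The third point of the docstring (a value or exception delivered before get() is called is stored) can be seen without any other greenlet involved. A minimal sketch using gevent.hub.Waiter directly:

```python
from gevent.hub import Waiter

# switch() before get(): nobody is waiting yet, so the value is simply stored
w = Waiter()
w.switch("stored early")
value = w.get()            # returns immediately, no switch to the hub

# throw() before get(): the exception is stored and re-raised by get()
w2 = Waiter()
w2.throw(ValueError("stored error"))
try:
    w2.get()
except ValueError as exc:
    error = str(exc)

print(value, "|", error)   # stored early | stored error
```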
Back in Hub.wait: watcher.start(waiter.switch, unique) registers a callback, so that waiter.switch is called with unique once the given time (1 s in our example) has passed. Note that waiter.switch has not run at this point. Next comes waiter.get. Here is get (gevent.hub.Waiter.get):
```python
def get(self):
    """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
    if self._exception is not _NONE:
        if self._exception is None:
            return self.value
        else:
            getcurrent().throw(*self._exception)
    else:
        if self.greenlet is not None:
            raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
        self.greenlet = getcurrent()  # remember the current greenlet; used when the hub switches back
        try:
            return self.hub.switch()  # switch to the hub
        finally:
            self.greenlet = None
```
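The core logic is in the last few statements of get. self.greenlet = getcurrent() stores the current greenlet (in this test, the main greenlet, i.e. the original greenlet) in waiter.greenlet, and return self.hub.switch() then switches to the hub. As point 2 of the greenlet recap says, a greenlet.greenlet subclass implements run, which is invoked the first time the instance is switched to. Hub.run is implemented as follows: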
```python
def run(self):
    """
    Entry-point to running the loop. This method is called automatically
    when the hub greenlet is scheduled; do not call it directly.

    :raises LoopExit: If the loop finishes running. This means
       that there are no other scheduled greenlets, and no active
       watchers or servers. In some situations, this indicates a
       programming error.
    """
    assert self is getcurrent(), 'Do not call Hub.run() directly'
    while True:
        loop = self.loop
        loop.error_handler = self
        try:
            loop.run()
        finally:
            loop.error_handler = None  # break the refcount cycle
        self.parent.throw(LoopExit('This operation would block forever', self))
```
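The LoopExit branch at the end of Hub.run is easy to trigger: wait on a Waiter that no watcher or callback will ever switch. A small sketch, assuming a fresh process with no other active watchers:

```python
from gevent.hub import LoopExit, Waiter

w = Waiter()
message = None
try:
    # main -> hub; nothing will ever call w.switch(), so loop.run() returns
    # and Hub.run throws LoopExit into its parent, the main greenlet
    w.get()
    outcome = "resumed"
except LoopExit as exc:
    outcome = "LoopExit"
    message = exc.args[0]
print(outcome, "-", message)
```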
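loop is libev's event loop. As the docstring says, the loop is expected to keep running; if loop.run() ever returns, there is nothing left to watch (no I/O, timers, etc.), and the hub throws LoopExit into its parent. Earlier, Hub.wait registered a timer, so when the time is up inside this run, the loop calls the timer's callback, i.e. waiter.switch. Let's look at that function (gevent.hub.Waiter.switch):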
```python
def switch(self, value=None):
    """Switch to the greenlet if one's available. Otherwise store the value."""
    greenlet = self.greenlet
    if greenlet is None:
        self.value = value
        self._exception = None
    else:
        assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
        switch = greenlet.switch
        try:
            switch(value)
        except:
            self.hub.handle_error(switch, *sys.exc_info())
```
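The interesting part is the else branch. The assert guarantees that this method is only ever called from the hub greenlet, which is natural, since it is always invoked from the loop that Hub.run drives. switch(value) then switches to waiter.greenlet, which, as explained for Waiter.get, is the main greenlet. Note that this switch resumes the main greenlet exactly where it was switched out, at return self.hub.switch() in Waiter.get, so value becomes the return value of waiter.get(). In Hub.wait that value is unique, which is why wait checks that result is unique. Execution then continues in the main greenlet.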
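The round trip described above (get() parks the caller and switches to the hub, and switch() has to come from the hub) can be reproduced with a loop callback standing in for the timer. In this sketch hub_callback is a hypothetical name; the second half shows the check in switch() rejecting a call made from the main greenlet while another greenlet is waiting. In the excerpt above that check is a plain assert statement, so it would be skipped under python -O:

```python
import gevent
import greenlet
from gevent.hub import Waiter

hub = gevent.get_hub()

# 1) get() parks the main greenlet in the hub; a callback running in the hub resumes it
w = Waiter()
seen = {}


def hub_callback():
    seen["in_hub"] = greenlet.getcurrent() is hub   # loop callbacks run inside Hub.run
    w.switch("value from the hub")                  # jumps back into w.get() in main


hub.loop.run_callback(hub_callback)
result = w.get()          # returns whatever hub_callback passed to switch()

# 2) while a greenlet waits, switch() from anywhere but the hub is rejected
w2 = Waiter()
received = []
g = gevent.spawn(lambda: received.append(w2.get()))
gevent.sleep(0)           # let g run until it blocks inside w2.get()
try:
    w2.switch("from main")
    rejected = None
except AssertionError as exc:
    rejected = str(exc)
hub.loop.run_callback(w2.switch, "from the hub")  # the legitimate path
g.join()
print(seen, result, rejected, received)
```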
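Summary: sleep is simple. It triggers a blocking operation, which calls hub.wait and switches from the current greenlet.greenlet to the hub; when the timer expires, the hub switches back to the original greenlet, which continues. The example shows that gevent wraps every blocking operation in a watcher, switches from the coroutine that made the blocking call to the hub, and once the operation completes, switches from the hub back to that coroutine.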
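The same mechanism can be driven by hand: create a timer watcher and pass it to Hub.wait, which is what sleep(seconds) does. The sketch below also spawns a greenlet to show that the hub runs other work while the main greenlet is parked (the log list is only for illustration):

```python
import gevent

hub = gevent.get_hub()
log = []

gevent.spawn(log.append, "spawned greenlet ran")  # only scheduled as a loop callback

timer = hub.loop.timer(0.01)   # a watcher that has not been started yet
hub.wait(timer)                # what sleep(0.01) does: main -> hub -> timer fires -> main
log.append("main resumed after the timer")
timer_active = timer.active    # Hub.wait stops the watcher in its finally block
print(log, timer_active)
```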
gevent scheduling example 2
The example above makes gevent's scheduling flow clear, but it does not show the benefit of gevent's cooperative scheduling. Here is another example:
```python
import gevent


def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')


gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
```
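Running it prints four lines in this order: "Running in foo", "Explicit context to bar", "Explicit context switch to foo again", "Implicit context switch back to bar".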
The output shows foo and bar taking turns, so execution clearly switches at gevent.sleep. Since gevent.sleep was covered above, let's focus on spawn and joinall.
gevent.spawn essentially calls the class method spawn of gevent.greenlet.Greenlet:
```python
@classmethod
def spawn(cls, *args, **kwargs):
    g = cls(*args, **kwargs)
    g.start()
    return g
```
This class method calls two Greenlet methods, __init__ and start. The key part of __init__ is:
```python
def __init__(self, run=None, *args, **kwargs):
    greenlet.__init__(self, None, get_hub())  # the parent of every new Greenlet is set to the hub
    if run is not None:
        self._run = run
```
The definition of start (gevent.greenlet.Greenlet.start):
```python
def start(self):
    """Schedule the greenlet to run in this loop iteration"""
    if self._start_event is None:
        self._start_event = self.parent.loop.run_callback(self.switch)
```
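start registers self.switch as a callback on hub.loop. Note that Greenlet.switch eventually calls Greenlet._run, i.e. the callable passed to spawn (foo, bar). This only registers the callback; the event loop is not running yet. gevent.joinall is what starts the event loop and waits for the results.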
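spawn is just __init__ plus start, so the two steps can also be done separately. This sketch shows that start() only schedules the greenlet; its body runs once the main greenlet switches to the hub in join():

```python
import gevent
from gevent import Greenlet

order = []
g = Greenlet(order.append, "greenlet body ran")  # __init__ only: parent = hub, _run = order.append
g.start()                                       # registers g.switch as a loop callback; nothing runs yet
order.append("main still running")
g.join()                                        # main switches to the hub, which now runs g
print(order)  # ['main still running', 'greenlet body ran']
```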
joinall eventually calls gevent.hub.iwait:
```python
def iwait(objects, timeout=None, count=None):
    """
    Iteratively yield *objects* as they are ready, until all (or *count*) are ready
    or *timeout* expired.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    if objects is None:
        yield get_hub().join(timeout=timeout)
        return

    count = len(objects) if count is None else min(count, len(objects))
    waiter = _MultipleWaiter()  # _MultipleWaiter is a subclass of Waiter
    switch = waiter.switch

    if timeout is not None:
        timer = get_hub().loop.timer(timeout, priority=-1)
        timer.start(switch, _NONE)

    try:
        for obj in objects:
            obj.rawlink(switch)  # switch is called (from the hub) when obj becomes ready

        for idx in xrange(count):
            print('for in iwait', idx)                # debug output added for this article
            item = waiter.get()  # switches to the hub here
            print('come here ', item, getcurrent())  # debug output added for this article
            waiter.clear()
            if item is _NONE:
                return
            yield item
    finally:
        if timeout is not None:
            timer.stop()
        for obj in objects:
            unlink = getattr(obj, 'unlink', None)
            if unlink:
                try:
                    unlink(switch)
                except:
                    traceback.print_exc()
```
The for idx in xrange(count) loop in iwait then calls waiter.get once per result. The waiter here is an instance of _MultipleWaiter (a Waiter subclass), whose get ultimately calls Waiter.get; as explained in detail above, that means switching to the hub. With greenlet's tracing feature we can watch the whole sequence of greenlet.greenlet switches. The modified code:
```python
import gevent
import greenlet


def callback(event, args):
    # event is 'switch' or 'throw'; args is the (origin, target) pair of greenlets
    print(event, args[0], '===:>>>>', args[1])


def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')


print('main greenlet info: ', greenlet.getcurrent())
print('hub info', gevent.get_hub())
oldtrace = greenlet.settrace(callback)
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
greenlet.settrace(oldtrace)
```
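Because the printed trace depends on object addresses, a variant that records the events in a list is easier to check. This sketch (record and events are illustrative names) verifies that joinall produces a switch from the main greenlet into the hub, and switches from the hub into the spawned Greenlets:

```python
import gevent
import greenlet

events = []


def record(event, args):
    # greenlet calls this on every switch/throw; args is (origin, target)
    origin, target = args
    events.append((event, origin, target))


def foo():
    gevent.sleep(0)


main = greenlet.getcurrent()
hub = gevent.get_hub()
oldtrace = greenlet.settrace(record)
try:
    gevent.joinall([gevent.spawn(foo), gevent.spawn(foo)])
finally:
    greenlet.settrace(oldtrace)   # restore the previous tracer (None here)

main_to_hub = ("switch", main, hub) in events
hub_to_greenlet = any(ev == "switch" and src is hub and isinstance(dst, gevent.Greenlet)
                      for ev, src, dst in events)
print(len(events), main_to_hub, hub_to_greenlet)
```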
Summary: gevent.spawn creates a new Greenlet and registers it on the hub's loop; the switch to the hub happens when gevent.joinall or Greenlet.join is called.
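joinall is built on iwait, which yields objects in the order they become ready rather than the order they were passed in. A minimal sketch with a hypothetical work function:

```python
import gevent


def work(delay, name):
    gevent.sleep(delay)   # blocks this Greenlet only; the hub runs the others
    return name


jobs = [gevent.spawn(work, 0.05, "slow"), gevent.spawn(work, 0.01, "fast")]
finished = [g.value for g in gevent.iwait(jobs)]  # yielded in completion order
print(finished)  # ['fast', 'slow']
```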
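Using two simple examples together with the source code, this article has analysed how gevent schedules its coroutines. gevent is very convenient to use, especially in web servers, where an application basically gets gevent's benefits without doing anything. My main motivation for writing this was to understand how gevent wraps and uses greenlet. greenlet is powerful, powerful enough to be error-prone, while gevent guarantees that switching only happens between two layers of coroutines (the hub and the Greenlets), which is worth studying and adopting.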
Further reading:
http://www.gevent.org/
https://pypi.org/project/greenlet/
gevent source code