python標準庫threading源碼解讀【一】

轉載自本人知乎:https://zhuanlan.zhihu.com/p/92702108

目錄

1.with

2.condition

3.semaphore


1. with

還是有必要先講解一下with的用法。它管理著類的“前世后生”,也就是在進入和退出類的時候調用。當然不是什么類都能夠使用“with”:它必須實現(xiàn)兩種“特殊方法”,即enter()和exit();字面意思就可以看出來“入口”和“出口”。

基本用法:緊跟with后面的語句被求值后,返回對象的?enter() 方法被調用,這個方法的返回值將被賦值給as后面的變量。當with后面的代碼塊全部被執(zhí)行完之后,將調用前面返回對象的?exit()方法。

such as:

而在threading中,with常常和“鎖”連用,常用方法如下:

with Lock as lock:

? ? XXX

? ? XXX

print(“END”)

# Just Like:

lock = Lock()

lock.acquire() # Notice!

XXX

XXX

print(“END”)

lock.release() # Notice!

我們在下面的源碼中會經常看到。

2. condition

condition的本質也是一把“鎖”,而且是個“遞歸鎖”。最重要的是,它提供了wait()和notify()方法,而wait()方法就是利用“遞歸鎖”,采用兩級鎖的方法實現(xiàn)了線程的等待。codition有兩級鎖,一把一級鎖會在進入wait方法的時候釋放,離開wait方法的時候再次獲取,二級鎖會在每次調用wait時分配一個新的鎖,并放入condition的等待隊列中,而notify負責釋放這個鎖。如下圖:


源碼(2-1/3)

class Condition:

???def init(self, lock=None):

???????if lock is None:

???????????lock = RLock()

???????self.lock = lock

???????# Export the lock’s acquire() and release() methods

???????self.acquire = lock.acquire

???????self.release = lock.release

???????# If the lock defines releasesave() and/or acquirerestore(),

???????# these override the default implementations (which just call

???????# release() and acquire() on the lock).?Ditto for isowned().

???????try:

???????????self.releasesave = lock.releasesave

???????except AttributeError:

???????????pass

???????try:

???????????self.acquirerestore = lock.acquirerestore

???????except AttributeError:

???????????pass

???????try:

???????????self.isowned = lock.isowned

???????except AttributeError:

???????????pass

???????self.waiters = deque()

???def enter(self):

???????return self.lock.enter()

???def exit(self, args):

???????return self.lock._exit(args)

???def repr(self):

???????return “” % (self.lock, len(self.waiters))

???def releasesave(self):

???????self.lock.release()?????????? # Nostate to save

???def acquirerestore(self, x):

???????self.lock.acquire()?????????? #Ignore saved state

???def isowned(self):

???????# Return True if lock is owned by current_thread.

???????# This method is called only if _lock doesn’t have _is_owned().

???????if self._lock.acquire(0):

???????????self._lock.release()

???????????return False

???????else:

???????????return True

上述包括:__init__(), __enter__(), __exit__(), __repr__(), _release_save(), _acquire_restore(), _is_owned();

(1).__enter__(), __exit__():直接繼承于Lock的上下文管理方式;

(2).__repr__():特殊方法之一,好像是將condition類用str字符串的方法輸出時候,調用此方法

(3)._is_owned():如果一級鎖由當前線程獲得,返回True。它是怎么判斷的呢?

他首先嘗試獲得一級鎖,如果可以獲得,那就說明當前線程沒有鎖,立馬釋放掉,返回false;反之,返回True;

(4)._release_save(), _acquire_restore():

release_save = lock.release(),釋放掉一級鎖,保存狀態(tài)

_acquire_restore = lock.acquire(),恢復一級鎖

(5).__init__():如果lock為none,則創(chuàng)建一個;

定義lock,即為一級鎖;condition類繼承l(wèi)ocktype的acquire()和release()方法;這些都是非常有用的,且必須的!(廢話嗎這不是2333333)

如果lock類型定義了_release_save和_acquire_restore嘗試重載他們;重載不掉就過,說明lock類型里面就有這種方式,則直接pass;

(6).定義 _waiters 雙端隊列,注意文件開頭的:from collection import deque as _deque.

源碼(2-2/3)

(1).wait()用于阻塞本線程,直到其他線程調用notify()喚醒或者timeout結束

Line2-3:線程想等待,自己要有鎖吧;所以判斷當前線程是否有鎖,沒有就拋出異常;

Line4-6:再定義一個lock方法,變量waiter,并獲得這個二級鎖(注意,RLock是遞歸鎖,可以嵌套);把這個鎖添加進全局私有_waiter 集合最后

Line7:saved_state = self._release_save(),保存狀態(tài),釋放一級鎖;可以讓其他線程獲得,這樣才能調用notify();

Line8:定義一個bool類型的返回值gotit;

Line9-18:直接進入“try”

如果timeout == None,waiter上鎖,線程就此阻塞,并且gotit = true ,等待有效;

否則判斷timeout的正負性,如果正的,就獲得鎖線程就此阻塞,返回True賦值給gotit;反之返回False,等待失??;

Line19-25:try……finally……語句通常與資源回收有關

不管try是否含有return,不管try是否拋出異常,finally都會return之前執(zhí)行。

注意到wait()有可能被用戶和系統(tǒng)中斷,導致沒有上鎖的waiter進入了_waiters隊列:

finally中首先嘗試恢復線程一級鎖,也就是等待其他線程釋放一級鎖;沒有就會一直等待。也就是說,調用notify函數(shù)的線程一定要釋放掉底層鎖,其他線程才會繼續(xù)執(zhí)行。

進一步判斷gotit,如果為False,說明waiter沒有釋放,我們從_waiters中剔除它,注意他會和下面的notify()函數(shù)同時嘗試,報錯的話pass就行。

(2).wait_for()斷言等待函數(shù)

Line27-29:定義了變量 endtime 結束時間和 waittime 等待時間,分別初始化為:None和傳入的參數(shù)timeout。

result 獲得斷言的返回值,值得注意的是,斷言的運行也是需要不少時間的,這個時間可能超過線程期望等待時間,下面就是要解決此問題。

Line30-40:斷言為False則進入循環(huán);

首先waittime,也就是timeout,不能是None;如果是None,那就沒啥好說的,直接等待吧;如果不是才要繼續(xù)判斷。

在第一次進入循環(huán)的時候,計算出剩余的等待時間(扣除第一次等待斷言返回的時間)。

這個時間太長就不能等待了;否則的話,等待剩余時間waittime,進而再計算斷言;不管斷言結果如何,都只能跳出循環(huán)了,因為waittime一定是<=0的。

3. semaphore

源碼(3-1/3)

Semaphore,俗稱信號量管理器,可以傳入?yún)?shù)value,默認為1,當然他不等小于0;

首先實例化一個Semaphore類,判斷value是否大于0,定義一個私有鎖_Condition、和私有變量_value。

源碼(3-2/3)

acquire(), release():這個類中的acquier和release函數(shù)并不是傳統(tǒng)意義上的獲得鎖和釋放鎖,他們在各自方法中分別獲得、釋放了一次鎖。

維護了一個counter計數(shù):它等于 = release()調用次數(shù)acquire()調用的數(shù)量 + 初始值。例如:當n個線程獲得鎖,acquire調用了n次,一次都沒有調用release()的情況下,n不能大于value。

參數(shù)blocking:必要時可以阻止acquire()方法,直到線程返回并且計數(shù)器大于0。

注意acquire()函數(shù)過程判斷傳入?yún)?shù),如果不能阻塞并且timeout還不為none,那肯定報錯了。

定義rc返回值、定義endtime;endtime和timeout相互作用的方式,和wait_for()一樣;

假設兩個線程t1和t2。默認情況下,value = 1,t1獲得私有鎖,while循環(huán)不成立,value – 1,rc = True,釋放私有鎖;此時t2一直等待獲得私有鎖;但是t1釋放私有鎖之后,有兩種可能:

(1)t2獲得,并且value = 0

那么t2進入wait()等待,牛逼的是,進入wait后,會釋放一級鎖!

釋放一級鎖之后,t1中的release()就會獲得;進一步調用notify函數(shù)喚醒t2!

(2)t1的release()獲得

這種情況就相當于兩個線程各玩各的,互不干擾,沒有線程可以喚醒,notify()直接pass就ok!

我畫了個簡圖:

release()函數(shù)過程

value加一,調用notify,喚醒一個線程;

__enter__()和__exit__()魔法方法:使得類可以使用“with”上下文管理;其中__exit__()指向release(),還type,value,traceback三個參數(shù),在with拋出異常的時候,分別表示異常的:類型,值和位置;不過也沒利用起來??

繼承類BoundedSemaphore

源碼(3-3/3)


繼承于Semaphore。

實例化此類,首先創(chuàng)建一個Semaphore類,和一個私有變量_initial_value,表示初始化的_value值;

重載了Semaphore的release()方法;

而私有變量_initial_value的作用便在這里體現(xiàn)出來,就是release調用不能多于acquier,否則就報錯!

后面的就和父類一樣了鴨~

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容