Lock
原始鎖: 實(shí)現(xiàn)原始鎖對象的類。一旦一個(gè)線程獲得一個(gè)鎖,會阻塞隨后嘗試獲得鎖的線程,直到它被釋放;
任何線程都可以釋放它。
class Lock:
def acquire(self, blocking=True, timeout=-1):
'''
blocking 阻塞: 阻塞狀態(tài)下會維持到timeout時(shí)間然后拋出Runtime異常
非阻塞: 直接返回該鎖狀態(tài)(已鎖---False 未鎖---True)
timeout 定義超時(shí)時(shí)間,值為-1時(shí)不定義超時(shí)時(shí)間
'''
return False
def release(self):
'''
釋放鎖(任何線程都可以調(diào)用這個(gè)方法)
'''
pass
原始鎖的原理相當(dāng)?shù)暮唵?,就是一種互斥的機(jī)制,一個(gè)線程獲取了該鎖,其他線程就在獲取鎖的過程中發(fā)生阻塞或者返回一個(gè)狀態(tài)碼(bool),在此基礎(chǔ)上實(shí)現(xiàn)下面的幾種同步概念
RLock
可重入鎖(遞歸鎖),解決在同一線程中多次上鎖引發(fā)的死鎖問題
class RLock:
def __init__(self):
# 維護(hù)一個(gè)原始鎖
self._block = Lock()
# 重入鎖當(dāng)前擁有者(線程id標(biāo)識)
self._owner = None
# 當(dāng)前擁有者的上鎖次數(shù)
self._count = 0
def acquire(self, blocking=True, timeout=-1):
# 獲取當(dāng)前線程id
me = get_ident()
# 判斷當(dāng)前重入鎖擁有者是否為自身
if self._owner == me:
# 如果為自身的就不阻塞而令count值 +1
self._count += 1
return 1
# 非擁有者則搶占鎖(阻塞或返回True值表示該鎖未有擁有者)
rc = self._block.acquire(blocking, timeout)
if rc:
# 未上鎖情況下定義擁有者為自身
self._owner = me
self._count = 1
return rc
def release(self):
# 只允許擁有者自身解鎖
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
# 解鎖一次令count - 1
self._count = count = self._count - 1
# 當(dāng)前線程的鎖被全部解開后釋放該RLock實(shí)例
if not count:
self._owner = None
# 解除對其余線程的阻塞
self._block.release()
可重入鎖實(shí)際上利用了原始鎖做了一層封裝,利用count記錄當(dāng)前線程的入鎖次數(shù)而并不是每次都搶占_block鎖,因?yàn)樵谕痪€程中搶占同一個(gè)鎖會導(dǎo)致死鎖,在擁有RLock實(shí)例的線程中需要入鎖解鎖相同次數(shù)才能讓別的線程通過acquire獲取到鎖。
Condition
條件變量,允許一個(gè)或多個(gè)線程在被其它線程所通知之前進(jìn)行等待。
class Condition:
def __init__(self, lock=None):
# 提供傳入的鎖對象, 支持RLock/Lock,可以利用同一個(gè)鎖對象來控制多個(gè)條件變量
if lock is None:
lock = RLock()
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
# 如果lock對象存在下列方法則覆蓋當(dāng)前Condition類的方法
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# 定義waiters隊(duì)列
self._waiters = _deque()
def _release_save(self):
self._lock.release()
def _acquire_restore(self, x):
self._lock.acquire()
def _is_owned(self):
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
def wait(self, timeout=None):
# lock: lock對象未上鎖時(shí)不能執(zhí)行wait接口
# RLock: RLock對象則判斷當(dāng)前擁有者未非自身時(shí)不能執(zhí)行wait接口
# 在這里為何要判斷is_owned,就是為了對公共部分(_waiters)進(jìn)行上鎖,故此要配合with Condition進(jìn)行使用
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 此處生成的waiter實(shí)際上就是對當(dāng)前線程生成一把原始鎖,提供給擁有者進(jìn)行釋放并且放行該線程
waiter = _allocate_lock()
# 先對waiter上一次鎖
waiter.acquire()
# 放入waiter隊(duì)列供給擁有者訪問并釋放
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 阻塞在當(dāng)前線程并等待擁有者釋放
waiter.acquire()
gotit = True
else:
if timeout > 0:
# 只阻塞超時(shí)時(shí)間,過時(shí)后返回False
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
# 過時(shí)或者設(shè)定的非阻塞而返回的是False就移除該waiter
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
# 只有擁有者可以對waiter隊(duì)列中的n個(gè)線程進(jìn)行'喚醒'功能
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
# 循環(huán)對waiter中需要喚醒的線程進(jìn)行'喚醒'
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
條件變量比較關(guān)鍵的接口解析如上所述,實(shí)際上是包裝了一個(gè)鎖對象,可以是原始鎖Lock或者是可重入鎖RLock,多個(gè)線程通過共用一把鎖來進(jìn)行身份控制(主從),當(dāng)有線程對其鎖對象上鎖后則其余線程對此條件變量就只有wait方法。
Semaphore
信號量,初始化管理一個(gè)內(nèi)部計(jì)數(shù)器,當(dāng)計(jì)數(shù)器未為0時(shí)將不會阻塞線程
class Semaphore:
def __init__(self, value=1):
# 設(shè)定內(nèi)部計(jì)數(shù)器不能小于0
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
# 當(dāng)計(jì)數(shù)值為0時(shí)需要等待條件喚醒
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
def release(self):
with self._cond:
# 其中一個(gè)線程進(jìn)行釋放的時(shí)候就會喚醒一個(gè)等待隊(duì)列中的線程,并令計(jì)數(shù)值不為0
self._value += 1
self._cond.notify()
信號量封裝了條件變量,利用其對其余線程的條件阻塞控制并發(fā)。
Event
事件對象,維護(hù)著一個(gè)標(biāo)識位(bool),通過set方法被設(shè)定為True,clear方法設(shè)定為False,wait方法將阻塞到標(biāo)志位為True的時(shí)候
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def _reset_internal_locks(self):
# 重新設(shè)定條件變量里面的鎖
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
def set(self):
with self._cond:
self._flag = True
# 喚醒所有的waiter
self._cond.notify_all()
def clear(self):
with self._cond:
self._flag = False
def wait(self, timeout=None):
# 等待同一事件喚醒所有等待該事件的線程
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
事件對象封裝了條件變量,對標(biāo)志位為False的進(jìn)行阻塞,直至有線程對事件對象進(jìn)行set操作。
總結(jié)
python原生庫里面控制線程并發(fā)的幾個(gè)對象:Lock, Rlock, Condition, Semaphore, Event。實(shí)際上都是在基礎(chǔ)鎖的功能下進(jìn)行封裝修改,通過對這幾個(gè)對象的相互配合就能應(yīng)對較為復(fù)雜的線程并發(fā)開發(fā)需求。