Python的鎖源碼剖析

Lock

原始鎖: 實(shí)現(xiàn)原始鎖對象的類。一旦一個(gè)線程獲得一個(gè)鎖,會阻塞隨后嘗試獲得鎖的線程,直到它被釋放;
任何線程都可以釋放它。

class Lock:
    def acquire(self, blocking=True, timeout=-1):
        '''
        blocking 阻塞: 阻塞狀態(tài)下會維持到timeout時(shí)間然后拋出Runtime異常
                 非阻塞: 直接返回該鎖狀態(tài)(已鎖---False  未鎖---True)
        timeout 定義超時(shí)時(shí)間,值為-1時(shí)不定義超時(shí)時(shí)間
        '''
        return False
    def release(self):
        '''
        釋放鎖(任何線程都可以調(diào)用這個(gè)方法)
        '''
        pass

原始鎖的原理相當(dāng)?shù)暮唵?,就是一種互斥的機(jī)制,一個(gè)線程獲取了該鎖,其他線程就在獲取鎖的過程中發(fā)生阻塞或者返回一個(gè)狀態(tài)碼(bool),在此基礎(chǔ)上實(shí)現(xiàn)下面的幾種同步概念

RLock

可重入鎖(遞歸鎖),解決在同一線程中多次上鎖引發(fā)的死鎖問題

class RLock:
    def __init__(self):
        # 維護(hù)一個(gè)原始鎖
        self._block = Lock()
        # 重入鎖當(dāng)前擁有者(線程id標(biāo)識)
        self._owner = None
        # 當(dāng)前擁有者的上鎖次數(shù)
        self._count = 0
    def acquire(self, blocking=True, timeout=-1):
        # 獲取當(dāng)前線程id
        me = get_ident()
        # 判斷當(dāng)前重入鎖擁有者是否為自身
        if self._owner == me:
            # 如果為自身的就不阻塞而令count值 +1
            self._count += 1
            return 1
        # 非擁有者則搶占鎖(阻塞或返回True值表示該鎖未有擁有者)
        rc = self._block.acquire(blocking, timeout)
        if rc:
            # 未上鎖情況下定義擁有者為自身
            self._owner = me
            self._count = 1
        return rc
    def release(self):
        # 只允許擁有者自身解鎖
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        # 解鎖一次令count - 1
        self._count = count = self._count - 1
        # 當(dāng)前線程的鎖被全部解開后釋放該RLock實(shí)例
        if not count:
            self._owner = None
            # 解除對其余線程的阻塞
            self._block.release()

可重入鎖實(shí)際上利用了原始鎖做了一層封裝,利用count記錄當(dāng)前線程的入鎖次數(shù)而并不是每次都搶占_block鎖,因?yàn)樵谕痪€程中搶占同一個(gè)鎖會導(dǎo)致死鎖,在擁有RLock實(shí)例的線程中需要入鎖解鎖相同次數(shù)才能讓別的線程通過acquire獲取到鎖。

Condition

條件變量,允許一個(gè)或多個(gè)線程在被其它線程所通知之前進(jìn)行等待。

class Condition:
    def __init__(self, lock=None):
        # 提供傳入的鎖對象, 支持RLock/Lock,可以利用同一個(gè)鎖對象來控制多個(gè)條件變量
        if lock is None:
            lock = RLock()
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release
        # 如果lock對象存在下列方法則覆蓋當(dāng)前Condition類的方法
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # 定義waiters隊(duì)列
        self._waiters = _deque()
        def _release_save(self):
        self._lock.release()           

    def _acquire_restore(self, x):
        self._lock.acquire()           

    def _is_owned(self):
        if self._lock.acquire(0):
            self._lock.release()
            return False
        else:
            return True
    
    def wait(self, timeout=None):
        # lock: lock對象未上鎖時(shí)不能執(zhí)行wait接口
        # RLock: RLock對象則判斷當(dāng)前擁有者未非自身時(shí)不能執(zhí)行wait接口
        # 在這里為何要判斷is_owned,就是為了對公共部分(_waiters)進(jìn)行上鎖,故此要配合with Condition進(jìn)行使用
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # 此處生成的waiter實(shí)際上就是對當(dāng)前線程生成一把原始鎖,提供給擁有者進(jìn)行釋放并且放行該線程
        waiter = _allocate_lock()
        # 先對waiter上一次鎖
        waiter.acquire()
        # 放入waiter隊(duì)列供給擁有者訪問并釋放 
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 阻塞在當(dāng)前線程并等待擁有者釋放
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    # 只阻塞超時(shí)時(shí)間,過時(shí)后返回False
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    # 過時(shí)或者設(shè)定的非阻塞而返回的是False就移除該waiter
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        # 只有擁有者可以對waiter隊(duì)列中的n個(gè)線程進(jìn)行'喚醒'功能
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        # 循環(huán)對waiter中需要喚醒的線程進(jìn)行'喚醒'
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

條件變量比較關(guān)鍵的接口解析如上所述,實(shí)際上是包裝了一個(gè)鎖對象,可以是原始鎖Lock或者是可重入鎖RLock,多個(gè)線程通過共用一把鎖來進(jìn)行身份控制(主從),當(dāng)有線程對其鎖對象上鎖后則其余線程對此條件變量就只有wait方法。

Semaphore

信號量,初始化管理一個(gè)內(nèi)部計(jì)數(shù)器,當(dāng)計(jì)數(shù)器未為0時(shí)將不會阻塞線程

class Semaphore:
    def __init__(self, value=1):
        # 設(shè)定內(nèi)部計(jì)數(shù)器不能小于0
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                # 當(dāng)計(jì)數(shù)值為0時(shí)需要等待條件喚醒
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    def release(self):
        with self._cond:
            # 其中一個(gè)線程進(jìn)行釋放的時(shí)候就會喚醒一個(gè)等待隊(duì)列中的線程,并令計(jì)數(shù)值不為0
            self._value += 1
            self._cond.notify()

信號量封裝了條件變量,利用其對其余線程的條件阻塞控制并發(fā)。

Event

事件對象,維護(hù)著一個(gè)標(biāo)識位(bool),通過set方法被設(shè)定為True,clear方法設(shè)定為False,wait方法將阻塞到標(biāo)志位為True的時(shí)候

class Event:
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def _reset_internal_locks(self):
        # 重新設(shè)定條件變量里面的鎖
        self._cond.__init__(Lock())

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    def set(self):
        with self._cond:
            self._flag = True
            # 喚醒所有的waiter
            self._cond.notify_all()
    
    def clear(self):
        with self._cond:
            self._flag = False
    
    def wait(self, timeout=None):
        # 等待同一事件喚醒所有等待該事件的線程
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

事件對象封裝了條件變量,對標(biāo)志位為False的進(jìn)行阻塞,直至有線程對事件對象進(jìn)行set操作。

總結(jié)

python原生庫里面控制線程并發(fā)的幾個(gè)對象:Lock, Rlock, Condition, Semaphore, Event。實(shí)際上都是在基礎(chǔ)鎖的功能下進(jìn)行封裝修改,通過對這幾個(gè)對象的相互配合就能應(yīng)對較為復(fù)雜的線程并發(fā)開發(fā)需求。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 線程狀態(tài)新建,就緒,運(yùn)行,阻塞,死亡。 線程同步多線程可以同時(shí)運(yùn)行多個(gè)任務(wù),線程需要共享數(shù)據(jù)的時(shí)候,可能出現(xiàn)數(shù)據(jù)不...
    KevinCool閱讀 878評論 0 0
  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念,程序并不能單獨(dú)運(yùn)行,只有...
    go以恒閱讀 1,795評論 0 6
  • 關(guān)于Python多線程的概述 由于GIL的存在,Python的多線程在CPU密集型任務(wù)并沒有多大的優(yōu)勢,任何Pyt...
    千鳥月讀閱讀 595評論 0 0
  • 寫在前面的話 代碼中的# > 表示的是輸出結(jié)果 輸入 使用input()函數(shù) 用法 注意input函數(shù)輸出的均是字...
    FlyingLittlePG閱讀 3,214評論 0 9
  • 鎖是一種同步機(jī)制,用于多線程環(huán)境中對資源訪問的限制iOS中常見鎖的性能對比圖(摘自:ibireme): iOS鎖的...
    LiLS閱讀 1,627評論 0 6

友情鏈接更多精彩內(nèi)容