鎖的原理(二):自旋鎖、互斥鎖以及讀寫鎖

一、鎖的分類

在分析其它鎖之前,需要先區(qū)分清楚鎖的區(qū)別,基本的鎖包括了二類:互斥鎖自旋鎖。

1.1 自旋鎖

自旋鎖:線程反復檢查鎖變量是否可用。由于線程在這一過程中保持執(zhí)行, 因此是一種 忙等。一旦獲取了自旋鎖,線程會一直保持該鎖,直至顯式釋放自旋鎖。 自旋鎖避免了進程上下文的調(diào)度開銷,因此對于線程只會阻塞很短時間的場合是有效的。

自旋鎖 = 互斥鎖 + 忙等。OSSpinLock就是自旋鎖。

1.2 互斥鎖

互斥鎖:是一種用于多線程編程中,防止兩條線程同時對同一公共資源(比如全局變量)進行讀寫的機制。該目的通過將代碼切片成一個一個的臨界區(qū)而達成。
Posix Thread中定義有一套專?用于線程同步的mutex函數(shù),mutex用于保證在任何時刻,都只能有一個線程訪問該對象。 當獲取鎖操作失敗時,線程會進入睡眠,等待鎖釋放時被喚醒(閑等)。

創(chuàng)建和銷毀:

  • POSIX定義了一個宏PTHREAD_MUTEX_INITIALIZER來靜態(tài)初始化互斥鎖。
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
  • pthread_mutex_destroy ()用于注銷一個互斥鎖。

鎖操作相關(guān)API

 int pthread_mutex_lock(pthread_mutex_t *mutex)
 int pthread_mutex_unlock(pthread_mutex_t *mutex)
 int pthread_mutex_trylock(pthread_mutex_t *mutex)
  • pthread_mutex_trylock()語義與pthread_mutex_lock()類似,不同的是在鎖已經(jīng)被占據(jù)時返回EBUSY而不是掛起等待。

互斥鎖 分為 遞歸鎖非遞歸鎖

  • 遞歸鎖:
    @synchronized:多線程可遞歸。
    NSRecursiveLock:不支持多線程可遞歸。
    pthread_mutex_t(recursive):多線程可遞歸。
  • 非遞歸鎖:NSLock、pthread_mutex、dispatch_semaphore、os_unfair_lock。
  • 條件鎖:NSCondition、NSConditionLock。
  • 信號量(semaphore):是一種更高級的同步機制,互斥鎖可以說是
    semaphore在僅取值0/1時的特例。信號量可以有更多的取值空間,用來實
    現(xiàn)更加復雜的同步,而不單單是線程間互斥。dispatch_semaphore

1.2.1 讀寫鎖

讀寫鎖實際是一種特殊的互斥鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,寫者則需要對共享資源進行寫操作。這種鎖 相對于自旋鎖而言,能提高并發(fā)性。因為在多處理器系統(tǒng)中,它允許同時有多個讀者來訪問共享資源,最大可能的讀者數(shù)為實際的邏輯CPU 數(shù)。寫者是排他性的,一個讀寫鎖同時只能有一個寫者或多個讀者(與CPU數(shù)相關(guān)),但不能同時既有讀者又有寫者,在讀寫鎖保持期間也是搶占失效的。

如果讀寫鎖當前沒有讀者,也沒有寫者,那么寫者可以立刻獲得讀寫鎖,否則它必須自旋在那里, 直到?jīng)]有任何寫者或讀者。如果讀寫鎖沒有寫者,那么讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那里,直到寫者釋放該讀寫鎖。

一次只有一個線程可以占有寫模式的讀寫鎖,可以有多個線程同時占有讀模式的讀寫鎖。正是因為這個特性,當讀寫鎖是寫加鎖狀態(tài)時,在這個鎖被解鎖之前, 所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態(tài)時, 所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權(quán), 但是如果線程希望以寫模式對此鎖進行加鎖, 它必須直到所有的線程釋放鎖。
通常當讀寫鎖處于讀模式鎖住狀態(tài)時,如果有另外線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨后的讀模式鎖請求,這樣可以避免讀模式鎖?期占用而導致等待的寫模式鎖請求?期阻塞。
讀寫鎖適合于對數(shù)據(jù)結(jié)構(gòu)的讀次數(shù)比寫次數(shù)多得多的情況。 因為讀模式鎖定可以共享,寫模式鎖住時意味著獨占,所以讀寫鎖又叫共享-獨占鎖。

創(chuàng)建和銷毀API

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

成功則返回0, 出錯則返回錯誤編號。

同互斥鎖一樣, 在釋放讀寫鎖占用的內(nèi)存之前, 需要先通過pthread_rwlock_destroy對讀寫鎖進行清理工作,釋放由init分配的資源。

鎖操作相關(guān)API:

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

成功則返回0, 出錯則返回錯誤編號。這3個函數(shù)分別實現(xiàn)獲取讀鎖,獲取寫鎖和釋放鎖的操作。獲取鎖的兩個函數(shù)是阻塞操作,同樣非阻塞的函數(shù)為:

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

非阻塞的獲取鎖操作,如果可以獲取則返回0, 否則返回錯誤的EBUSY

更多概念和解釋:
互斥鎖
線程同步

二、NSLock & NSRecursiveLock 的應用以及原理

2.1 案例一

__block NSMutableArray *array;
for (int i = 0; i < 10000; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        array = [NSMutableArray array];
    });
}

對于上面的代碼運行會發(fā)生崩潰,常規(guī)處理是對它加一個鎖,如下:

__block NSMutableArray *array;
self.lock = [[NSLock alloc] init];
for (int i = 0; i < 10000; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        [self.lock lock];
        array = [NSMutableArray array];
        [self.lock unlock];
    });
}

這樣就能解決array的創(chuàng)建問題了。

2.2 案例二

for (int i = 0; i < 10; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        static void (^testMethod)(int);
        testMethod = ^(int value){
            if (value > 0) {
                NSLog(@"current value = %d",value);
                testMethod(value - 1);
            }
        };
        testMethod(10);
    });
}

上面的例子中最終輸出會錯亂:


image.png

可以在block調(diào)用前后加解鎖解決:

for (int i = 0; i < 10; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        static void (^testMethod)(int);
        testMethod = ^(int value){
            if (value > 0) {
                NSLog(@"current value = %d",value);
                testMethod(value - 1);
            }
        };
        [self.lock lock];
        testMethod(10);
        [self.lock unlock];
    });
}

但是在實際開發(fā)中鎖往往是與業(yè)務代碼綁定在一起的,如下:


image.png

這個時候block在執(zhí)行前會同一時間進入多次,相當于多次加鎖了(遞歸),這樣就產(chǎn)生了死鎖。NSLog只會執(zhí)行一次。

NSLock改為NSRecursiveLock可以解決NSLock存在的死鎖問題:

for (int i = 0; i < 10; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        static void (^testMethod)(int);
        testMethod = ^(int value){
            [self.recursiveLock lock];
            if (value > 0) {
                NSLog(@"current value = %d",value);
                testMethod(value - 1);
            }
            [self.recursiveLock unlock];
        };
        testMethod(10);
    });
}

但是在執(zhí)行testMethod一次(也有可能是多次)遞歸調(diào)用后沒有繼續(xù)輸出:

image.png

由于NSRecursiveLock不支持多線程可遞歸。所以改為@synchronized

    for (int i = 0; i < 10; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            static void (^testMethod)(int);
            testMethod = ^(int value){
                @synchronized (self) {
                    if (value > 0) {
                        NSLog(@"current value = %d",value);
                        testMethod(value - 1);
                    }
                }
            };
            testMethod(10);
        });
    }

就能完美解決問題了。

NSRecursiveLock 解決了 NSLock 遞歸問題,@synchronized 解決了 NSRecursiveLock 多線程可遞歸問題問題。

2.3 原理分析

NSLockNSRecursiveLock是定義在Foundation框架中的,Foundation框架并沒有開源。有三種方式來探索:

  • 分析Foundation動態(tài)庫的匯編代碼。
  • 斷點跟蹤加鎖解鎖流程。
  • Swift Foundation源碼分析。雖然Foundation框架本身沒有,但是蘋果開源了Swift Foundation的代碼。原理是想通的。swift-corelibs-foundation

當然有興趣可以嘗試編譯可運行版本進行調(diào)試 swift-foundation 源碼編譯

FoundationlockunlockNSLocking協(xié)議提供的方法:

@protocol NSLocking
- (void)lock;
- (void)unlock;
@end

Swift Foundation源碼中同樣有NSLocking協(xié)議:

public protocol NSLocking {
    func lock()
    func unlock()
}

2.3.1 NSLock 源碼分析

image.png

底層是對pthread_mutex_init的封裝。lockunlock同樣是對pthread_mutex_lockpthread_mutex_unlock的封裝:
image.png

通過宏定義可以看到Swift的跨平臺支持。

2.3.2 NSRecursiveLock 源碼分析

image.png

內(nèi)部是對PTHREAD_MUTEX_RECURSIVE的封裝。lockunlock同樣是對pthread_mutex_lockpthread_mutex_unlock的封裝。

三、NSCondition 原理

NSCondition實際上作為一個 和一個 線程檢查器。鎖主要為了當檢測條件時保護數(shù)據(jù)源,執(zhí)行條件引發(fā)的任務;線程檢查器主要是根據(jù)條件決定是否繼續(xù)運行線程,即線程是否被阻塞。

  • [condition lock]:一般用于多線程同時訪問、修改同一個數(shù)據(jù)源,保證在同一 時間內(nèi)數(shù)據(jù)源只被訪問、修改一次,其他線程的命令需要在lock 外等待,只到 unlock后才可訪問。
  • [condition unlock]:與lock同時使用。
  • [condition wait]:讓當前線程處于等待狀態(tài)。
  • [condition signal]CPU發(fā)信號告訴線程不用在等待,可以繼續(xù)執(zhí)行。

3.1 生產(chǎn)者-消費者 案例

- (void)testNSCondition {
    //創(chuàng)建生產(chǎn)-消費者
    for (int i = 0; i < 10; i++) {
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self test_producer];
        });
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self test_consumer];
        });
        
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self test_consumer];
        });
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
            [self test_producer];
        });
    }
}

- (void)test_producer{
    [self.condition lock];
    self.ticketCount = self.ticketCount + 1;
    NSLog(@"生產(chǎn) + 1 剩余: %zd",self.ticketCount);
    [self.condition signal]; // 信號
    [self.condition unlock];
}

- (void)test_consumer{
    [self.condition lock];
    if (self.ticketCount == 0) {
        NSLog(@"等待 剩余: %zd",self.ticketCount);
        [self.condition wait];
    }
    //消費行為,要在等待條件判斷之后
    self.ticketCount -= 1;
    NSLog(@"消費 - 1 剩余: %zd ",self.ticketCount);
    [self.condition unlock];
}

輸出:

生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
生產(chǎn) + 1 剩余: 2
消費 - 1 剩余: 1 
消費 - 1 剩余: 0 
生產(chǎn) + 1 剩余: 1
生產(chǎn) + 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2 
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2 
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2 
消費 - 1 剩余: 1 
生產(chǎn) + 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2 
消費 - 1 剩余: 1 
消費 - 1 剩余: 0 

因為有condition的存在保證了消費行為是在對應的生產(chǎn)行為之后。在這個過程中會有消費等待行為,signal信號通知消費。

  • 生產(chǎn)和消費的加鎖保證了各個事務的額安全。
  • waitsignal保證了事務之間的安全。

3.2 源碼分析

image.png

內(nèi)部也是對pthread_mutex_init的包裝,多了一個pthread_cond_init

open func lock() {
    pthread_mutex_lock(mutex)
}

open func unlock() {
    pthread_mutex_unlock(mutex)
}

open func wait() {
    pthread_cond_wait(cond, mutex)
}

open func signal() {
    pthread_cond_signal(cond)
}

open func broadcast() {
    pthread_cond_broadcast(cond)
}

代碼中去掉了windows相關(guān)宏邏輯:

  • NSCondition:鎖(pthread_mutex_t) + 線程檢查器(pthread_cond_t
  • 鎖(pthread_mutex_t):lock(pthread_mutex_lock) + unlock(pthread_mutex_unlock)
  • 線程檢查器(pthread_cond_t):wait(pthread_cond_wait) + signal(pthread_cond_signal)

四、NSConditionLock 使用和原理

NSConditionLock也是鎖,一旦一個線程獲得鎖,其他線程一定等待。它同樣遵循NSLocking協(xié)議,相關(guān)API:

- (void)lockWhenCondition:(NSInteger)condition;
- (void)unlockWithCondition:(NSInteger)condition;
- (BOOL)lockWhenCondition:(NSInteger)condition beforeDate:(NSDate *)limit;
  • [conditionLock lock]:表示 conditionLock 期待獲得鎖,如果沒有其他線程獲得鎖(不需要判斷內(nèi)部的condition) 那它能執(zhí)行此行以下代碼,如果已經(jīng)有其他線程獲得鎖(可能是條件鎖,或者無條件鎖),則等待,直至其他線程解鎖。
  • [conditionLock lockWhenCondition:A條件]:表示如果沒有其他線程獲得該鎖,但是該鎖內(nèi)部的 condition不等于A條件,它依然不能獲得鎖,仍然等待。如果內(nèi)部的condition等于A條件,并且沒有其他線程獲得該鎖,則進入代碼區(qū),同時設(shè)置它獲得該鎖,其他任何線程都將等待它代碼的完成,直至它解鎖。
  • [conditionLock unlockWithCondition:A條件]: 表示釋放鎖,同時把內(nèi)部的condition設(shè)置為A條件。
  • return = [conditionLock lockWhenCondition:A條件 beforeDate:A時間]: 表示如果被鎖定(沒獲得鎖),并超過該時間則不再阻塞線程。注意:返回的值是NO,它沒有改變鎖的狀態(tài),這個函數(shù)的目的在于可以實現(xiàn)兩種狀態(tài)下的處理。
  • 所謂的condition就是整數(shù),內(nèi)部通過整數(shù)比較條件。

4.1案例

NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:2];
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
    [conditionLock lockWhenCondition:1];
    NSLog(@"1");
    [conditionLock unlockWithCondition:0];
});

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
    [conditionLock lockWhenCondition:2];
    sleep(1);
    NSLog(@"2");
    [conditionLock unlockWithCondition:1];
});

dispatch_async(dispatch_get_global_queue(0, 0), ^{
   [conditionLock lock];
   NSLog(@"3");
   [conditionLock unlock];
});

上面的案例2一定比1先執(zhí)行,23之間無序。
輸出:3 2 1,如果任務2的優(yōu)先級改為High則輸出順序變?yōu)?code>2 1 3。

那么有以下疑問:

  • 1.NSConditionLockNSCondition有關(guān)系么?
  • 2.NSConditionLock初始化的時候condition是什么?
  • 3.lockWhenCondition是如何控制的?
  • 4.unlockWithCondition是如何控制的?

4.2 斷點調(diào)試分析邏輯

在拿不到源碼以及拿不到動態(tài)庫的情況下,斷點分析調(diào)用流程是一個比較好的方案。
分別在測試代碼中打下以下斷點:


image.png

運行工程到達斷點后下符號斷點-[NSConditionLock initWithCondition:]過掉斷點:

image.png

這個時候就進入了initWithCondition的匯編實現(xiàn)。在匯編中對所有的b(跳轉(zhuǎn)指令)下斷點配合寄存器的值跟蹤流程。

  • -[NSConditionLock initWithCondition:]:
    image.png

    可以通過lldb讀取寄存器的值,也可以查看全部寄存器中對應的值。

過掉斷點繼續(xù):

image.png

執(zhí)行了:- [xxx init]。繼續(xù)下一步:
image.png

調(diào)用了-[NSConditionLock init],繼續(xù)下一步:
image.png

調(diào)用了-[NSConditionLock zone],繼續(xù)下異步:
image.png

調(diào)用了+[NSCondition allocWithZone:],繼續(xù)下一步:
image.png

調(diào)用了-[NSCondition init],繼續(xù):
image.png

最終返回了創(chuàng)建的NSConditionLock對象,它持有NSCondition對象以及初始化傳的condition參數(shù)2。
-[NSConditionLock initWithCondition:]流程:

-[NSConditionLock initWithCondition:]
    -[xxx init]
    -[NSConditionLock init]
    -[NSConditionLock zone]
    +[NSCondition allocWithZone:]
    -[NSCondition init]
  • -[NSConditionLock lockWhenCondition:]
    同樣添加-[NSConditionLock lockWhenCondition:]符號斷點:
    image.png

    調(diào)用了[NSDate distantFuture],繼續(xù):
    image.png

    調(diào)用了-[NSConditionLock lockWhenCondition:beforeDate:],對它增加一個符號斷點:
    image.png

調(diào)用了-[NSCondition lock],繼續(xù)跟蹤:

image.png

調(diào)用了-[NSCondition waitUntilDate:]
image.png

在調(diào)試的過程中會跳轉(zhuǎn)另外一個線程的-[NSConditionLock lockWhenCondition:],流程與上面的一致,繼續(xù)調(diào)用會調(diào)用了-[NSCondition unlock]:
image.png

這個時候直接返回1(true),接著對-[NSConditionLock unlockWithCondition:]下符號斷點:
image.png

調(diào)用了-[NSCondition lock]
image.png

調(diào)用了-[NSCondition broadcast]:
image.png

調(diào)用了-[NSCondition unlock],這個時候繼續(xù)過斷點就又會回到線程4,調(diào)用邏輯和線程3相同。

完整調(diào)用邏輯如下:

線程4
-[NSConditionLock lockWhenCondition:]
    +[NSDate distantFuture]
    -[NSConditionLock lockWhenCondition:beforeDate:]
        -[NSCondition lock]
        -[NSCondition waitUntilDate:]
            線程3
            -[NSConditionLock lockWhenCondition:]
                +[NSDate distantFuture]
                -[NSConditionLock lockWhenCondition:beforeDate:]
                    -[NSCondition lock]
                    -[NSCondition unlock]
                    返回1(true)
            -[NSConditionLock unlockWithCondition:]
                -[NSCondition lock]
                -[NSCondition broadcast]
                -[NSCondition unlock]
        //回到線程4
        -[NSCondition unlock]
        返回1(true)
-[NSConditionLock unlockWithCondition:]
    -[NSCondition lock]
    -[NSCondition broadcast]
    -[NSCondition unlock]

流程總結(jié):

  • 線程4調(diào)用[NSConditionLock lockWhenCondition:],此時因為不滿足當前條件,所
    以會進入 waiting 狀態(tài),當前進入到 waiting 時,會釋放當前的互斥鎖。
  • 此時當前的線程2 調(diào)用[NSConditionLock lock:],本質(zhì)上是調(diào)用[NSConditionLock lockBeforeDate:]這里不需要比對條件值,所以任務3會執(zhí)行。
  • 接下來線程3 執(zhí)行[NSConditionLock lockWhenCondition:],因為滿足條件值,所以線任務2會執(zhí)行,執(zhí)行完成后會調(diào)用[NSConditionLock unlockWithCondition:],這個時候?qū)?br> condition 設(shè)置為 1,并發(fā)送 boradcast, 此時線程 4接收到當前的信號,喚醒執(zhí)行并打印。
  • 這個時候任務執(zhí)行順序為任務3 -> 任務2 -> 任務1。
  • [NSConditionLock lockWhenCondition:]會根據(jù)傳入的 condition
    行對比,如果不相等,這里就會阻塞,進入線程池,否則的話就繼續(xù)代碼執(zhí)行。
  • [NSConditionLock unlockWithCondition:] 會先更改當前的condition值,然后進行廣播,喚醒當前的線程。

4.3 源碼分析

  • initWithCondition

    image.png

    保存了condition參數(shù)以及NSCondition的創(chuàng)建。

  • lockWhenCondition

open func lock(whenCondition condition: Int) {
    let _ = lock(whenCondition: condition, before: Date.distantFuture)
}

內(nèi)部調(diào)用了lockWhenCondition: before:,默認值傳的Date.distantFuture

open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
    _cond.lock()
    while _thread != nil || _value != condition {
        if !_cond.wait(until: limit) {
            _cond.unlock()
            return false
        }
    }
    _thread = pthread_self()
    _cond.unlock()
    return true
}

NSCondition加鎖判斷condition條件是否滿足,不滿足調(diào)用NSConditionwait waitUntilDate方法進入等待,超時后解鎖返回false。滿足的情況下賦值_thread解鎖返回true

  • unlockWithCondition
open func unlock(withCondition condition: Int) {
    _cond.lock()
    _thread = nil
    _value = condition
    _cond.broadcast()
    _cond.unlock()
}

加鎖后釋放_thread,更新condition,調(diào)用broadcast后解鎖。

  • lock
open func lock() {
    let _ = lock(before: Date.distantFuture)
}

open func lock(before limit: Date) -> Bool {
    _cond.lock()
    while _thread != nil {
        if !_cond.wait(until: limit) {
            _cond.unlock()
            return false
        }
    }
    _thread = pthread_self()
    _cond.unlock()
    return true
}

判斷是否有其它任務阻塞,沒有阻塞直接創(chuàng)建_thread返回true。

  • unlock
open func unlock() {
    _cond.lock()
    _thread = nil
    _cond.broadcast()
    _cond.unlock()
}

廣播并且釋放_thread。

4.4 反匯編分析

  • initWithCondition

    image.png

  • lockWhenCondition

-(int)lockWhenCondition:(int)arg2 {
    r0 = [arg0 lockWhenCondition:arg2 beforeDate:[NSDate distantFuture]];
    return r0;
}

調(diào)用自己的lockWhenCondition: beforeDate :

image.png

  • unlockWithCondition:
-(int)unlockWithCondition:(int)arg2 {
    r0 = object_getIndexedIvars(arg0);
    [*r0 lock];
    *(int128_t *)(r0 + 0x8) = 0x0;
    *(int128_t *)(r0 + 0x10) = arg2;
    [*r0 broadcast];
    r0 = *r0;
    r0 = [r0 unlock];
    return r0;
}
  • lock
int -[NSConditionLock lock](int arg0) {
    r0 = [arg0 lockBeforeDate:[NSDate distantFuture]];
    return r0;
}

lockBeforeDate:

image.png

  • unlock:
int -[NSConditionLock unlock]() {
    r0 = object_getIndexedIvars(r0);
    [*r0 lock];
    *(r0 + 0x8) = 0x0;
    [*r0 broadcast];
    r0 = *r0;
    r0 = [r0 unlock];
    return r0;
}

匯編、源碼以及斷點調(diào)試邏輯相同。
NSConditionLock 內(nèi)部封裝了NSCondition。

五、OSSpinLock & os_unfair_lock

image.png

OSSpinLockAPI注釋以及它自己的命名說明了這是一把自旋鎖,自iOS10之后被os_unfair_lock替代。
image.png

  • os_unfair_lock必須以OS_UNFAIR_LOCK_INIT初始化。
  • 它是用來代替OSSpinLock的。
  • 它不是自旋鎖(忙等),是被內(nèi)核喚醒的(閑等)。
image.png

image.png

可以看到這兩個鎖都是定義在libsystem_platform.dylib中的??梢栽?code>openSource中找到他們libplatform的源碼libplatform,實現(xiàn)是在/src/os目錄下的lock.c文件中。

5.1 OSSpinLock 源碼分析

OSSpinLock的使用一般會用到以下API

OSSpinLock hp_spinlock = OS_SPINLOCK_INIT;
OSSpinLockLock(&hp_spinlock);
OSSpinLockUnlock(&hp_spinlock);
  • OSSpinLock
typedef int32_t OSSpinLock OSSPINLOCK_DEPRECATED_REPLACE_WITH(os_unfair_lock);

#define OS_SPINLOCK_INIT    0

OSSpinLock本身是一個int32_t類型的值,初始化默認值為0。

5.1.1 OSSpinLockLock

void
OSSpinLockLock(volatile OSSpinLock *l)
{
    OS_ATOMIC_ALIAS(spin_lock, OSSpinLockLock);
    OS_ATOMIC_ALIAS(_spin_lock, OSSpinLockLock);
    bool r = os_atomic_cmpxchg(l, 0, _OSSpinLockLocked, acquire);
    if (likely(r)) return;
    return _OSSpinLockLockSlow(l);
}

#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
static const OSSpinLock _OSSpinLockLocked = 1;
#else
static const OSSpinLock _OSSpinLockLocked = -1;
#endif

OS_ATOMIC_ALIAS定義如下:

#undef OS_ATOMIC_ALIAS
#define OS_ATOMIC_ALIAS(n, o)
static void _OSSpinLockLock(volatile OSSpinLock *l);

這里相當于分了兩條路徑,通過_OSSpinLockLocked標記是否被鎖定。在源碼中并沒有找到_OSSpinLockLock函數(shù)的實現(xiàn)。

5.1.1.1 _OSSpinLockLockSlow

#if OS_ATOMIC_UP
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
    return _OSSpinLockLockYield(l); // Don't spin on UP
}
#elif defined(__arm64__)
// Exclusive monitor must be held during WFE <rdar://problem/22300054>
#if defined(__ARM_ARCH_8_2__)
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
    uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
    OSSpinLock lock;
_spin:
    while (unlikely(lock = os_atomic_load_exclusive(l, relaxed))) {
        if (unlikely(lock != _OSSpinLockLocked)) {
            os_atomic_clear_exclusive();
            return _os_lock_corruption_abort((void *)l, (uintptr_t)lock);
        }
        if (unlikely(!tries--)) {
            os_atomic_clear_exclusive();
            return _OSSpinLockLockYield(l);
        }
        OS_LOCK_SPIN_PAUSE();
    }
    os_atomic_clear_exclusive();
    bool r = os_atomic_cmpxchg(l, 0, _OSSpinLockLocked, acquire);
    if (likely(r)) return;
    goto _spin;
}
#else // !__ARM_ARCH_8_2__
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
    uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
    OSSpinLock lock;
    os_atomic_rmw_loop(l, lock, _OSSpinLockLocked, acquire, if (unlikely(lock)){
        if (unlikely(lock != _OSSpinLockLocked)) {
            os_atomic_rmw_loop_give_up(return
                    _os_lock_corruption_abort((void *)l, (uintptr_t)lock));
        }
        if (unlikely(!tries--)) {
            os_atomic_rmw_loop_give_up(return _OSSpinLockLockYield(l));
        }
        OS_LOCK_SPIN_PAUSE();
        continue;
    });
}
#endif // !__ARM_ARCH_8_2__
#else // !OS_ATOMIC_UP
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
    uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
    OSSpinLock lock;
    while (unlikely(lock = *l)) {
_spin:
        if (unlikely(lock != _OSSpinLockLocked)) {
            return _os_lock_corruption_abort((void *)l, (uintptr_t)lock);
        }
        if (unlikely(!tries--)) return _OSSpinLockLockYield(l);
        OS_LOCK_SPIN_PAUSE();
    }
    bool r = os_atomic_cmpxchgv(l, 0, _OSSpinLockLocked, &lock, acquire);
    if (likely(r)) return;
    goto _spin;
}
#endif // !OS_ATOMIC_UP

可以看到內(nèi)部有自轉(zhuǎn)邏輯,這里直接分析_OSSpinLockLockYield

5.1.1.2 _OSSpinLockLockYield

static void
_OSSpinLockLockYield(volatile OSSpinLock *l)
{
    int option = SWITCH_OPTION_DEPRESS;
    mach_msg_timeout_t timeout = 1;
    uint64_t deadline = _os_lock_yield_deadline(timeout);
    OSSpinLock lock;
    while (unlikely(lock = *l)) {
_yield:
        if (unlikely(lock != _OSSpinLockLocked)) {
            _os_lock_corruption_abort((void *)l, (uintptr_t)lock);
        }
        thread_switch(MACH_PORT_NULL, option, timeout);
        if (option == SWITCH_OPTION_WAIT) {
            timeout++;
        } else if (!_os_lock_yield_until(deadline)) {
            option = SWITCH_OPTION_WAIT;
        }
    }
    bool r = os_atomic_cmpxchgv(l, 0, _OSSpinLockLocked, &lock, acquire);
    if (likely(r)) return;
    goto _yield;
}

內(nèi)部有超時時間以及線程切換邏輯。

5.1.2 OSSpinLockUnlock

void
OSSpinLockUnlock(volatile OSSpinLock *l)
{
    OS_ATOMIC_ALIAS(spin_unlock, OSSpinLockUnlock);
    OS_ATOMIC_ALIAS(_spin_unlock, OSSpinLockUnlock);
    return _os_nospin_lock_unlock((_os_nospin_lock_t)l);
}

內(nèi)部調(diào)用了_os_nospin_lock_unlock。

5.1.2.1 _os_nospin_lock_unlock

void
_os_nospin_lock_unlock(_os_nospin_lock_t l)
{
    os_lock_owner_t self = _os_lock_owner_get_self();
    os_ulock_value_t current;
    current = os_atomic_xchg(&l->oul_value, OS_LOCK_NO_OWNER, release);
    if (likely(current == self)) return;
    return _os_nospin_lock_unlock_slow(l, current);
}

_os_nospin_lock_unlock_slow

static void
_os_nospin_lock_unlock_slow(_os_nospin_lock_t l, os_ulock_value_t current)
{
    os_lock_owner_t self = _os_lock_owner_get_self();
    if (unlikely(OS_ULOCK_OWNER(current) != self)) {
        return; // no unowned_abort for drop-in compatibility with OSSpinLock
    }
    if (current & OS_ULOCK_NOWAITERS_BIT) {
        __LIBPLATFORM_INTERNAL_CRASH__(current, "unlock_slow with no waiters");
    }
    for (;;) {
        int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, l, 0);
        if (unlikely(ret < 0)) {
            switch (-ret) {
            case EINTR:
                continue;
            case ENOENT:
                break;
            default:
                __LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wake failure");
            }
        }
        break;
    }
}

5.2 os_unfair_lock 源碼分析

typedef struct os_unfair_lock_s {
    uint32_t _os_unfair_lock_opaque;
} os_unfair_lock, *os_unfair_lock_t;

初始化OS_UNFAIR_LOCK_INIT直接設(shè)置了默認值0

#define OS_UNFAIR_LOCK_INIT ((os_unfair_lock){0})

5.2.1 os_unfair_lock_lock

void
os_unfair_lock_lock(os_unfair_lock_t lock)
{
    _os_unfair_lock_t l = (_os_unfair_lock_t)lock;
    os_lock_owner_t self = _os_lock_owner_get_self();
    bool r = os_atomic_cmpxchg(&l->oul_value, OS_LOCK_NO_OWNER, self, acquire);
    if (likely(r)) return;
    return _os_unfair_lock_lock_slow(l, OS_UNFAIR_LOCK_NONE, self);
}

_os_lock_owner_get_self

OS_ALWAYS_INLINE OS_CONST
static inline os_lock_owner_t
_os_lock_owner_get_self(void)
{
    os_lock_owner_t self;
    self = (os_lock_owner_t)_os_tsd_get_direct(__TSD_MACH_THREAD_SELF);
    return self;
}

_os_unfair_lock_lock_slow

static void
_os_unfair_lock_lock_slow(_os_unfair_lock_t l,
        os_unfair_lock_options_t options, os_lock_owner_t self)
{
    os_unfair_lock_options_t allow_anonymous_owner =
            options & OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
    options &= ~OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
    if (unlikely(options & ~OS_UNFAIR_LOCK_OPTIONS_MASK)) {
        __LIBPLATFORM_CLIENT_CRASH__(options, "Invalid options");
    }
    os_ulock_value_t current, new, waiters_mask = 0;
    while (unlikely((current = os_atomic_load(&l->oul_value, relaxed)) !=
            OS_LOCK_NO_OWNER)) {
_retry:
        if (unlikely(OS_ULOCK_IS_OWNER(current, self, allow_anonymous_owner))) {
            return _os_unfair_lock_recursive_abort(self);
        }
        new = current & ~OS_ULOCK_NOWAITERS_BIT;
        if (current != new) {
            // Clear nowaiters bit in lock value before waiting
            if (!os_atomic_cmpxchgv(&l->oul_value, current, new, &current,
                    relaxed)){
                continue;
            }
            current = new;
        }
        int ret = __ulock_wait(UL_UNFAIR_LOCK | ULF_NO_ERRNO | options,
                l, current, 0);
        if (unlikely(ret < 0)) {
            switch (-ret) {
            case EINTR:
            case EFAULT:
                continue;
            case EOWNERDEAD:
                _os_unfair_lock_corruption_abort(current);
                break;
            default:
                __LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wait failure");
            }
        }
        if (ret > 0) {
            // If there are more waiters, unset nowaiters bit when acquiring lock
            waiters_mask = OS_ULOCK_NOWAITERS_BIT;
        }
    }
    new = self & ~waiters_mask;
    bool r = os_atomic_cmpxchgv(&l->oul_value, OS_LOCK_NO_OWNER, new,
            &current, acquire);
    if (unlikely(!r)) goto _retry;
}

內(nèi)部是wait等待邏輯。

5.2.2 os_unfair_lock_unlock

void
os_unfair_lock_unlock(os_unfair_lock_t lock)
{
    _os_unfair_lock_t l = (_os_unfair_lock_t)lock;
    os_lock_owner_t self = _os_lock_owner_get_self();
    os_ulock_value_t current;
    current = os_atomic_xchg(&l->oul_value, OS_LOCK_NO_OWNER, release);
    if (likely(current == self)) return;
    return _os_unfair_lock_unlock_slow(l, self, current, 0);
}

內(nèi)部調(diào)用了_os_unfair_lock_unlock_slow

OS_NOINLINE
static void
_os_unfair_lock_unlock_slow(_os_unfair_lock_t l, os_lock_owner_t self,
        os_ulock_value_t current, os_unfair_lock_options_t options)
{
    os_unfair_lock_options_t allow_anonymous_owner =
            options & OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
    options &= ~OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
    if (unlikely(OS_ULOCK_IS_NOT_OWNER(current, self, allow_anonymous_owner))) {
        return _os_unfair_lock_unowned_abort(OS_ULOCK_OWNER(current));
    }
    if (current & OS_ULOCK_NOWAITERS_BIT) {
        __LIBPLATFORM_INTERNAL_CRASH__(current, "unlock_slow with no waiters");
    }
    for (;;) {
        int ret = __ulock_wake(UL_UNFAIR_LOCK | ULF_NO_ERRNO, l, 0);
        if (unlikely(ret < 0)) {
            switch (-ret) {
            case EINTR:
                continue;
            case ENOENT:
                break;
            default:
                __LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wake failure");
            }
        }
        break;
    }
}

可以看到內(nèi)部是有喚醒邏輯的。

六、讀寫鎖

讀操作可以共享,寫操作是排他的,讀可以有多個在讀,寫只有唯一個在寫,同時寫的時候不允許讀。要實現(xiàn)讀寫鎖核心邏輯是:

  • 多讀單寫
  • 寫寫互斥
  • 讀寫互斥
  • 寫不能阻塞任務執(zhí)行

有兩套方案:

  • 1.使用 柵欄函數(shù) 相關(guān)API。
  • 2.使用pthread_rwlock_t相關(guān)API

6.1 dispatch_barrier_async 實現(xiàn)多讀單寫

寫:通過柵欄函數(shù)可以實現(xiàn)寫寫互斥以及讀寫互斥,寫使用async可以保證寫邏輯不阻塞當前任務執(zhí)行。
讀:使用dispatch_sync同步效果實現(xiàn)多讀(放入并發(fā)隊列中)。

  • 首先定義一個并發(fā)隊列以及字典存儲數(shù)據(jù):
@property (nonatomic, strong) dispatch_queue_t concurrent_queue;
@property (nonatomic, strong) NSMutableDictionary *dataDic;

//初始化
self.concurrent_queue = dispatch_queue_create("rw_queue", DISPATCH_QUEUE_CONCURRENT);
self.dataDic = [NSMutableDictionary dictionary];
  • 寫入操作:
- (void)safeSetter:(NSString *)name time:(int)time {
    dispatch_barrier_async(self.concurrent_queue, ^{
        sleep(time);
        [self.dataDic setValue:name forKey:@"HotpotCat"];
        NSLog(@"write name:%@,currentThread:%@",name,[NSThread currentThread]);
    });
}

為了方便測試key值寫死,并且傳入一個time。barrier保證了寫之間互斥以及讀寫互斥。

  • 讀取操作:
- (NSString *)safeGetterWithTime:(int)time {
    __block NSString *result;
    //多條線程同時讀,阻塞的是當前線程,多條線程訪問就是多讀了。同步使用concurrent_queue是為了配合柵欄函數(shù)讀寫互斥。
    dispatch_sync(self.concurrent_queue, ^{
        sleep(time);
        result = self.dataDic[@"HotpotCat"];
    });
    NSLog(@"result:%@,currentThread:%@,time:%@",result,[NSThread currentThread],@(time));
    return result;
}

使用同步函數(shù)配合柵欄函數(shù)(柵欄函數(shù)只能針對同一隊列)實現(xiàn)讀寫互斥,當多條線程同時訪問safeGetterWithTime時就實現(xiàn)了多讀操作。

  • 寫入驗證:
//調(diào)用
[self safeSetter:@"1" time:4];
[self safeSetter:@"2" time:1];
[self safeSetter:@"3" time:2];
[self safeSetter:@"4" time:1];

輸出:

write name:1,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:2,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:3,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:4,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}

很明顯寫之間是互斥的,任務1沒有執(zhí)行完之前其它任務都在等待。

  • 讀取驗證:
for (int i = 0; i < 5; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        NSString *result = [self safeGetterWithTime:5 - i];
        NSLog(@"result:%@",result);
    });
}

輸出:

result:4,currentThread:<NSThread: 0x281f80600>{number = 7, name = (null)},time:1
result:4,currentThread:<NSThread: 0x281fce540>{number = 8, name = (null)},time:2
result:4,currentThread:<NSThread: 0x281f80980>{number = 9, name = (null)},time:3
result:4,currentThread:<NSThread: 0x281feb540>{number = 10, name = (null)},time:4
result:4,currentThread:<NSThread: 0x281f80a80>{number = 11, name = (null)},time:5

任務并行執(zhí)行,順序是由于設(shè)置了sleep時間,如果去掉時間或者時間一致,每次執(zhí)行結(jié)果都不同了。

6.2 pthread_rwlock_t 實現(xiàn)多讀單寫

  • 定義鎖以及字典數(shù)據(jù):
{
    pthread_rwlock_t rw_lock;
    pthread_rwlockattr_t rw_lock_attr;
}
@property (nonatomic, strong) NSMutableDictionary *dataDic;

pthread_rwlockattr_t讀寫屬性有兩種:lockkindpshared
lockkind:讀寫策略,包括讀取優(yōu)先(默認屬性)、寫入優(yōu)先。蘋果系統(tǒng)里面沒有提供 pthread_rwlockattr_getkind_nppthread_rwlockattr_setkind_np 相關(guān)函數(shù)。
psharedPTHREAD_PROCESS_PRIVATE(進程內(nèi)競爭讀寫鎖,默認屬性)PTHREAD_PROCESS_SHARED(進程間競爭讀寫鎖)

  • 初始化:
self.dataDic = [NSMutableDictionary dictionary];
//初始化
pthread_rwlockattr_init(&rw_lock_attr);
pthread_rwlock_init(&rw_lock, &rw_lock_attr);
//進程內(nèi)
pthread_rwlockattr_setpshared(&rw_lock_attr, PTHREAD_PROCESS_PRIVATE);

pthread_rwlockattr_t讀寫設(shè)置為進程內(nèi)。

  • 寫入操作如下:
- (void)safeSetter:(NSString *)name {
    //寫鎖
    pthread_rwlock_wrlock(&rw_lock);
    [self.dataDic setValue:name forKey:@"HotpotCat"];
    NSLog(@"write name:%@,currentThread:%@",name,[NSThread currentThread]);
    //釋放
    pthread_rwlock_unlock(&rw_lock);
}
  • 讀取操作如下:
- (NSString *)safeGetter {
    //讀鎖
    pthread_rwlock_rdlock(&rw_lock);
    NSString *result = self.dataDic[@"HotpotCat"];
    //釋放
    pthread_rwlock_unlock(&rw_lock);
    NSLog(@"result:%@,currentThread:%@",result,[NSThread currentThread]);
    return result;
}
  • 寫入驗證:
dispatch_async(dispatch_get_global_queue(0, 0), ^{
    [self safeSetter:@"1"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
    [self safeSetter:@"2"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
    [self safeSetter:@"3"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
    [self safeSetter:@"4"];
});

輸出:

LockDemo[52251:5873172] write name:4,currentThread:<NSThread: 0x60000072e980>{number = 4, name = (null)}
LockDemo[52251:5873177] write name:1,currentThread:<NSThread: 0x60000075d100>{number = 6, name = (null)}
LockDemo[52251:5873170] write name:2,currentThread:<NSThread: 0x60000072f600>{number = 7, name = (null)}
LockDemo[52251:5873178] write name:3,currentThread:<NSThread: 0x60000073d480>{number = 5, name = (null)}

這里就與隊列調(diào)度有關(guān)了,順序不定,如果不加鎖大量并發(fā)調(diào)用下則會crash。

  • 讀取驗證:
for (int i = 0; i < 5; i++) {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        NSString *result = [self safeGetter];
    });
}

輸出:

result:4,currentThread:<NSThread: 0x600001cdc200>{number = 5, name = (null)}
result:4,currentThread:<NSThread: 0x600001cd1080>{number = 7, name = (null)}
result:4,currentThread:<NSThread: 0x600001c95f40>{number = 6, name = (null)}
result:4,currentThread:<NSThread: 0x600001c91ec0>{number = 3, name = (null)}
result:4,currentThread:<NSThread: 0x600001c94d80>{number = 4, name = (null)}

輸出順序也不一定。當然混合讀寫測試也可以,用數(shù)組更容易測試。

參考鏈接:pthread_rwlock

  • 對于讀數(shù)據(jù)比修改數(shù)據(jù)頻繁的應用,用讀寫鎖代替互斥鎖可以提高效率。因為使用互斥鎖時,即使是讀出數(shù)據(jù)(相當于操作臨界區(qū)資源)都要上互斥鎖,而采用讀寫鎖,則可以在任一時刻允許多個讀出者存在,提高了更高的并發(fā)度,同時在某個寫入者修改數(shù)據(jù)期間保護該數(shù)據(jù),以免任何其它讀出者或?qū)懭胝叩母蓴_。

  • 獲取一個讀寫鎖用于讀稱為共享鎖,獲取一個讀寫鎖用于寫稱為獨占鎖,因此對于某個給定資源的共享訪問也稱為共享-獨占上鎖。

鎖對應關(guān)系
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容