一、鎖的分類
在分析其它鎖之前,需要先區(qū)分清楚鎖的區(qū)別,基本的鎖包括了二類:互斥鎖 和 自旋鎖。
1.1 自旋鎖
自旋鎖:線程反復檢查鎖變量是否可用。由于線程在這一過程中保持執(zhí)行, 因此是一種 忙等。一旦獲取了自旋鎖,線程會一直保持該鎖,直至顯式釋放自旋鎖。 自旋鎖避免了進程上下文的調(diào)度開銷,因此對于線程只會阻塞很短時間的場合是有效的。
自旋鎖 = 互斥鎖 + 忙等。OSSpinLock就是自旋鎖。
1.2 互斥鎖
互斥鎖:是一種用于多線程編程中,防止兩條線程同時對同一公共資源(比如全局變量)進行讀寫的機制。該目的通過將代碼切片成一個一個的臨界區(qū)而達成。
在Posix Thread中定義有一套專?用于線程同步的mutex函數(shù),mutex用于保證在任何時刻,都只能有一個線程訪問該對象。 當獲取鎖操作失敗時,線程會進入睡眠,等待鎖釋放時被喚醒(閑等)。
創(chuàng)建和銷毀:
-
POSIX定義了一個宏PTHREAD_MUTEX_INITIALIZER來靜態(tài)初始化互斥鎖。
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
-
pthread_mutex_destroy ()用于注銷一個互斥鎖。
鎖操作相關(guān)API:
int pthread_mutex_lock(pthread_mutex_t *mutex)
int pthread_mutex_unlock(pthread_mutex_t *mutex)
int pthread_mutex_trylock(pthread_mutex_t *mutex)
-
pthread_mutex_trylock()語義與pthread_mutex_lock()類似,不同的是在鎖已經(jīng)被占據(jù)時返回EBUSY而不是掛起等待。
互斥鎖 分為 遞歸鎖 和 非遞歸鎖
- 遞歸鎖:
@synchronized:多線程可遞歸。
NSRecursiveLock:不支持多線程可遞歸。
pthread_mutex_t(recursive):多線程可遞歸。 - 非遞歸鎖:
NSLock、pthread_mutex、dispatch_semaphore、os_unfair_lock。 - 條件鎖:
NSCondition、NSConditionLock。 - 信號量(
semaphore):是一種更高級的同步機制,互斥鎖可以說是
semaphore在僅取值0/1時的特例。信號量可以有更多的取值空間,用來實
現(xiàn)更加復雜的同步,而不單單是線程間互斥。dispatch_semaphore
1.2.1 讀寫鎖
讀寫鎖實際是一種特殊的互斥鎖,它把對共享資源的訪問者劃分成讀者和寫者,讀者只對共享資源進行讀訪問,寫者則需要對共享資源進行寫操作。這種鎖 相對于自旋鎖而言,能提高并發(fā)性。因為在多處理器系統(tǒng)中,它允許同時有多個讀者來訪問共享資源,最大可能的讀者數(shù)為實際的邏輯CPU 數(shù)。寫者是排他性的,一個讀寫鎖同時只能有一個寫者或多個讀者(與CPU數(shù)相關(guān)),但不能同時既有讀者又有寫者,在讀寫鎖保持期間也是搶占失效的。
如果讀寫鎖當前沒有讀者,也沒有寫者,那么寫者可以立刻獲得讀寫鎖,否則它必須自旋在那里, 直到?jīng)]有任何寫者或讀者。如果讀寫鎖沒有寫者,那么讀者可以立即獲得該讀寫鎖,否則讀者必須自旋在那里,直到寫者釋放該讀寫鎖。
一次只有一個線程可以占有寫模式的讀寫鎖,可以有多個線程同時占有讀模式的讀寫鎖。正是因為這個特性,當讀寫鎖是寫加鎖狀態(tài)時,在這個鎖被解鎖之前, 所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態(tài)時, 所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權(quán), 但是如果線程希望以寫模式對此鎖進行加鎖, 它必須直到所有的線程釋放鎖。
通常當讀寫鎖處于讀模式鎖住狀態(tài)時,如果有另外線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨后的讀模式鎖請求,這樣可以避免讀模式鎖?期占用而導致等待的寫模式鎖請求?期阻塞。
讀寫鎖適合于對數(shù)據(jù)結(jié)構(gòu)的讀次數(shù)比寫次數(shù)多得多的情況。 因為讀模式鎖定可以共享,寫模式鎖住時意味著獨占,所以讀寫鎖又叫共享-獨占鎖。
創(chuàng)建和銷毀API:
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
成功則返回0, 出錯則返回錯誤編號。
同互斥鎖一樣, 在釋放讀寫鎖占用的內(nèi)存之前, 需要先通過pthread_rwlock_destroy對讀寫鎖進行清理工作,釋放由init分配的資源。
鎖操作相關(guān)API:
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
成功則返回0, 出錯則返回錯誤編號。這3個函數(shù)分別實現(xiàn)獲取讀鎖,獲取寫鎖和釋放鎖的操作。獲取鎖的兩個函數(shù)是阻塞操作,同樣非阻塞的函數(shù)為:
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
非阻塞的獲取鎖操作,如果可以獲取則返回0, 否則返回錯誤的EBUSY。
二、NSLock & NSRecursiveLock 的應用以及原理
2.1 案例一
__block NSMutableArray *array;
for (int i = 0; i < 10000; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
array = [NSMutableArray array];
});
}
對于上面的代碼運行會發(fā)生崩潰,常規(guī)處理是對它加一個鎖,如下:
__block NSMutableArray *array;
self.lock = [[NSLock alloc] init];
for (int i = 0; i < 10000; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[self.lock lock];
array = [NSMutableArray array];
[self.lock unlock];
});
}
這樣就能解決array的創(chuàng)建問題了。
2.2 案例二
for (int i = 0; i < 10; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
};
testMethod(10);
});
}
上面的例子中最終輸出會錯亂:

可以在block調(diào)用前后加解鎖解決:
for (int i = 0; i < 10; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
};
[self.lock lock];
testMethod(10);
[self.lock unlock];
});
}
但是在實際開發(fā)中鎖往往是與業(yè)務代碼綁定在一起的,如下:

這個時候block在執(zhí)行前會同一時間進入多次,相當于多次加鎖了(遞歸),這樣就產(chǎn)生了死鎖。NSLog只會執(zhí)行一次。
將NSLock改為NSRecursiveLock可以解決NSLock存在的死鎖問題:
for (int i = 0; i < 10; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
[self.recursiveLock lock];
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
[self.recursiveLock unlock];
};
testMethod(10);
});
}
但是在執(zhí)行testMethod一次(也有可能是多次)遞歸調(diào)用后沒有繼續(xù)輸出:

由于NSRecursiveLock不支持多線程可遞歸。所以改為@synchronized:
for (int i = 0; i < 10; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
static void (^testMethod)(int);
testMethod = ^(int value){
@synchronized (self) {
if (value > 0) {
NSLog(@"current value = %d",value);
testMethod(value - 1);
}
}
};
testMethod(10);
});
}
就能完美解決問題了。
NSRecursiveLock 解決了 NSLock 遞歸問題,@synchronized 解決了 NSRecursiveLock 多線程可遞歸問題問題。
2.3 原理分析
NSLock與NSRecursiveLock是定義在Foundation框架中的,Foundation框架并沒有開源。有三種方式來探索:
- 分析
Foundation動態(tài)庫的匯編代碼。 - 斷點跟蹤加鎖解鎖流程。
-
Swift Foundation源碼分析。雖然Foundation框架本身沒有,但是蘋果開源了Swift Foundation的代碼。原理是想通的。swift-corelibs-foundation
當然有興趣可以嘗試編譯可運行版本進行調(diào)試 swift-foundation 源碼編譯
在Foundation中lock與unlock是NSLocking協(xié)議提供的方法:
@protocol NSLocking
- (void)lock;
- (void)unlock;
@end
在Swift Foundation源碼中同樣有NSLocking協(xié)議:
public protocol NSLocking {
func lock()
func unlock()
}
2.3.1 NSLock 源碼分析

底層是對
pthread_mutex_init的封裝。lock和unlock同樣是對pthread_mutex_lock與pthread_mutex_unlock的封裝:
通過宏定義可以看到
Swift的跨平臺支持。
2.3.2 NSRecursiveLock 源碼分析

內(nèi)部是對
PTHREAD_MUTEX_RECURSIVE的封裝。lock與unlock同樣是對pthread_mutex_lock與pthread_mutex_unlock的封裝。
三、NSCondition 原理
NSCondition實際上作為一個 鎖 和一個 線程檢查器。鎖主要為了當檢測條件時保護數(shù)據(jù)源,執(zhí)行條件引發(fā)的任務;線程檢查器主要是根據(jù)條件決定是否繼續(xù)運行線程,即線程是否被阻塞。
-
[condition lock]:一般用于多線程同時訪問、修改同一個數(shù)據(jù)源,保證在同一 時間內(nèi)數(shù)據(jù)源只被訪問、修改一次,其他線程的命令需要在lock外等待,只到unlock后才可訪問。 -
[condition unlock]:與lock同時使用。 -
[condition wait]:讓當前線程處于等待狀態(tài)。 -
[condition signal]:CPU發(fā)信號告訴線程不用在等待,可以繼續(xù)執(zhí)行。
3.1 生產(chǎn)者-消費者 案例
- (void)testNSCondition {
//創(chuàng)建生產(chǎn)-消費者
for (int i = 0; i < 10; i++) {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[self test_producer];
});
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[self test_consumer];
});
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[self test_consumer];
});
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[self test_producer];
});
}
}
- (void)test_producer{
[self.condition lock];
self.ticketCount = self.ticketCount + 1;
NSLog(@"生產(chǎn) + 1 剩余: %zd",self.ticketCount);
[self.condition signal]; // 信號
[self.condition unlock];
}
- (void)test_consumer{
[self.condition lock];
if (self.ticketCount == 0) {
NSLog(@"等待 剩余: %zd",self.ticketCount);
[self.condition wait];
}
//消費行為,要在等待條件判斷之后
self.ticketCount -= 1;
NSLog(@"消費 - 1 剩余: %zd ",self.ticketCount);
[self.condition unlock];
}
輸出:
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
等待 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
生產(chǎn) + 1 剩余: 2
消費 - 1 剩余: 1
消費 - 1 剩余: 0
生產(chǎn) + 1 剩余: 1
生產(chǎn) + 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2
消費 - 1 剩余: 1
生產(chǎn) + 1 剩余: 2
生產(chǎn) + 1 剩余: 3
消費 - 1 剩余: 2
消費 - 1 剩余: 1
消費 - 1 剩余: 0
因為有condition的存在保證了消費行為是在對應的生產(chǎn)行為之后。在這個過程中會有消費等待行為,signal信號通知消費。
- 生產(chǎn)和消費的加鎖保證了各個事務的額安全。
-
wait和signal保證了事務之間的安全。
3.2 源碼分析

內(nèi)部也是對
pthread_mutex_init的包裝,多了一個pthread_cond_init。
open func lock() {
pthread_mutex_lock(mutex)
}
open func unlock() {
pthread_mutex_unlock(mutex)
}
open func wait() {
pthread_cond_wait(cond, mutex)
}
open func signal() {
pthread_cond_signal(cond)
}
open func broadcast() {
pthread_cond_broadcast(cond)
}
代碼中去掉了windows相關(guān)宏邏輯:
-
NSCondition:鎖(pthread_mutex_t) + 線程檢查器(pthread_cond_t) - 鎖(
pthread_mutex_t):lock(pthread_mutex_lock)+unlock(pthread_mutex_unlock) - 線程檢查器(
pthread_cond_t):wait(pthread_cond_wait)+signal(pthread_cond_signal)
四、NSConditionLock 使用和原理
NSConditionLock也是鎖,一旦一個線程獲得鎖,其他線程一定等待。它同樣遵循NSLocking協(xié)議,相關(guān)API:
- (void)lockWhenCondition:(NSInteger)condition;
- (void)unlockWithCondition:(NSInteger)condition;
- (BOOL)lockWhenCondition:(NSInteger)condition beforeDate:(NSDate *)limit;
-
[conditionLock lock]:表示conditionLock期待獲得鎖,如果沒有其他線程獲得鎖(不需要判斷內(nèi)部的condition) 那它能執(zhí)行此行以下代碼,如果已經(jīng)有其他線程獲得鎖(可能是條件鎖,或者無條件鎖),則等待,直至其他線程解鎖。 -
[conditionLock lockWhenCondition:A條件]:表示如果沒有其他線程獲得該鎖,但是該鎖內(nèi)部的condition不等于A條件,它依然不能獲得鎖,仍然等待。如果內(nèi)部的condition等于A條件,并且沒有其他線程獲得該鎖,則進入代碼區(qū),同時設(shè)置它獲得該鎖,其他任何線程都將等待它代碼的完成,直至它解鎖。 -
[conditionLock unlockWithCondition:A條件]: 表示釋放鎖,同時把內(nèi)部的condition設(shè)置為A條件。 -
return = [conditionLock lockWhenCondition:A條件 beforeDate:A時間]: 表示如果被鎖定(沒獲得鎖),并超過該時間則不再阻塞線程。注意:返回的值是NO,它沒有改變鎖的狀態(tài),這個函數(shù)的目的在于可以實現(xiàn)兩種狀態(tài)下的處理。 - 所謂的
condition就是整數(shù),內(nèi)部通過整數(shù)比較條件。
4.1案例
NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:2];
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
[conditionLock lockWhenCondition:1];
NSLog(@"1");
[conditionLock unlockWithCondition:0];
});
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
[conditionLock lockWhenCondition:2];
sleep(1);
NSLog(@"2");
[conditionLock unlockWithCondition:1];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[conditionLock lock];
NSLog(@"3");
[conditionLock unlock];
});
上面的案例2一定比1先執(zhí)行,2與3之間無序。
輸出:3 2 1,如果任務2的優(yōu)先級改為High則輸出順序變?yōu)?code>2 1 3。
那么有以下疑問:
- 1.
NSConditionLock與NSCondition有關(guān)系么? - 2.
NSConditionLock初始化的時候condition是什么? - 3.
lockWhenCondition是如何控制的? - 4.
unlockWithCondition是如何控制的?
4.2 斷點調(diào)試分析邏輯
在拿不到源碼以及拿不到動態(tài)庫的情況下,斷點分析調(diào)用流程是一個比較好的方案。
分別在測試代碼中打下以下斷點:

運行工程到達斷點后下符號斷點-[NSConditionLock initWithCondition:]過掉斷點:

這個時候就進入了
initWithCondition的匯編實現(xiàn)。在匯編中對所有的b(跳轉(zhuǎn)指令)下斷點配合寄存器的值跟蹤流程。
-
-[NSConditionLock initWithCondition:]:
image.png
可以通過lldb讀取寄存器的值,也可以查看全部寄存器中對應的值。
過掉斷點繼續(xù):

執(zhí)行了:
- [xxx init]。繼續(xù)下一步:
調(diào)用了
-[NSConditionLock init],繼續(xù)下一步:
調(diào)用了
-[NSConditionLock zone],繼續(xù)下異步:
調(diào)用了
+[NSCondition allocWithZone:],繼續(xù)下一步:
調(diào)用了
-[NSCondition init],繼續(xù):
最終返回了創(chuàng)建的
NSConditionLock對象,它持有NSCondition對象以及初始化傳的condition參數(shù)2。-[NSConditionLock initWithCondition:]流程:
-[NSConditionLock initWithCondition:]
-[xxx init]
-[NSConditionLock init]
-[NSConditionLock zone]
+[NSCondition allocWithZone:]
-[NSCondition init]
-
-[NSConditionLock lockWhenCondition:]:
同樣添加-[NSConditionLock lockWhenCondition:]符號斷點:
image.png
調(diào)用了[NSDate distantFuture],繼續(xù):
image.png
調(diào)用了-[NSConditionLock lockWhenCondition:beforeDate:],對它增加一個符號斷點:
image.png
調(diào)用了-[NSCondition lock],繼續(xù)跟蹤:

調(diào)用了
-[NSCondition waitUntilDate:]
在調(diào)試的過程中會跳轉(zhuǎn)另外一個線程的
-[NSConditionLock lockWhenCondition:],流程與上面的一致,繼續(xù)調(diào)用會調(diào)用了-[NSCondition unlock]:
這個時候直接返回
1(true),接著對-[NSConditionLock unlockWithCondition:]下符號斷點:
調(diào)用了
-[NSCondition lock]:
調(diào)用了
-[NSCondition broadcast]:
調(diào)用了
-[NSCondition unlock],這個時候繼續(xù)過斷點就又會回到線程4,調(diào)用邏輯和線程3相同。
完整調(diào)用邏輯如下:
線程4
-[NSConditionLock lockWhenCondition:]
+[NSDate distantFuture]
-[NSConditionLock lockWhenCondition:beforeDate:]
-[NSCondition lock]
-[NSCondition waitUntilDate:]
線程3
-[NSConditionLock lockWhenCondition:]
+[NSDate distantFuture]
-[NSConditionLock lockWhenCondition:beforeDate:]
-[NSCondition lock]
-[NSCondition unlock]
返回1(true)
-[NSConditionLock unlockWithCondition:]
-[NSCondition lock]
-[NSCondition broadcast]
-[NSCondition unlock]
//回到線程4
-[NSCondition unlock]
返回1(true)
-[NSConditionLock unlockWithCondition:]
-[NSCondition lock]
-[NSCondition broadcast]
-[NSCondition unlock]
流程總結(jié):
- 線程
4調(diào)用[NSConditionLock lockWhenCondition:],此時因為不滿足當前條件,所
以會進入waiting狀態(tài),當前進入到waiting時,會釋放當前的互斥鎖。 - 此時當前的線程
2調(diào)用[NSConditionLock lock:],本質(zhì)上是調(diào)用[NSConditionLock lockBeforeDate:]這里不需要比對條件值,所以任務3會執(zhí)行。 - 接下來線程
3執(zhí)行[NSConditionLock lockWhenCondition:],因為滿足條件值,所以線任務2會執(zhí)行,執(zhí)行完成后會調(diào)用[NSConditionLock unlockWithCondition:],這個時候?qū)?br>condition設(shè)置為1,并發(fā)送boradcast, 此時線程4接收到當前的信號,喚醒執(zhí)行并打印。 - 這個時候任務執(zhí)行順序為
任務3 -> 任務2 -> 任務1。 -
[NSConditionLock lockWhenCondition:]會根據(jù)傳入的condition進
行對比,如果不相等,這里就會阻塞,進入線程池,否則的話就繼續(xù)代碼執(zhí)行。 -
[NSConditionLock unlockWithCondition:]會先更改當前的condition值,然后進行廣播,喚醒當前的線程。
4.3 源碼分析
-
initWithCondition:
image.png
保存了condition參數(shù)以及NSCondition的創(chuàng)建。 lockWhenCondition:
open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}
內(nèi)部調(diào)用了lockWhenCondition: before:,默認值傳的Date.distantFuture:
open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
NSCondition加鎖判斷condition條件是否滿足,不滿足調(diào)用NSCondition的wait waitUntilDate方法進入等待,超時后解鎖返回false。滿足的情況下賦值_thread解鎖返回true。
-
unlockWithCondition:
open func unlock(withCondition condition: Int) {
_cond.lock()
_thread = nil
_value = condition
_cond.broadcast()
_cond.unlock()
}
加鎖后釋放_thread,更新condition,調(diào)用broadcast后解鎖。
lock
open func lock() {
let _ = lock(before: Date.distantFuture)
}
open func lock(before limit: Date) -> Bool {
_cond.lock()
while _thread != nil {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
判斷是否有其它任務阻塞,沒有阻塞直接創(chuàng)建_thread返回true。
unlock
open func unlock() {
_cond.lock()
_thread = nil
_cond.broadcast()
_cond.unlock()
}
廣播并且釋放_thread。
4.4 反匯編分析
-
initWithCondition:
image.png lockWhenCondition:
-(int)lockWhenCondition:(int)arg2 {
r0 = [arg0 lockWhenCondition:arg2 beforeDate:[NSDate distantFuture]];
return r0;
}
調(diào)用自己的lockWhenCondition: beforeDate ::

-
unlockWithCondition:
-(int)unlockWithCondition:(int)arg2 {
r0 = object_getIndexedIvars(arg0);
[*r0 lock];
*(int128_t *)(r0 + 0x8) = 0x0;
*(int128_t *)(r0 + 0x10) = arg2;
[*r0 broadcast];
r0 = *r0;
r0 = [r0 unlock];
return r0;
}
-
lock:
int -[NSConditionLock lock](int arg0) {
r0 = [arg0 lockBeforeDate:[NSDate distantFuture]];
return r0;
}
lockBeforeDate:

-
unlock:
int -[NSConditionLock unlock]() {
r0 = object_getIndexedIvars(r0);
[*r0 lock];
*(r0 + 0x8) = 0x0;
[*r0 broadcast];
r0 = *r0;
r0 = [r0 unlock];
return r0;
}
匯編、源碼以及斷點調(diào)試邏輯相同。
NSConditionLock 內(nèi)部封裝了NSCondition。
五、OSSpinLock & os_unfair_lock

OSSpinLock的API注釋以及它自己的命名說明了這是一把自旋鎖,自iOS10之后被os_unfair_lock替代。
-
os_unfair_lock必須以OS_UNFAIR_LOCK_INIT初始化。 - 它是用來代替
OSSpinLock的。 - 它不是自旋鎖(忙等),是被內(nèi)核喚醒的(閑等)。


可以看到這兩個鎖都是定義在
libsystem_platform.dylib中的??梢栽?code>openSource中找到他們libplatform的源碼libplatform,實現(xiàn)是在/src/os目錄下的lock.c文件中。
5.1 OSSpinLock 源碼分析
OSSpinLock的使用一般會用到以下API:
OSSpinLock hp_spinlock = OS_SPINLOCK_INIT;
OSSpinLockLock(&hp_spinlock);
OSSpinLockUnlock(&hp_spinlock);
-
OSSpinLock:
typedef int32_t OSSpinLock OSSPINLOCK_DEPRECATED_REPLACE_WITH(os_unfair_lock);
#define OS_SPINLOCK_INIT 0
OSSpinLock本身是一個int32_t類型的值,初始化默認值為0。
5.1.1 OSSpinLockLock
void
OSSpinLockLock(volatile OSSpinLock *l)
{
OS_ATOMIC_ALIAS(spin_lock, OSSpinLockLock);
OS_ATOMIC_ALIAS(_spin_lock, OSSpinLockLock);
bool r = os_atomic_cmpxchg(l, 0, _OSSpinLockLocked, acquire);
if (likely(r)) return;
return _OSSpinLockLockSlow(l);
}
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
static const OSSpinLock _OSSpinLockLocked = 1;
#else
static const OSSpinLock _OSSpinLockLocked = -1;
#endif
OS_ATOMIC_ALIAS定義如下:
#undef OS_ATOMIC_ALIAS
#define OS_ATOMIC_ALIAS(n, o)
static void _OSSpinLockLock(volatile OSSpinLock *l);
這里相當于分了兩條路徑,通過_OSSpinLockLocked標記是否被鎖定。在源碼中并沒有找到_OSSpinLockLock函數(shù)的實現(xiàn)。
5.1.1.1 _OSSpinLockLockSlow
#if OS_ATOMIC_UP
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
return _OSSpinLockLockYield(l); // Don't spin on UP
}
#elif defined(__arm64__)
// Exclusive monitor must be held during WFE <rdar://problem/22300054>
#if defined(__ARM_ARCH_8_2__)
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
OSSpinLock lock;
_spin:
while (unlikely(lock = os_atomic_load_exclusive(l, relaxed))) {
if (unlikely(lock != _OSSpinLockLocked)) {
os_atomic_clear_exclusive();
return _os_lock_corruption_abort((void *)l, (uintptr_t)lock);
}
if (unlikely(!tries--)) {
os_atomic_clear_exclusive();
return _OSSpinLockLockYield(l);
}
OS_LOCK_SPIN_PAUSE();
}
os_atomic_clear_exclusive();
bool r = os_atomic_cmpxchg(l, 0, _OSSpinLockLocked, acquire);
if (likely(r)) return;
goto _spin;
}
#else // !__ARM_ARCH_8_2__
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
OSSpinLock lock;
os_atomic_rmw_loop(l, lock, _OSSpinLockLocked, acquire, if (unlikely(lock)){
if (unlikely(lock != _OSSpinLockLocked)) {
os_atomic_rmw_loop_give_up(return
_os_lock_corruption_abort((void *)l, (uintptr_t)lock));
}
if (unlikely(!tries--)) {
os_atomic_rmw_loop_give_up(return _OSSpinLockLockYield(l));
}
OS_LOCK_SPIN_PAUSE();
continue;
});
}
#endif // !__ARM_ARCH_8_2__
#else // !OS_ATOMIC_UP
void
_OSSpinLockLockSlow(volatile OSSpinLock *l)
{
uint32_t tries = OS_LOCK_SPIN_SPIN_TRIES;
OSSpinLock lock;
while (unlikely(lock = *l)) {
_spin:
if (unlikely(lock != _OSSpinLockLocked)) {
return _os_lock_corruption_abort((void *)l, (uintptr_t)lock);
}
if (unlikely(!tries--)) return _OSSpinLockLockYield(l);
OS_LOCK_SPIN_PAUSE();
}
bool r = os_atomic_cmpxchgv(l, 0, _OSSpinLockLocked, &lock, acquire);
if (likely(r)) return;
goto _spin;
}
#endif // !OS_ATOMIC_UP
可以看到內(nèi)部有自轉(zhuǎn)邏輯,這里直接分析_OSSpinLockLockYield
5.1.1.2 _OSSpinLockLockYield
static void
_OSSpinLockLockYield(volatile OSSpinLock *l)
{
int option = SWITCH_OPTION_DEPRESS;
mach_msg_timeout_t timeout = 1;
uint64_t deadline = _os_lock_yield_deadline(timeout);
OSSpinLock lock;
while (unlikely(lock = *l)) {
_yield:
if (unlikely(lock != _OSSpinLockLocked)) {
_os_lock_corruption_abort((void *)l, (uintptr_t)lock);
}
thread_switch(MACH_PORT_NULL, option, timeout);
if (option == SWITCH_OPTION_WAIT) {
timeout++;
} else if (!_os_lock_yield_until(deadline)) {
option = SWITCH_OPTION_WAIT;
}
}
bool r = os_atomic_cmpxchgv(l, 0, _OSSpinLockLocked, &lock, acquire);
if (likely(r)) return;
goto _yield;
}
內(nèi)部有超時時間以及線程切換邏輯。
5.1.2 OSSpinLockUnlock
void
OSSpinLockUnlock(volatile OSSpinLock *l)
{
OS_ATOMIC_ALIAS(spin_unlock, OSSpinLockUnlock);
OS_ATOMIC_ALIAS(_spin_unlock, OSSpinLockUnlock);
return _os_nospin_lock_unlock((_os_nospin_lock_t)l);
}
內(nèi)部調(diào)用了_os_nospin_lock_unlock。
5.1.2.1 _os_nospin_lock_unlock
void
_os_nospin_lock_unlock(_os_nospin_lock_t l)
{
os_lock_owner_t self = _os_lock_owner_get_self();
os_ulock_value_t current;
current = os_atomic_xchg(&l->oul_value, OS_LOCK_NO_OWNER, release);
if (likely(current == self)) return;
return _os_nospin_lock_unlock_slow(l, current);
}
_os_nospin_lock_unlock_slow:
static void
_os_nospin_lock_unlock_slow(_os_nospin_lock_t l, os_ulock_value_t current)
{
os_lock_owner_t self = _os_lock_owner_get_self();
if (unlikely(OS_ULOCK_OWNER(current) != self)) {
return; // no unowned_abort for drop-in compatibility with OSSpinLock
}
if (current & OS_ULOCK_NOWAITERS_BIT) {
__LIBPLATFORM_INTERNAL_CRASH__(current, "unlock_slow with no waiters");
}
for (;;) {
int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, l, 0);
if (unlikely(ret < 0)) {
switch (-ret) {
case EINTR:
continue;
case ENOENT:
break;
default:
__LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wake failure");
}
}
break;
}
}
5.2 os_unfair_lock 源碼分析
typedef struct os_unfair_lock_s {
uint32_t _os_unfair_lock_opaque;
} os_unfair_lock, *os_unfair_lock_t;
初始化OS_UNFAIR_LOCK_INIT直接設(shè)置了默認值0:
#define OS_UNFAIR_LOCK_INIT ((os_unfair_lock){0})
5.2.1 os_unfair_lock_lock
void
os_unfair_lock_lock(os_unfair_lock_t lock)
{
_os_unfair_lock_t l = (_os_unfair_lock_t)lock;
os_lock_owner_t self = _os_lock_owner_get_self();
bool r = os_atomic_cmpxchg(&l->oul_value, OS_LOCK_NO_OWNER, self, acquire);
if (likely(r)) return;
return _os_unfair_lock_lock_slow(l, OS_UNFAIR_LOCK_NONE, self);
}
_os_lock_owner_get_self:
OS_ALWAYS_INLINE OS_CONST
static inline os_lock_owner_t
_os_lock_owner_get_self(void)
{
os_lock_owner_t self;
self = (os_lock_owner_t)_os_tsd_get_direct(__TSD_MACH_THREAD_SELF);
return self;
}
_os_unfair_lock_lock_slow:
static void
_os_unfair_lock_lock_slow(_os_unfair_lock_t l,
os_unfair_lock_options_t options, os_lock_owner_t self)
{
os_unfair_lock_options_t allow_anonymous_owner =
options & OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
options &= ~OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
if (unlikely(options & ~OS_UNFAIR_LOCK_OPTIONS_MASK)) {
__LIBPLATFORM_CLIENT_CRASH__(options, "Invalid options");
}
os_ulock_value_t current, new, waiters_mask = 0;
while (unlikely((current = os_atomic_load(&l->oul_value, relaxed)) !=
OS_LOCK_NO_OWNER)) {
_retry:
if (unlikely(OS_ULOCK_IS_OWNER(current, self, allow_anonymous_owner))) {
return _os_unfair_lock_recursive_abort(self);
}
new = current & ~OS_ULOCK_NOWAITERS_BIT;
if (current != new) {
// Clear nowaiters bit in lock value before waiting
if (!os_atomic_cmpxchgv(&l->oul_value, current, new, ¤t,
relaxed)){
continue;
}
current = new;
}
int ret = __ulock_wait(UL_UNFAIR_LOCK | ULF_NO_ERRNO | options,
l, current, 0);
if (unlikely(ret < 0)) {
switch (-ret) {
case EINTR:
case EFAULT:
continue;
case EOWNERDEAD:
_os_unfair_lock_corruption_abort(current);
break;
default:
__LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wait failure");
}
}
if (ret > 0) {
// If there are more waiters, unset nowaiters bit when acquiring lock
waiters_mask = OS_ULOCK_NOWAITERS_BIT;
}
}
new = self & ~waiters_mask;
bool r = os_atomic_cmpxchgv(&l->oul_value, OS_LOCK_NO_OWNER, new,
¤t, acquire);
if (unlikely(!r)) goto _retry;
}
內(nèi)部是wait等待邏輯。
5.2.2 os_unfair_lock_unlock
void
os_unfair_lock_unlock(os_unfair_lock_t lock)
{
_os_unfair_lock_t l = (_os_unfair_lock_t)lock;
os_lock_owner_t self = _os_lock_owner_get_self();
os_ulock_value_t current;
current = os_atomic_xchg(&l->oul_value, OS_LOCK_NO_OWNER, release);
if (likely(current == self)) return;
return _os_unfair_lock_unlock_slow(l, self, current, 0);
}
內(nèi)部調(diào)用了_os_unfair_lock_unlock_slow:
OS_NOINLINE
static void
_os_unfair_lock_unlock_slow(_os_unfair_lock_t l, os_lock_owner_t self,
os_ulock_value_t current, os_unfair_lock_options_t options)
{
os_unfair_lock_options_t allow_anonymous_owner =
options & OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
options &= ~OS_UNFAIR_LOCK_ALLOW_ANONYMOUS_OWNER;
if (unlikely(OS_ULOCK_IS_NOT_OWNER(current, self, allow_anonymous_owner))) {
return _os_unfair_lock_unowned_abort(OS_ULOCK_OWNER(current));
}
if (current & OS_ULOCK_NOWAITERS_BIT) {
__LIBPLATFORM_INTERNAL_CRASH__(current, "unlock_slow with no waiters");
}
for (;;) {
int ret = __ulock_wake(UL_UNFAIR_LOCK | ULF_NO_ERRNO, l, 0);
if (unlikely(ret < 0)) {
switch (-ret) {
case EINTR:
continue;
case ENOENT:
break;
default:
__LIBPLATFORM_INTERNAL_CRASH__(-ret, "ulock_wake failure");
}
}
break;
}
}
可以看到內(nèi)部是有喚醒邏輯的。
六、讀寫鎖
讀操作可以共享,寫操作是排他的,讀可以有多個在讀,寫只有唯一個在寫,同時寫的時候不允許讀。要實現(xiàn)讀寫鎖核心邏輯是:
- 多讀單寫
- 寫寫互斥
- 讀寫互斥
- 寫不能阻塞任務執(zhí)行
有兩套方案:
- 1.使用 柵欄函數(shù) 相關(guān)
API。 - 2.使用
pthread_rwlock_t相關(guān)API。
6.1 dispatch_barrier_async 實現(xiàn)多讀單寫
寫:通過柵欄函數(shù)可以實現(xiàn)寫寫互斥以及讀寫互斥,寫使用async可以保證寫邏輯不阻塞當前任務執(zhí)行。
讀:使用dispatch_sync同步效果實現(xiàn)多讀(放入并發(fā)隊列中)。
- 首先定義一個并發(fā)隊列以及字典存儲數(shù)據(jù):
@property (nonatomic, strong) dispatch_queue_t concurrent_queue;
@property (nonatomic, strong) NSMutableDictionary *dataDic;
//初始化
self.concurrent_queue = dispatch_queue_create("rw_queue", DISPATCH_QUEUE_CONCURRENT);
self.dataDic = [NSMutableDictionary dictionary];
- 寫入操作:
- (void)safeSetter:(NSString *)name time:(int)time {
dispatch_barrier_async(self.concurrent_queue, ^{
sleep(time);
[self.dataDic setValue:name forKey:@"HotpotCat"];
NSLog(@"write name:%@,currentThread:%@",name,[NSThread currentThread]);
});
}
為了方便測試key值寫死,并且傳入一個time。barrier保證了寫之間互斥以及讀寫互斥。
- 讀取操作:
- (NSString *)safeGetterWithTime:(int)time {
__block NSString *result;
//多條線程同時讀,阻塞的是當前線程,多條線程訪問就是多讀了。同步使用concurrent_queue是為了配合柵欄函數(shù)讀寫互斥。
dispatch_sync(self.concurrent_queue, ^{
sleep(time);
result = self.dataDic[@"HotpotCat"];
});
NSLog(@"result:%@,currentThread:%@,time:%@",result,[NSThread currentThread],@(time));
return result;
}
使用同步函數(shù)配合柵欄函數(shù)(柵欄函數(shù)只能針對同一隊列)實現(xiàn)讀寫互斥,當多條線程同時訪問safeGetterWithTime時就實現(xiàn)了多讀操作。
- 寫入驗證:
//調(diào)用
[self safeSetter:@"1" time:4];
[self safeSetter:@"2" time:1];
[self safeSetter:@"3" time:2];
[self safeSetter:@"4" time:1];
輸出:
write name:1,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:2,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:3,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
write name:4,currentThread:<NSThread: 0x281fea4c0>{number = 5, name = (null)}
很明顯寫之間是互斥的,任務1沒有執(zhí)行完之前其它任務都在等待。
- 讀取驗證:
for (int i = 0; i < 5; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
NSString *result = [self safeGetterWithTime:5 - i];
NSLog(@"result:%@",result);
});
}
輸出:
result:4,currentThread:<NSThread: 0x281f80600>{number = 7, name = (null)},time:1
result:4,currentThread:<NSThread: 0x281fce540>{number = 8, name = (null)},time:2
result:4,currentThread:<NSThread: 0x281f80980>{number = 9, name = (null)},time:3
result:4,currentThread:<NSThread: 0x281feb540>{number = 10, name = (null)},time:4
result:4,currentThread:<NSThread: 0x281f80a80>{number = 11, name = (null)},time:5
任務并行執(zhí)行,順序是由于設(shè)置了sleep時間,如果去掉時間或者時間一致,每次執(zhí)行結(jié)果都不同了。
6.2 pthread_rwlock_t 實現(xiàn)多讀單寫
- 定義鎖以及字典數(shù)據(jù):
{
pthread_rwlock_t rw_lock;
pthread_rwlockattr_t rw_lock_attr;
}
@property (nonatomic, strong) NSMutableDictionary *dataDic;
pthread_rwlockattr_t讀寫屬性有兩種:lockkind與pshared。
lockkind:讀寫策略,包括讀取優(yōu)先(默認屬性)、寫入優(yōu)先。蘋果系統(tǒng)里面沒有提供 pthread_rwlockattr_getkind_np 與 pthread_rwlockattr_setkind_np 相關(guān)函數(shù)。
pshared:PTHREAD_PROCESS_PRIVATE(進程內(nèi)競爭讀寫鎖,默認屬性)PTHREAD_PROCESS_SHARED(進程間競爭讀寫鎖)
- 初始化:
self.dataDic = [NSMutableDictionary dictionary];
//初始化
pthread_rwlockattr_init(&rw_lock_attr);
pthread_rwlock_init(&rw_lock, &rw_lock_attr);
//進程內(nèi)
pthread_rwlockattr_setpshared(&rw_lock_attr, PTHREAD_PROCESS_PRIVATE);
pthread_rwlockattr_t讀寫設(shè)置為進程內(nèi)。
- 寫入操作如下:
- (void)safeSetter:(NSString *)name {
//寫鎖
pthread_rwlock_wrlock(&rw_lock);
[self.dataDic setValue:name forKey:@"HotpotCat"];
NSLog(@"write name:%@,currentThread:%@",name,[NSThread currentThread]);
//釋放
pthread_rwlock_unlock(&rw_lock);
}
- 讀取操作如下:
- (NSString *)safeGetter {
//讀鎖
pthread_rwlock_rdlock(&rw_lock);
NSString *result = self.dataDic[@"HotpotCat"];
//釋放
pthread_rwlock_unlock(&rw_lock);
NSLog(@"result:%@,currentThread:%@",result,[NSThread currentThread]);
return result;
}
- 寫入驗證:
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[self safeSetter:@"1"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[self safeSetter:@"2"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[self safeSetter:@"3"];
});
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[self safeSetter:@"4"];
});
輸出:
LockDemo[52251:5873172] write name:4,currentThread:<NSThread: 0x60000072e980>{number = 4, name = (null)}
LockDemo[52251:5873177] write name:1,currentThread:<NSThread: 0x60000075d100>{number = 6, name = (null)}
LockDemo[52251:5873170] write name:2,currentThread:<NSThread: 0x60000072f600>{number = 7, name = (null)}
LockDemo[52251:5873178] write name:3,currentThread:<NSThread: 0x60000073d480>{number = 5, name = (null)}
這里就與隊列調(diào)度有關(guān)了,順序不定,如果不加鎖大量并發(fā)調(diào)用下則會crash。
- 讀取驗證:
for (int i = 0; i < 5; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
NSString *result = [self safeGetter];
});
}
輸出:
result:4,currentThread:<NSThread: 0x600001cdc200>{number = 5, name = (null)}
result:4,currentThread:<NSThread: 0x600001cd1080>{number = 7, name = (null)}
result:4,currentThread:<NSThread: 0x600001c95f40>{number = 6, name = (null)}
result:4,currentThread:<NSThread: 0x600001c91ec0>{number = 3, name = (null)}
result:4,currentThread:<NSThread: 0x600001c94d80>{number = 4, name = (null)}
輸出順序也不一定。當然混合讀寫測試也可以,用數(shù)組更容易測試。
參考鏈接:pthread_rwlock
對于讀數(shù)據(jù)比修改數(shù)據(jù)頻繁的應用,用讀寫鎖代替互斥鎖可以提高效率。因為使用互斥鎖時,即使是讀出數(shù)據(jù)(相當于操作臨界區(qū)資源)都要上互斥鎖,而采用讀寫鎖,則可以在任一時刻允許多個讀出者存在,提高了更高的并發(fā)度,同時在某個寫入者修改數(shù)據(jù)期間保護該數(shù)據(jù),以免任何其它讀出者或?qū)懭胝叩母蓴_。
獲取一個讀寫鎖用于讀稱為共享鎖,獲取一個讀寫鎖用于寫稱為獨占鎖,因此對于某個給定資源的共享訪問也稱為共享-獨占上鎖。






