細(xì)說(shuō)@synchronized和dispatch_once

工欲善其事,必先利其器

通常我們?cè)趯?shí)現(xiàn)單例時(shí)候都會(huì)使用synchronized或者dispatch_once方法,初始化往往是下面的樣子:
使用synchronized方法實(shí)現(xiàn):

static id obj = nil;
+(instancetype)shareInstance
{
    @synchronized(self) {
        if (!obj) {
            obj = [[SingletonObj alloc] init];
        }
    }
    return obj;
}

使用dispatch_once方法實(shí)現(xiàn):

static id obj = nil;
+(instancetype)shareInstance
{
    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        obj = [[SingletonObj alloc] init];
    });
    return obj;
}
性能差異

上面的這些寫(xiě)法大家應(yīng)該都很熟悉,既然兩種方式都能實(shí)現(xiàn),我們來(lái)看看兩者的性能差異,這里簡(jiǎn)單寫(xiě)了個(gè)測(cè)試的demo,使用兩個(gè)方法分單線程跟多線程(采用dispatch_apply方式,性能相對(duì)較高)去訪問(wèn)一個(gè)單例對(duì)象一百萬(wàn)次,對(duì)比這期間的耗時(shí),從iPod跟5s測(cè)試得到如下的結(jié)果

    //ipod,主線程
    SingletonTest[4285:446820] synchronized time cost:2.202945s
    SingletonTest[4285:446820] dispatch_once time cost:0.761034s
    
    //5s,主線程
    SingletonTest[5372:2394430] synchronized time cost:0.466293s
    SingletonTest[5372:2394430] dispatch_once time cost:0.070822s

    //ipod,多線程
    SingletonTest[4315:448499] synchronized time cost:3.385109s
    SingletonTest[4315:448499] dispatch_once time cost:0.908009s
    
    //5s,多線程
    SingletonTest[5391:2399069] synchronized time cost:0.507504s
    SingletonTest[5391:2399069] dispatch_once time cost:0.169934s

可以發(fā)現(xiàn)dispatch_once方法的性能要明顯優(yōu)于synchronized方法(多線程不采用dispathc_apply方式差距更明顯),所以在實(shí)際的應(yīng)用中我們可以多采用dispatch_once方式來(lái)實(shí)現(xiàn)單例。通常使用的時(shí)候了解這些就夠了,不過(guò)想知道兩者的具體差異就需要我們?cè)龠~進(jìn)一步。

深入@synchronized(object)

翻看蘋(píng)果的文檔可以發(fā)現(xiàn) @synchronized指令內(nèi)部使用鎖來(lái)實(shí)現(xiàn)多線程的安全訪問(wèn),并且隱式添加了一個(gè)異常處理的handler,當(dāng)異常發(fā)生時(shí)會(huì)自動(dòng)釋放鎖。在stackoverflow上看到@synchronized指令其實(shí)可以轉(zhuǎn)換成objc_sync_enter跟objc_sync_exit,可以在<objc/objc-sync.h>頭文件中找到這兩個(gè)函數(shù):

//Allocates recursive pthread_mutex associated with 'obj' if needed
int objc_sync_enter(id obj)

//End synchronizing on 'obj'
int objc_sync_exit(id obj)

根據(jù)注釋文檔,objc_sync_enter會(huì)根據(jù)需要給每個(gè)傳進(jìn)來(lái)的對(duì)象創(chuàng)建一個(gè)互斥鎖并lock,然后objc_sync_exit的時(shí)候unlock,這樣就可以通過(guò)這個(gè)鎖來(lái)實(shí)現(xiàn)多線程的安全訪問(wèn),所以結(jié)合蘋(píng)果文檔可以認(rèn)為

@synchronized(self) {
    //thread safe code
}

等價(jià)于

@try {
    objc_sync_enter(self);
    // thread safe code
} @finally {
    objc_sync_exit(self);    
}

慶幸的是蘋(píng)果已經(jīng)將objc-runtime這部分開(kāi)源,所以我們可以更進(jìn)一步了解內(nèi)部的實(shí)現(xiàn),源碼在這里,有興趣也可以自己去查閱,這里簡(jiǎn)單介紹一下。
讓我們先來(lái)看看幾個(gè)數(shù)據(jù)結(jié)構(gòu),其中有些涉及到緩存,我們就不去考慮了:

typedef struct SyncData {
    struct SyncData* nextData;
    DisguisedPtr<objc_object> object;
    int32_t threadCount;  // number of THREADS using this block
    recursive_mutex_t mutex;
} SyncData;

struct SyncList {
    SyncData *data;
    spinlock_t lock;
};

#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;

首先看看SyncData這個(gè)數(shù)據(jù)結(jié)構(gòu),包含一個(gè)指向object的指針,這個(gè)object對(duì)象就是我們@synchronized時(shí)傳進(jìn)來(lái)的對(duì)象,也包含一個(gè)跟object關(guān)聯(lián)的遞歸互斥鎖recursive_mutex_t,該鎖用來(lái)互斥訪問(wèn)object對(duì)象;同時(shí)還包含一個(gè)指向下一個(gè)SyncData的指針nextData,可以看出SyncData是一個(gè)鏈表中的節(jié)點(diǎn);至于threadCount,這個(gè)值標(biāo)示有幾個(gè)線程正在訪問(wèn)這個(gè)對(duì)象,當(dāng)threadCount==0的時(shí)候,會(huì)重用該SyncData對(duì)象,這是為了節(jié)省內(nèi)存。
??接下來(lái)看看SyncList,SyncList其實(shí)就是一個(gè)鏈表,data指向第一個(gè)SyncData節(jié)點(diǎn),lock則是為了多線程安全訪問(wèn)該鏈表。
??最后看下sDataLists靜態(tài)哈希表對(duì)象,它以obj的指針為key,對(duì)應(yīng)的value為SyncList鏈表。
??了解上面之后,我們就可以看看objc_sync_enter跟objc_sync_exit的具體實(shí)現(xiàn)(摘取部分代碼)


//根據(jù)object對(duì)象去查詢(xún)相應(yīng)的SyncData對(duì)象,如果沒(méi)有則創(chuàng)建一個(gè)新的
static SyncData* id2data(id object, enum usage why)
{
    spinlock_t *lockp = &LOCK_FOR_OBJ(object);
    SyncData **listp = &LIST_FOR_OBJ(object);
    SyncData* result = NULL;
    
    //lock,多線程安全訪問(wèn)SyncList
    lockp->lock();
    {
        SyncData* p;
        SyncData* firstUnused = NULL;
        for (p = *listp; p != NULL; p = p->nextData) {
            //找到object對(duì)象對(duì)應(yīng)的SyncData對(duì)象,增加其threadCount計(jì)數(shù),然后返回
            if ( p->object == object ) {
                result = p;
                OSAtomicIncrement32Barrier(&result->threadCount);
                goto done;
            }
            //當(dāng)threadCount == 0時(shí),設(shè)置當(dāng)前SyncData為可重用
            if ( (firstUnused == NULL) && (p->threadCount == 0) )
                firstUnused = p;
        }
        // 如果有可重用的節(jié)點(diǎn),則使用當(dāng)前SyncData節(jié)點(diǎn),SyncData的object指針指向新的object對(duì)象
        if ( firstUnused != NULL ) {
            result = firstUnused;
            result->object = (objc_object *)object;
            result->threadCount = 1;
            goto done;
        }
    }

    //如果沒(méi)有可重用的節(jié)點(diǎn),則創(chuàng)建一個(gè)新的SyncData節(jié)點(diǎn)
    result = (SyncData*)calloc(sizeof(SyncData), 1);

    //將新的SyncData節(jié)點(diǎn)的object指針指向傳進(jìn)來(lái)的object對(duì)象
    result->object = (objc_object *)object;
    result->threadCount = 1;

    //創(chuàng)建一個(gè)新的與該object關(guān)聯(lián)的遞歸互斥鎖
    new (&result->mutex) recursive_mutex_t();
    result->nextData = *listp;
    *listp = result;
    
 done:
    lockp->unlock();
    return result;
}

int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    if (obj) {
        //根據(jù)obj指針的哈希值查找對(duì)應(yīng)的SyncData,threadcount計(jì)數(shù)加一
        SyncData* data = id2data(obj, ACQUIRE);

        //使用SyncData的互斥鎖上鎖
        data->mutex.lock();
    } else {
        // @synchronized(nil) 傳入nil時(shí)什么也不處理
    }
    return result;
}

int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    if (obj) {
        //根據(jù)obj指針的哈希值查找對(duì)應(yīng)的SyncData,threadcount計(jì)數(shù)減一
        SyncData* data = id2data(obj, RELEASE);

        //使用SyncData的互斥鎖解鎖 
        bool okay = data->mutex.tryUnlock();
        if (!okay) {
           result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        }
    } else {
        // @synchronized(nil) 傳入nil時(shí)什么也不處理
    }
    return result;
}

簡(jiǎn)單來(lái)說(shuō),調(diào)用objc_sync_enter(obj)時(shí),會(huì)根據(jù)obj指針在哈希表sDataLists對(duì)應(yīng)的鏈表SyncList,然后在鏈表中查詢(xún)對(duì)應(yīng)obj的SyncData對(duì)象,如果查詢(xún)不到則創(chuàng)建一個(gè)新的SyncData對(duì)象(包含創(chuàng)建跟obj相關(guān)的遞歸互斥鎖)并添加到鏈表中,然后使用SyncData對(duì)象上鎖;調(diào)用objc_sync_exit(obj)時(shí),使用SyncData對(duì)象解鎖,因此通過(guò)這個(gè)鎖便可確保@synchronized之間的代碼線程安全。

sDataLists
深入dispatch_once

探討了synchronized之后,我們?cè)賮?lái)說(shuō)說(shuō)dispatch_once。

void dispatch_once(dispatch_once_t *predicate, dispatch_block_t block);

根據(jù)官方文檔,dispatch_once可以用來(lái)初始化一些全局的數(shù)據(jù),它能夠確保block代碼在app的生命周期內(nèi)僅被運(yùn)行一次,而且還是線程安全的,不需要額外加鎖;predicate必須指向一個(gè)全局或者靜態(tài)的變量,不過(guò)使用predicate的話結(jié)果是未定義的,不過(guò)predicate有啥作用,如何實(shí)現(xiàn)block在整個(gè)生命周期執(zhí)行一次?那我們只能從源碼查找(源碼地址:once)。
不過(guò)在這之前先簡(jiǎn)要介紹一下:

  • bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    提供原子的比較和交換操作,如果當(dāng)前值 *ptr == oldval,就將newval寫(xiě)入ptr,當(dāng)比較賦值操作成功后返回true

  • *__sync_synchronize (...)
    調(diào)用這個(gè)函數(shù)會(huì)產(chǎn)生一個(gè)full memory barrier ,用于保證CPU按照我們代碼編寫(xiě)的順序來(lái)執(zhí)行代碼,比如:

doJob1();
 doJob2();
 __sync_synchronize();  //Job3會(huì)在Job1跟Job2完成后才執(zhí)行
doJob3();
  • type __sync_swap(type *ptr, type value, ...)
    提供原子交換操作的函數(shù),交換第一個(gè)跟第二個(gè)參數(shù)的值,然后返回交換前第一個(gè)參數(shù)的舊值。
  • _dispatch_hardware_pause()
    調(diào)用這個(gè)函數(shù)主要是暗示處理器不要做額外的優(yōu)化處理等,提高性能,節(jié)省CPU時(shí)間,可以查看這里了解更多
  • 信號(hào)量
    信號(hào)量是一個(gè)非負(fù)整數(shù),定義了兩種原子操作:wait跟signal來(lái)進(jìn)行訪,信號(hào)量主要用于線程同步。當(dāng)一個(gè)線程調(diào)用wait操作時(shí),如果信號(hào)量的值大于0,則獲得資源并將信號(hào)量值減一,如果等于0線程睡眠直到信號(hào)量值大于0或者超時(shí);singal將信號(hào)量的值加1,如果這時(shí)候有正在等待的線程,喚醒該線程。
// 創(chuàng)建一個(gè)信號(hào)量,其值為0        
dispatch_semaphore_t sema = dispatch_semaphore_create(0);
ABAddressBookRequestAccessWithCompletion(addressBook, ^(bool granted, CFErrorRef error) {
    //操作完成后,調(diào)用signal信號(hào)量+1
    dispatch_semaphore_signal(sema);
});
//等待dispatch_semaphore_signal將信號(hào)量值加1后才繼續(xù)運(yùn)行
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);

接下來(lái)看看具體代碼,當(dāng)我們調(diào)用dispatch_once時(shí)候,內(nèi)部是調(diào)用dispatch_once_f函數(shù),其中val就是外部傳入的predicate值,ctxt為Block的指針,func則是Block內(nèi)部具體實(shí)現(xiàn)的函數(shù)指針,由于源碼比較短,所以我直接把源碼貼出來(lái)(為了方便查看,有些不使用宏定義)。

struct _dispatch_once_waiter_s {
    volatile struct _dispatch_once_waiter_s *volatile dow_next;
    _dispatch_thread_semaphore_t dow_sema;
};
#define DISPATCH_ONCE_DONE ((struct _dispatch_once_waiter_s *)~0l)
void dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
    struct Block_basic *bb = (void *)block;
    dispatch_once_f(val, block, (void *)bb->Block_invoke);
}

void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    //volatile,標(biāo)示該變量隨時(shí)可能改變,編譯器不會(huì)對(duì)訪問(wèn)該變量的代碼進(jìn)行優(yōu)化,每次都從內(nèi)存去讀取,而不使用寄存器里的值
    struct _dispatch_once_waiter_s * volatile *vval =
            (struct _dispatch_once_waiter_s**)val;
    struct _dispatch_once_waiter_s dow = { NULL, 0 };
    struct _dispatch_once_waiter_s *tail, *tmp;
    _dispatch_thread_semaphore_t sema;

    //第一次執(zhí)行的時(shí)候,predicate的值為0,所以vval=NULL,原子比較交換函數(shù)返回true
    //然后vval指向dow(dispatch_once_waiter_s,信號(hào)量的值為0,即等待中)
    if (__sync_bool_compare_and_swap(vval, NULL, &dow)) {

        //空的宏定義,啥也不做
        dispatch_atomic_acquire_barrier();

        //執(zhí)行dispatch_once傳進(jìn)來(lái)的block
        _dispatch_client_callout(ctxt, func);
        
        //后面解釋
        dispatch_atomic_maximally_synchronizing_barrier();
        
        //執(zhí)行完block之后,將vval的值設(shè)為DISPATCH_ONCE_DONE(即predicate設(shè)為~0l)
        tmp = __sync_swap(vval, DISPATCH_ONCE_DONE);  
        tail = &dow;

        //1.如果在block的執(zhí)行過(guò)程中,沒(méi)有其線程調(diào)用該函數(shù)等待,tmp的值也為&dow,tail==tmp,循環(huán)的條件不滿足,函數(shù)執(zhí)行完畢
        //2.如果在block的執(zhí)行過(guò)程中,有其線程調(diào)用該函數(shù)等待,歷遍信號(hào)量鏈表,逐個(gè)喚醒線程繼續(xù)運(yùn)行
        while (tail != tmp) {
            //如果中途有其它線程將vval賦值&dow,這期間dow_next值為NULL,需要等待,參見(jiàn)else分支的__sync_bool_compare_and_swap調(diào)用
            while (!tmp->dow_next) {
                _dispatch_hardware_pause();
            }
            sema = tmp->dow_sema;
            tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next;
            _dispatch_thread_semaphore_signal(sema);
        }
    }
    else 
    {   
        //如果vval不等NULL,走這個(gè)分支,非第一次調(diào)用dispatch_once,其它線程調(diào)用
        //獲取信號(hào)量,如果有信號(hào)量則返回該信號(hào)量,如果沒(méi)有則在當(dāng)前線程創(chuàng)建一個(gè)新的信號(hào)量
        dow.dow_sema = _dispatch_get_thread_semaphore();
        for (;;) {
            tmp = *vval;

            //vval已經(jīng)被賦值為~0l,證明block已經(jīng)被執(zhí)行了,退出然后調(diào)用_dispatch_put_thread_semaphore銷(xiāo)毀信號(hào)量
            if (tmp == DISPATCH_ONCE_DONE) {
                break;
            }
            //空的宏定義,啥也不做
            dispatch_atomic_store_barrier();

            //將當(dāng)前信號(hào)量加入到信號(hào)鏈表中,然后線程等待,
            if (__sync_bool_compare_and_swap(vval, tmp, &dow)) {
                dow.dow_next = tmp;
                _dispatch_thread_semaphore_wait(dow.dow_sema);
            }

            //如果vval的指向值不再是tmp,可能其它線程同時(shí)進(jìn)入該分支,然后調(diào)用__sync_bool_compare_and_swap原子操作將vval指向了新的節(jié)點(diǎn),
            //則重新開(kāi)始for循環(huán)
        }
        _dispatch_put_thread_semaphore(dow.dow_sema);
    }
}

讓我們來(lái)看看dispatch_once是如何確保block只執(zhí)行一次。簡(jiǎn)單來(lái)說(shuō),當(dāng)線程A在調(diào)用執(zhí)行block并設(shè)置predicate為DISPATCH_ONCE_DONE(~0l)期間,如果有其他線程也在調(diào)用disptach_once,則這些線程會(huì)等待,各線程對(duì)應(yīng)的信號(hào)量會(huì)加入到信號(hào)量鏈表中,等predicate設(shè)置為DISPATCH_ONCE_DONE后,也就是block執(zhí)行完了,會(huì)根據(jù)信號(hào)量鏈表喚醒各個(gè)線程使其繼續(xù)執(zhí)行。


信號(hào)量鏈表.png

??不過(guò)有一種臨界情況,假如線程A在執(zhí)行block,但是創(chuàng)建單例對(duì)象obj還未完成,這時(shí)候線程B獲取該obj對(duì)象,此時(shí)obj=nil,而線程B在線程A將predicate設(shè)為DISPATCH_ONCE_DONE之后讀取predicate,這是線程B會(huì)認(rèn)為單例對(duì)象已經(jīng)初始化完成,然后使用空的obj對(duì)象,這就會(huì)導(dǎo)致錯(cuò)誤發(fā)生。因此dispatch_once會(huì)在執(zhí)行完block之后會(huì)執(zhí)行dispatch_atomic_maximally_synchronizing_barrier()調(diào)用,這個(gè)調(diào)用會(huì)執(zhí)行一些cpuid指令,確保線程A創(chuàng)建單例對(duì)象obj以及置predicate為DISPATCH_ONCE_DONE的時(shí)間TimeA大于線程B進(jìn)入block并讀取predicate值的時(shí)間TimeB。

#define dispatch_atomic_maximally_synchronizing_barrier() \
    do { unsigned long _clbr; __asm__ __volatile__( \
    "cpuid" \
    : "=a" (_clbr) : "0" (0) : "ebx", "ecx", "edx", "cc", "memory" \
    ); } while(0)

除此之外,每次調(diào)用dispatch_once的時(shí)候,都會(huì)先判斷predicate的值是否是~0l(也就是DISPATCH_ONCE_DONE),如果是則意味著block已經(jīng)執(zhí)行過(guò)了,便不再執(zhí)行,代碼如下:

void dispatch_once(dispatch_once_t *predicate, dispatch_block_t block);
#ifdef __GNUC__
#define dispatch_once(x, ...) do { if (__builtin_expect(*(x), ~0l) != ~0l) dispatch_once((x), (__VA_ARGS__)); } while (0)
#endif

讓我們看看這里面的__builtin_expect((x), (v)),這又是一個(gè)優(yōu)化的地方。。。

__builtin_expect()目的是將“分支轉(zhuǎn)移”的信息提供給編譯器,這樣編譯器可以對(duì)代碼進(jìn)行優(yōu)化,
以減少指令跳轉(zhuǎn)帶來(lái)的性能下降。
__builtin_expect((x),1) 表示 x 的值為真的可能性更大; 
__builtin_expect((x),0) 表示 x 的值為假的可能性更大。  

由于dispatch_once的只執(zhí)行block一次,所以我們更期望的是已經(jīng)block已經(jīng)執(zhí)行完了,也就是predict的值為~0l的可能性更大。
??現(xiàn)在我們清楚dispatch_once是如何確保block只執(zhí)行一次了,關(guān)鍵就在predict這個(gè)值,通過(guò)比較這個(gè)值等于0或者~0l來(lái)判斷block是否執(zhí)行過(guò),這也就是為啥我們需要將這個(gè)值設(shè)為static或者全局的緣故,因?yàn)楦鱾€(gè)線程都要去訪問(wèn)這個(gè)predict,有興趣的可以試試把predicate的初始值設(shè)為非0或者非靜態(tài)全局變量會(huì)發(fā)生什么~~

總結(jié)

通過(guò)上面的分析,我們知道@synchronized采用的是遞歸互斥鎖來(lái)實(shí)現(xiàn)線程安全,而dispatch_once的內(nèi)部則使用了很多原子操作來(lái)替代鎖,以及通過(guò)信號(hào)量來(lái)實(shí)現(xiàn)線程同步,而且有很多針對(duì)處理器優(yōu)化的地方,甚至在if判斷語(yǔ)句上也做了優(yōu)化(逼格有點(diǎn)高),使得其效率有很大的提升,雖然其源碼很短,但里面包含的東西卻很多,所以蘋(píng)果也推薦使用dispatch_once來(lái)創(chuàng)建單例。通過(guò)這個(gè)簡(jiǎn)短的dispatch_once,你也可以清楚為什么GCD的性能會(huì)這么高了,感興趣可以再去看看libdispatch的其它源碼。。

參考

objc-sync
synchronized
dispatch_once
Built-in functions for atomic memory access
__builtin_expect

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容