說一說Kotlin協(xié)程中的同步鎖——Mutex

前言

在多線程并發(fā)的情況下會很容易出現(xiàn)同步問題,這時候就需要使用各種鎖來避免這些問題,在java開發(fā)中,最常用的就是使用synchronized。kotlin的協(xié)程也會遇到這樣的問題,因為在協(xié)程線程池中會同時存在多個運行的Worker,每一個Worker都是一個線程,這樣也會有并發(fā)問題。

雖然kotlin中也可以使用synchronized,但是有很大的問題。因為synchronized當獲取不到鎖的時候,會阻塞線程,這樣這個線程一段時間內(nèi)就無法處理其他任務,這不符合協(xié)程的思想。為此,kotlin提供了一個協(xié)程中可以使用的同步鎖——Mutex

Mutex

Mutex使用起來也非常簡單,只有幾個函數(shù)lock、unlock、tryLock,一看名字就知道是什么。還有一個holdsLock,就是返回當前鎖的狀態(tài)。

這里要注意,lock和unlock必須成對出現(xiàn),tryLock返回true的之后也必須在使用完執(zhí)行unlock。這樣使用的時候就比較麻煩,所以kotlin還提供了一個擴展函數(shù)withLock,它與synchronized類似,會在代碼執(zhí)行完成或異常的時候自動釋放鎖,這樣就避免了忘記釋放鎖導致程序出錯的情況。

withLock

withLock的代碼如下:

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract { 
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }

    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

代碼非常簡單,就是先lock一下,然后執(zhí)行代碼,最終在finally中釋放鎖,這樣就保證了鎖一定會被釋放。

lock

這樣一看mutex好像跟synchronized或其他java的鎖差不多,那么為什么它是如何解決線程阻塞的問題呢。

這就要從lock和unlock的流程中來看,先來看看lock:

public override suspend fun lock(owner: Any?) {
    // fast-path -- try lock
    if (tryLock(owner)) return
    // slow-path -- suspend
    return lockSuspend(owner)
}

先是通過tryLock來獲取鎖,如果獲取到了就直接返回執(zhí)行代碼。重點來看獲取不到是如何處理的,獲取不到的時候會執(zhí)行l(wèi)ockSuspend,它的代碼如下:

private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
    var waiter = LockCont(owner, cont)  //1
    _state.loop { state ->
        when (state) {
            is Empty -> {
                if (state.locked !== UNLOCKED) {  //2
                    _state.compareAndSet(state, LockedQueue(state.locked)) //3
                } else {
                    // try lock
                    val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
                    if (_state.compareAndSet(state, update)) { // locked
                        cont.resume(Unit) { unlock(owner) } //4
                        return@sc
                    }
                }
            }
            is LockedQueue -> {
                val curOwner = state.owner
                check(curOwner !== owner) { "Already locked by $owner" }

                state.addLast(waiter)  //5

                if (_state.value === state || !waiter.take()) {  //6
                    // added to waiter list
                    cont.removeOnCancellation(waiter)
                    return@sc
                }

                waiter = LockCont(owner, cont)
                return@loop
            }
            is OpDescriptor -> state.perform(this) // help
            else -> error("Illegal state $state")
        }
    }
}

可以看到這個函數(shù)是被suspend修飾的,所以這個是可掛起的函數(shù),當執(zhí)行到這里的時候線程就被掛起了,如果沒有立刻恢復,而且有其他任務,那么線程就可以先執(zhí)行其他任務,這樣就不會阻塞住了。那么是如何恢復的。

函數(shù)一開始創(chuàng)建了一個LockCont對象waiter,這個是后面的關鍵,不過現(xiàn)在還用不到。

Empty

繼續(xù)看根據(jù)不同的狀態(tài)執(zhí)行不同的代碼,先看看Empty(等待列表為空)狀態(tài),再判斷一下當前是否加鎖(代碼2),如果不是非加鎖則將狀態(tài)設置為LockedQueue狀態(tài)(代碼3);如果當前是非加鎖,則獲取鎖,獲取到之后執(zhí)行resume來喚醒線程來執(zhí)行后續(xù)代碼(代碼4),這種情況基本就是立刻獲取到鎖,所以不在這里細說了。

上面說了如果等待列表為空并且無法立刻獲取鎖,就會切換到LockedQueue狀態(tài)(代碼3),所以只要當前無法獲取鎖,最終都會進行LockedQueue狀態(tài),那么來看看這個狀態(tài)怎么處理的。

LockedQueue

這個狀態(tài)會就將函數(shù)一開始創(chuàng)建的waiter添加到state中(代碼5),然后還是再判斷一次當前狀態(tài),因為這時候可能鎖的狀態(tài)已經(jīng)改變了,如果沒有變則直接就返回了。

注意看到每個狀態(tài)里,都會反復的校驗當前鎖的狀態(tài)。

可以看到在LockedQueue這個流程結束后并沒有恢復線程,線程則一直是掛起狀態(tài),所以在恢復之前線程是可以處理其他事務的。

那么線程何時恢復?

unlock

來看看unlock代碼:

override fun unlock(owner: Any?) {
    _state.loop { state ->
        when (state) {
            is Empty -> {
                ...
            }
            is OpDescriptor -> state.perform(this)
            is LockedQueue -> {
                if (owner != null)
                    check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" }
                val waiter = state.removeFirstOrNull()  //1
                if (waiter == null) {
                    ...
                } else {
                    if ((waiter as LockWaiter).tryResumeLockWaiter()) { //2
                        state.owner = waiter.owner ?: LOCKED
                        waiter.completeResumeLockWaiter() //3
                        return
                    }
                }
            }
            else -> error("Illegal state $state")
        }
    }
}

上面我們將waiter放入了等待隊列中,這時候狀態(tài)是LockedQueue,所以在unlock函數(shù)中我們直接看這個狀態(tài)的代碼。

代碼1處從state中取出第一個元素,即waiter。前一個釋放鎖之后,就會把鎖分配給這個waiter。然后在代碼2處執(zhí)行了它的tryResumeLockWaiter函數(shù),如果返回false,還會執(zhí)行它的completeResumeLockWaiter函數(shù)。

LockCont

上面知道waiter是一個LockCont對象,我們來看看它的源碼:

private inner class LockCont(
    owner: Any?,
    private val cont: CancellableContinuation<Unit>
) : LockWaiter(owner) {

    override fun tryResumeLockWaiter(): Boolean {
        if (!take()) return false
        return cont.tryResume(Unit, idempotent = null) {
            unlock(owner)
        } != null
    }

    override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN)
    ...
}

可以看到在tryResumeLockWaiter函數(shù)中會執(zhí)行cont的tryResume來嘗試喚醒它對應的線程來執(zhí)行代碼。

如果這個動作沒有成功,最后會在completeResumeLockWaiter函數(shù)中執(zhí)行cont的completeResume來喚醒線程。

總結

Mutex的內(nèi)部邏輯其實并不復雜,如果獲取不到鎖則會掛起線程并加入到等待隊列中,等獲取到鎖的時候在喚醒線程來執(zhí)行代碼。而這段時間內(nèi)線程,或者說Worker可以執(zhí)行其他任務,這樣不會阻塞線程,最大的利用了線程的資源,這就很kotlin。

所以大家在處理協(xié)程的同步問題的時候,盡量使用Mutex這種Kotlin專門為協(xié)程開發(fā)的工具,這樣才能更好的發(fā)揮協(xié)程的能力。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容