sync 源碼解析

type Locker interface {
   Lock()
   Unlock()
}

Mutex 互斥鎖

互斥即不可同時運行。即使用了互斥鎖的兩個代碼片段互相排斥,只有其中一個代碼片段執(zhí)行完成后,另一個才能執(zhí)行。

type Mutex struct {
   state int32
   sema  uint32
}

state 表示當(dāng)前互斥鎖的狀態(tài),而 sema 是用于控制鎖狀態(tài)的信號量。

互斥鎖的狀態(tài)比較復(fù)雜,如下圖所示,最低三位分別表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用來表示當(dāng)前有多少個 Goroutine 在等待互斥鎖的釋放

image.png

在默認(rèn)情況下,互斥鎖的所有狀態(tài)位都是 0,int32 中的不同位分別表示了不同的狀態(tài):

  • mutexLocked — 表示互斥鎖的鎖定狀態(tài);
  • mutexWoken — 表示從正常模式被從喚醒;
  • mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài);
  • waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個數(shù);

Mutex 正常模式和饑餓模式

  • 在正常模式下,鎖的等待者會按照先進(jìn)先出的順序獲取鎖。
  • 但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時,大概率會獲取不到鎖,為了減少這種情況的出現(xiàn),一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當(dāng)前互斥鎖切換饑餓模式,防止部分 Goroutine 被『餓死』。
  • 在饑餓模式中,互斥鎖會直接交給等待隊列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖、也不會進(jìn)入自旋狀態(tài),它們只會在隊列的末尾等待。
  • 如果一個 Goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當(dāng)前的互斥鎖就會切換回正常模式。
  • 與饑餓模式相比,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時。
const (
   mutexLocked = 1 << iota      //1         互斥鎖被鎖定 + 未被喚醒
   mutexWoken                   //2         互斥鎖未鎖定 + 被喚醒
   mutexStarving                //4         互斥鎖未鎖定 + 未被喚醒 + 饑餓模式啟動 
   mutexWaiterShift = iota      //3         互斥鎖被鎖定 + 被喚醒
   starvationThresholdNs = 1e6
)

加鎖和解鎖

func (m *Mutex) Lock() {
   // 當(dāng)鎖的狀態(tài)是 0 時,將state置位 mutexLocked 為 1:
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // 如果互斥鎖的狀態(tài)不是 0 時就會調(diào)用 sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)
   m.lockSlow()
}
func (m *Mutex) lockSlow() {

該方法的主體是一個非常大 for 循環(huán),這里將它分成幾個部分介紹獲取鎖的過程:

  • 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋;
  • 通過自旋等待互斥鎖的釋放;
  • 計算互斥鎖的最新狀態(tài);
  • 更新互斥鎖的狀態(tài)并獲取鎖;

自旋是一種多線程同步機制,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會一直保持 CPU 的占用,持續(xù)檢查某個條件是否為真

在多核的 CPU 上,自旋可以避免 Goroutine 的切換,使用恰當(dāng)會對性能帶來很大的增益,但是使用的不恰當(dāng)就會拖慢整個程序,所以 Goroutine 進(jìn)入自旋的條件非常苛刻

  • 互斥鎖只有在普通模式才能進(jìn)入自旋;
  • runtime.sync_runtime_canSpin 需要返回 true:
    1. 運行在多 CPU 的機器上;
    2. 當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;
    3. 當(dāng)前機器上至少存在一個正在運行的處理器 P 并且處理的運行隊列為空;

下面代碼為 進(jìn)入自旋 邏輯

   // 開始等待時間戳
   var waitStartTime int64
   // 饑餓模式標(biāo)識
   starving := false
   // 喚醒標(biāo)識
   awoke := false
   // 自旋次數(shù)
   iter := 0
   // 保存當(dāng)前對象鎖狀態(tài)
   old := m.state
   for {
      // 鎖是非饑餓狀態(tài),鎖還沒被釋放,嘗試自旋
      // 判斷相當(dāng)于xxxx...x0xx & 0101 = 01,當(dāng)前對象狀態(tài)為:xxxx…x0xx 當(dāng)前對象鎖被使用
      // runtime_canSpin(iter) 根據(jù)iter自旋次數(shù)判斷
      // 互斥鎖被鎖定時 進(jìn)入自旋狀態(tài) continue
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         if !awoke && 
            // 判斷相當(dāng)于: xxxx...xx0x & 0010 = 0
            // 當(dāng)前對象狀態(tài)為未被喚醒時 
            old&mutexWoken == 0 && 
            // 當(dāng)前有 Goroutine 在等待互斥鎖的釋放時,也就是有g(shù)oroution在排隊時  old>>mutexWaiterShift != 0 為 true
            old>>mutexWaiterShift != 0 &&
            // 將對象 改為喚醒狀態(tài):xxxx...xx0x | 0010 = xxxx...xx1x
            // 在將 m.state 改為被喚醒狀態(tài) 
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 
         {
                // 告知解鎖,不要喚醒其他阻塞的goroutines
                awoke = true
         }
         // 進(jìn)入自旋狀態(tài) 當(dāng)前goroutine并不掛起,仍然在占用cpu資源,重試一定次數(shù)后,退出自旋狀態(tài)
         runtime_doSpin()
         //表示自旋次數(shù)
         iter++
         // 保存mutex對象即將被設(shè)置成的狀態(tài)
         //再次獲取鎖的狀態(tài),之后會檢查是否鎖被釋放了
         old = m.state
         continue
      }

一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會調(diào)用runtime.sync_runtime_doSpin 和 runtime.procyield 并執(zhí)行 30 次的 PAUSE 指令,該指令只會占用 CPU 并消耗 CPU 時間:

func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL cycles+0(FP), AX
again:
    PAUSE
    SUBL $1, AX
    JNZ again
    RET

處理了自旋相關(guān)的特殊邏輯之后,互斥鎖會根據(jù)上下文計算當(dāng)前互斥鎖最新的狀態(tài)。幾個不同的條件分別會更新 state 字段中存儲的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

      new := old
      // old 后三位為 mutexLocked,mutexWaiterShift,mutexWoken 時 old&mutexStarving == 0 為true
      // xxxx...x0xx & 0100 = 0   xxxx...x0xx
      // old&mutexStarving == 0  篩選當(dāng)前狀態(tài)為正常模式(非饑餓模式)
      // 將新來的goroutines 互斥鎖鎖定
      if old&mutexStarving == 0 {
         // 將未鎖定狀態(tài)改為鎖定狀態(tài)
        // 非饑餓狀態(tài),加鎖
         new |= mutexLocked
      }
      // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;
      // old 互斥鎖被鎖定時但是狀態(tài)為饑餓模式時,mutex的等待goroutine數(shù)目加1  
      if old&(mutexLocked|mutexStarving) != 0 {
        // new + 8  在等待互斥鎖的釋放的Goroutine數(shù)量 +1
        // 更新阻塞goroutine的數(shù)量,表示mutex的等待goroutine數(shù)目加1
         new += 1 << mutexWaiterShift
      }
      // xxxx...xxx1 & 0001 != 0;鎖狀態(tài)為鎖定狀態(tài) 
      if starving && old&mutexLocked != 0 {
        // new + 4 表示從正常模式進(jìn)入饑餓模式
        // xxxx...xxx | 0100 => xxxx...x1xx
         new |= mutexStarving
      }
      // 當(dāng) 喚醒標(biāo)志為 true時 
      if awoke {
        // xxxx...xx1x & 0010 = 0, 如果喚醒標(biāo)志為0 ,表示為被喚醒時,panic
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設(shè)置喚醒狀態(tài)位0,goroutine是被喚醒的,新狀態(tài)清除喚醒標(biāo)志
         new &^= mutexWoken
      }

計算了新的互斥鎖狀態(tài)之后,會使用 CAS 函數(shù) sync/atomic.compareandswapint32 更新狀態(tài)

     //判斷cas鎖是否 設(shè)置新狀態(tài) 成功 
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        //原來鎖的狀態(tài)已釋放,并且不是饑餓狀態(tài),正常請求到了鎖,返回
        // xxxx...x0x0 & 0101 = 0, 互斥鎖未加鎖且為正常模式(非饑餓模式)
         if old&(mutexLocked|mutexStarving) == 0 {
            //結(jié)束cas
            break
         }
         // 如果以前就在隊列里面,加入到隊列頭
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 { 
          //獲取當(dāng)前運行納秒時間戳
            waitStartTime = runtime_nanotime()
         }
         //通過信號量保證資源不會被兩個 Goroutine 獲取
        //runtime.sync_runtime_SemacquireMutex 會在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         // 判斷 等待時間 是否超出限制 1e6 將starving 標(biāo)志為 true 
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
         old = m.state
         // xxxx...x1xx & 0100 != 0   判斷鎖狀態(tài)是否 進(jìn)入饑餓狀態(tài)
         if old&mutexStarving != 0 {
            //當(dāng)前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當(dāng)前 Goroutine,互斥鎖還會從饑餓模式中退出
            // xxxx...xx00 & 0011 = 0
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            // 加鎖并且將waiter數(shù)減1
            // delta -7  7 = 0111
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            //判斷 等待隊列中 是否只存在當(dāng)前 Goroutine
            if !starving || old>>mutexWaiterShift == 1 {
               // delta -11  11 = 1011 
               delta -= mutexStarving
            }
            // m.state - 7 表示當(dāng)前 Goroutine 會獲得互斥鎖
            //  m.state - 11 表示當(dāng)前 Goroutine 會獲得互斥鎖還會從饑餓模式中退出
            atomic.AddInt32(&m.state, delta)
            break
         }
        // 設(shè)置喚醒標(biāo)記 為true 
         awoke = true
        // 重置迭代次數(shù)
         iter = 0
      } else {
         old = m.state
      }
   }
   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
}

如果沒有通過 CAS 獲得鎖,會調(diào)用 runtime.sync_runtime_SemacquireMutex 通過信號量保證資源不會被兩個 Goroutine 獲取

runtime.sync_runtime_SemacquireMutex 會在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回,sync.Mutex.Lock 的剩余代碼也會繼續(xù)執(zhí)行。

  • 在正常模式下,這段代碼會設(shè)置喚醒和饑餓標(biāo)記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán);
  • 在饑餓模式下,當(dāng)前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當(dāng)前 Goroutine,互斥鎖還會從饑餓模式中退出;

互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單,該過程會先使用 sync/atomic.AddInt32 函數(shù)快速解鎖,這時會發(fā)生下面的兩種情況:

  • 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖;
  • 如果該函數(shù)返回的新狀態(tài)不等于 0,這段代碼會調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖:
func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }
   // m.state - 1 
   // 新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖
   // 新狀態(tài)不等于 0,這段代碼會調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      m.unlockSlow(new)
   }
}

sync.Mutex.unlockSlow 會先校驗鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了會直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序。
在正常情況下會根據(jù)當(dāng)前互斥鎖的狀態(tài),分別處理正常模式和饑餓模式下的互斥鎖:

func (m *Mutex) unlockSlow(new int32) {
   // 校驗鎖狀態(tài)的合法性
   if (new+mutexLocked)&mutexLocked == 0 { 
      //如果 當(dāng)前互斥鎖已經(jīng)被解鎖過了會直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序
      throw("sync: unlock of unlocked mutex")
   }
   // 分別處理正常模式和饑餓模式下的互斥鎖
   if new&mutexStarving == 0 {
      // 正常狀態(tài)
      old := new
      for {
         // 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving、mutexWoken 狀態(tài)不都為 0(有喚醒者或鎖已經(jīng)加鎖),那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者;
         //如果沒有其它的 waiter,說明對這個鎖的競爭的 goroutine 只有一個,那就可以直接返回了;如果這個時候有喚醒的 goroutine,或者是又被別人加了鎖,那么,無需我們操勞,其它 goroutine 自己干得都很好,當(dāng)前的這個 goroutine 就可以放心返回了。
         //xxxx…x000 & (0001 | 0010 | 0100) => xxxx…x000 & 0111 = 0
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 如果互斥鎖存在等待者,會通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);
         //如果有等待者,并且沒有喚醒的 waiter,那就需要喚醒一個等待的 waiter。在喚醒之前,需要將 waiter 數(shù)量減 1,并且將 mutexWoken 標(biāo)志設(shè)置上,這樣,Unlock 就可以返回了。
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
      // 饑餓狀態(tài)
      // 當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態(tài)
      runtime_Semrelease(&m.sema, true, 1)
   }
}

在正常模式下,上述代碼會使用如下所示的處理過程:

  • 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving、mutexWoken 狀態(tài)不都為 0,那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者;
  • 如果互斥鎖存在等待者,會通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);

在饑餓模式下,上述代碼會直接調(diào)用 sync.runtime_Semrelease 將當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖,在這時互斥鎖還不會退出饑餓狀態(tài);

總結(jié)

互斥鎖的加鎖過程比較復(fù)雜,它涉及自旋、信號量以及調(diào)度等概念:

  • 如果互斥鎖處于初始化狀態(tài),會通過置位 mutexLocked 加鎖;
  • 如果互斥鎖處于 mutexLocked 狀態(tài)并且在普通模式下工作,會進(jìn)入自旋,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放;
  • 如果當(dāng)前 Goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到饑餓模式;
  • 互斥鎖在正常情況下會通過 runtime.sync_runtime_SemacquireMutex 將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài),等待鎖的持有者喚醒;
  • 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,那么它會將互斥鎖切換回正常模式;

互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數(shù)不多、邏輯清晰,也比較容易理解:

  • 當(dāng)互斥鎖已經(jīng)被解鎖時,調(diào)用 sync.Mutex.Unlock 會直接拋出異常;
  • 當(dāng)互斥鎖處于饑餓模式時,將鎖的所有權(quán)交給隊列中的下一個等待者,等待者會負(fù)責(zé)設(shè)置 mutexLocked 標(biāo)志位;
  • 當(dāng)互斥鎖處于普通模式時,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖,會直接返回;在其他情況下會通過 sync.runtime_Semrelease 喚醒對應(yīng)的 Goroutine;

RWMutex 讀寫鎖

讀寫互斥鎖 sync.RWMutex 是細(xì)粒度的互斥鎖,它不限制資源的并發(fā)讀,但是讀寫、寫寫操作無法并行執(zhí)行。
sync.RWMutex 中總共包含以下 5 個字段:

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}
  • w — 復(fù)用互斥鎖提供的能力;
  • writerSem 和 readerSem — 分別用于寫等待讀和讀等待寫:
  • readerCount 存儲了當(dāng)前正在執(zhí)行的讀操作數(shù)量;
  • readerWait 表示當(dāng)寫操作被阻塞時等待的讀操作個數(shù);

寫鎖

當(dāng)資源的使用者想要獲取寫鎖時,需要調(diào)用 sync.RWMutex.Lock 方法:

func (rw *RWMutex) Lock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   rw.w.Lock()
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders 
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      //進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
   }
}
  1. 調(diào)用結(jié)構(gòu)體持有的 sync.Mutex 結(jié)構(gòu)體的 sync.Mutex.Lock 阻塞后續(xù)的寫操作;因為互斥鎖已經(jīng)被獲取,其他 Goroutine 在獲取寫鎖時會進(jìn)入自旋或者休眠;
  2. 調(diào)用 sync/atomic.AddInt32 函數(shù)阻塞后續(xù)的讀操作:
  3. 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 會調(diào)用 runtime.sync_runtime_SemacquireMutex 進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒;

寫鎖的釋放會調(diào)用 sync.RWMutex.Unlock:

func (rw *RWMutex) Unlock() {
   if race.Enabled {
      _ = rw.w.state
      race.Release(unsafe.Pointer(&rw.readerSem))
      race.Disable()
   }
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
   rw.w.Unlock()
   if race.Enabled {
      race.Enable()
   }
}

與加鎖的過程正好相反,寫鎖的釋放分以下幾個執(zhí)行:

  • 調(diào)用 sync/atomic.AddInt32 函數(shù)將 readerCount 變回正數(shù),釋放讀鎖;
  • 通過 for 循環(huán)釋放所有因為獲取讀鎖而陷入等待的 Goroutine:
  • 調(diào)用 sync.Mutex.Unlock 釋放寫鎖;

獲取寫鎖時會先阻塞寫鎖的獲取,后阻塞讀鎖的獲取,這種策略能夠保證讀操作不會被連續(xù)的寫操作『餓死』。

讀鎖

讀鎖的加鎖方法 sync.RWMutex.RLock 很簡單,該方法會通過 sync/atomic.AddInt32 將 readerCount 加一:

func (rw *RWMutex) RLock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      // A writer is pending, wait for it.
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}
  1. 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖,當(dāng)前 Goroutine 就會調(diào)用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待鎖的釋放;
  2. 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖,當(dāng)前方法會成功返回;

當(dāng) Goroutine 想要釋放讀鎖時,會調(diào)用如下所示的 sync.RWMutex.RUnlock 方法:

func (rw *RWMutex) RUnlock() {
   if race.Enabled {
      _ = rw.w.state
      race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      race.Disable()
   }
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
   }
   if race.Enabled {
      race.Enable()
   }
}

該方法會先減少正在讀資源的 readerCount 整數(shù),根據(jù) sync/atomic.AddInt32 的返回值不同會分別進(jìn)行處理:

  • 如果返回值大于等于零 — 讀鎖直接解鎖成功;
  • 如果返回值小于零 — 有一個正在執(zhí)行的寫操作,在這時會調(diào)用sync.RWMutex.rUnlockSlow 方法;
func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // A writer is pending.
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      // The last reader unblocks the writer.
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}

sync.RWMutex.rUnlockSlow 會減少獲取鎖的寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem,該信號量被觸發(fā)時,調(diào)度器就會喚醒嘗試獲取寫鎖的 Goroutine。

雖然讀寫互斥鎖 sync.RWMutex 提供的功能比較復(fù)雜,但是因為它建立在 sync.Mutex 上,所以實現(xiàn)會簡單很多。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:

  • 調(diào)用 sync.RWMutex.Lock 嘗試獲取寫鎖時;
    • 每次 sync.RWMutex.RUnlock 都會將 readerCount 其減一,當(dāng)它歸零時該 Goroutine 會獲得寫鎖;
    • 將 readerCount 減少 rwmutexMaxReaders 個數(shù)以阻塞后續(xù)的讀操作;
  • 調(diào)用 sync.RWMutex.Unlock 釋放寫鎖時,會先通知所有的讀操作,然后才會釋放持有的互斥鎖;

讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時提升性能。

WaitGroup

sync.WaitGroup 可以等待一組 Goroutine 的返回,一個比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個 Goroutine 中并發(fā)執(zhí)行,加快程序處理的速度。


image.png

WaitGroup 等待多個 Goroutine

結(jié)構(gòu)體

sync.WaitGroup 結(jié)構(gòu)體中只包含兩個成員變量:

type WaitGroup struct {
   noCopy noCopy
   state1 [3]uint32
}
  • noCopy — 保證 sync.WaitGroup 不會被開發(fā)者通過再賦值的方式拷貝;
  • state1 — 存儲著狀態(tài)和信號量;

sync.noCopy 是一個特殊的私有結(jié)構(gòu)體,tools/go/analysis/passes/copylock 包中的分析器會在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy 或者實現(xiàn)了 Lock 和 Unlock 方法
sync.WaitGroup 結(jié)構(gòu)體中包含一個總共占用 12 字節(jié)的數(shù)組,這個數(shù)組會存儲當(dāng)前結(jié)構(gòu)體的狀態(tài),在 64 位與 32 位的機器上表現(xiàn)也非常不同。


image.png

WaitGroup 在 64 位和 32 位機器的不同狀態(tài)

  • 64 位機器上本身就能保證 64 位對齊,所以按照 64 位對齊來取數(shù)據(jù),拿到 state1[0], state1[1] 本身就是64 位對齊的。但是 32 位機器上并不能保證 64 位對齊,因為 32 位機器是 4 字節(jié)對齊,如果也按照 64 位機器取 state[0],state[1] 就有可能會造成 atmoic 的使用錯誤。
  • 32 位機器上空出第一個 32 位,也就使后面 64 位天然滿足 64 位對齊,第一個 32 位放入 sema 剛好合適

WaitGroup.state1 其實代表三個字段:counter,waiter,sema

  • counter :可以理解為一個計數(shù)器,計算經(jīng)過 wg.Add(N), wg.Done() 后的值。
  • waiter :當(dāng)前等待 WaitGroup 任務(wù)結(jié)束的等待者數(shù)量。其實就是調(diào)用 wg.Wait() 的次數(shù),所以通常這個值是 1 。
  • sema : 信號量,用來喚醒 Wait() 函數(shù)。

sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出它的狀態(tài)和信號量。

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
   if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
      return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
   } else {
      return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
   }
}

接口

sync.WaitGroup 對外暴露了三個方法 — sync.WaitGroup.Add、sync.WaitGroup.Wait 和 sync.WaitGroup.Done.

func (wg *WaitGroup) Add(delta int) {
   statep, semap := wg.state()
   if race.Enabled {
      _ = *statep // trigger nil deref early
      if delta < 0 {
         race.ReleaseMerge(unsafe.Pointer(wg))
      }
      race.Disable()
      defer race.Enable()
   }

   //更新worker計數(shù)器
   state := atomic.AddUint64(statep, uint64(delta)<<32)

   //worker計數(shù)器:v 是 statep *uint64 的左32位
   //waiter計數(shù)器:w 是 statep *uint64 的右32位
   v := int32(state >> 32)
   w := uint32(state)

   if race.Enabled && delta > 0 && v == int32(delta) {
      race.Read(unsafe.Pointer(semap))
   }

   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }

   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }

  // worker 大于 0 或者 waiter 等于 0 說明還有Goroutine沒有執(zhí)行完,直接返回
   if v > 0 || w == 0 {
      return
   }
   
   if *statep != state {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   
   // 狀態(tài)設(shè)置為0 
   *statep = 0

   // 通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine(喚醒 Wait())
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計數(shù)器 counter。
雖然 sync.WaitGroup.Add 方法傳入的參數(shù)可以為負(fù)數(shù),但是計數(shù)器只能是非負(fù)數(shù),一旦出現(xiàn)負(fù)數(shù)就會發(fā)生程序崩潰。
當(dāng)調(diào)用計數(shù)器歸零,即所有任務(wù)都執(zhí)行完成時,才會通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine。

sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1

func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

sync.WaitGroup 的另一個方法 sync.WaitGroup.Wait 會在計數(shù)器大于 0 并且不存在等待的 Goroutine 時,調(diào)用 runtime.sync_runtime_Semacquire 陷入睡眠。

func (wg *WaitGroup) Wait() {
   statep, semap := wg.state()

   if race.Enabled {
      _ = *statep // trigger nil deref early
      race.Disable()
   }

   // 循環(huán)判斷 是否滿足退出條件 
   for {

      //worker計數(shù)器:v 是 statep *uint64 的左32位
      //waiter計數(shù)器:w 是 statep *uint64 的右32位
      state := atomic.LoadUint64(statep)
      v := int32(state >> 32)
      w := uint32(state)

      // 當(dāng)計數(shù)器為0時,表示所有Goroutine 都執(zhí)行完成,立即返回
      if v == 0 {
         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }
         return
      }

      // 更新waiter計數(shù)器  atomic.CompareAndSwapUint64 對uint64值執(zhí)行比較和交換操作
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         if race.Enabled && w == 0 {
            race.Write(unsafe.Pointer(semap))
         }
         // Goroutine 進(jìn)入睡眠狀態(tài),等待信號量喚醒
         runtime_Semacquire(semap)

         if *statep != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }

         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }

         return
      } 
      
   }
}

當(dāng) sync.WaitGroup 的計數(shù)器歸零時,陷入睡眠狀態(tài)的 Goroutine 會被喚醒,上述方法也會立刻返回。

通過對 sync.WaitGroup 的分析和研究,我們能夠得出以下結(jié)論:

  • sync.WaitGroup 必須在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
  • sync.WaitGroup.Done 只是對 sync.WaitGroup.Add 方法的簡單封裝,我們可以向 sync.WaitGroup.Add 方法傳入任意負(fù)數(shù)(需要保證計數(shù)器非負(fù))快速將計數(shù)器歸零以喚醒等待的 Goroutine;
  • 可以同時有多個 Goroutine 等待當(dāng)前 sync.WaitGroup 計數(shù)器的歸零,這些 Goroutine 會被同時喚醒;

once

Go 語言標(biāo)準(zhǔn)庫中 sync.Once 可以保證在 Go 程序運行期間的某段代碼只會執(zhí)行一次

結(jié)構(gòu)體

每一個 sync.Once 結(jié)構(gòu)體中都只包含一個用于標(biāo)識代碼塊是否執(zhí)行過的 done 以及一個互斥鎖 sync.Mutex:

type Once struct {
   done uint32
   m    Mutex
}

接口

sync.Once.Do 是 sync.Once 結(jié)構(gòu)體對外唯一暴露的方法,該方法會接收一個入?yún)榭盏暮瘮?shù):

  • 如果傳入的函數(shù)已經(jīng)執(zhí)行過,會直接返回;
  • 如果傳入的函數(shù)沒有執(zhí)行過,會調(diào)用 sync.Once.doSlow 執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
   if atomic.LoadUint32(&o.done) == 0 {
      o.doSlow(f)
   }
}

func (o *Once) doSlow(f func()) {
   o.m.Lock()
   defer o.m.Unlock()
   if o.done == 0 {
      defer atomic.StoreUint32(&o.done, 1)
      f()
   }
}
  1. 為當(dāng)前 Goroutine 獲取互斥鎖;
  2. 執(zhí)行傳入的無入?yún)⒑瘮?shù);
  3. 運行延遲函數(shù)調(diào)用,將成員變量 done 更新成 1;

sync.Once 會通過成員變量 done 確保函數(shù)不會執(zhí)行第二次。

作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once 結(jié)構(gòu)體,它使用互斥鎖和 sync/atomic 包提供的方法實現(xiàn)了某個函數(shù)在程序運行期間只能執(zhí)行一次的語義。在使用該結(jié)構(gòu)體時,我們也需要注意以下的問題:

  • sync.Once.Do方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了 panic;
  • 兩次調(diào)用 sync.Once.Do 方法傳入不同的函數(shù)只會執(zhí)行第一次調(diào)傳入的函數(shù);

Cond

sync.Cond 用來協(xié)調(diào)想要訪問共享資源的 goroutine。
sync.Cond 經(jīng)常用在多個 goroutine 等待,一個 goroutine 通知(事件發(fā)生)的場景。如果是一個通知,一個等待,使用互斥鎖或 channel 就能搞定了。
使用場景

  • 有一個協(xié)程在異步地接收數(shù)據(jù),剩下的多個協(xié)程必須等待這個協(xié)程接收完數(shù)據(jù),才能讀取到正確的數(shù)據(jù)。在這種情況下,如果單純使用 chan 或互斥鎖,那么只能有一個協(xié)程可以等待,并讀取到數(shù)據(jù),沒辦法通知其他的協(xié)程也讀取數(shù)據(jù)。
  • 這個時候,就需要有個全局的變量來標(biāo)志第一個協(xié)程數(shù)據(jù)是否接受完畢,剩下的協(xié)程,反復(fù)檢查該變量的值,直到滿足要求。或者創(chuàng)建多個 channel,每個協(xié)程阻塞在一個 channel 上,由接收數(shù)據(jù)的協(xié)程在數(shù)據(jù)接收完畢后,逐個通知??傊?,需要額外的復(fù)雜度來完成這件事

結(jié)構(gòu)體

type Cond struct {
   noCopy noCopy
   L Locker
   notify  notifyList
   checker copyChecker
}
  • noCopy — 用于保證結(jié)構(gòu)體不會在編譯期間拷貝;
  • copyChecker — 用于禁止運行期間發(fā)生的拷貝;
  • L — 用于保護內(nèi)部的 notify 字段,Locker 接口類型的變量;
  • notify — 一個 Goroutine 的鏈表,它是實現(xiàn)同步機制的核心結(jié)構(gòu);
type notifyList struct {
   wait   uint32
   notify uint32
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer
   tail   unsafe.Pointer
}

在 sync.notifyList 結(jié)構(gòu)體中,head 和 tail 分別指向的鏈表的頭和尾,wait 和 notify 分別表示當(dāng)前正在等待的和已經(jīng)通知到的 Goroutine 的索引。

type copyChecker uintptr

func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}
newCond
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

NewCond 創(chuàng)建 Cond 實例時,需要關(guān)聯(lián)一個鎖。

接口

wait

sync.Cond 對外暴露的 sync.Cond.Wait 方法會將當(dāng)前 Goroutine 陷入休眠狀態(tài),它的執(zhí)行過程分成以下兩個步驟:

  • 調(diào)用 runtime.notifyListAdd 將等待計數(shù)器加一并解鎖;
  • 調(diào)用 runtime.notifyListWait 等待其他 Goroutine 的喚醒并加鎖:
func (c *Cond) Wait() {
   // 檢查c是否是被復(fù)制的,如果是就panic
   c.checker.check()
   // 將當(dāng)前goroutine加入等待隊列
   t := runtime_notifyListAdd(&c.notify)
   // 解鎖
   c.L.Unlock()
   // 等待隊列中的所有的goroutine執(zhí)行等待喚醒操作
   runtime_notifyListWait(&c.notify, t)
   // 鎖
   c.L.Lock()
}

調(diào)用 Wait 會自動釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine,因此當(dāng)前協(xié)程會阻塞在 Wait 方法調(diào)用的地方。
如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時,會重新給 c.L 加鎖,并且繼續(xù)執(zhí)行 Wait 后面的代碼。

runtime.notifyListWait 會獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:

func notifyListWait(l *notifyList, t uint32) {
        s := acquireSudog()
        s.g = getg()
        s.ticket = t
        if l.tail == nil {
                l.head = s
        } else {
                l.tail.next = s
        }
        l.tail = s
        goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
        releaseSudog(s)
}

除了將當(dāng)前 Goroutine 追加到鏈表的末端之外,我們還會調(diào)用 runtime.goparkunlock 將當(dāng)前 Goroutine 陷入休眠,該函數(shù)也是在 Go 語言切換 Goroutine 時經(jīng)常會使用的方法,它會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒。


image.png

sync.Cond.Signal 和 sync.Cond.Broadcast 就是用來喚醒陷入休眠的 Goroutine 的方法,它們的實現(xiàn)有一些細(xì)微的差別:

  • sync.Cond.Signal 方法會喚醒隊列最前面的 Goroutine;
  • sync.Cond.Broadcast 方法會喚醒隊列中全部的 Goroutine;
func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}

Signal 只喚醒任意 1 個等待條件變量 c 的 goroutine,無需鎖保護。

func (c *Cond) Broadcast() {
   c.checker.check()
   runtime_notifyListNotifyAll(&c.notify)
}

Broadcast 喚醒所有等待條件變量 c 的 goroutine,無需鎖保護。

runtime.notifyListNotifyOne 只會從 sync.notifyList 鏈表中找到滿足 sudog.ticket == l.notify 條件的 Goroutine 并通過 runtime.readyWithTime 喚醒:

func notifyListNotifyOne(l *notifyList) {
        t := l.notify
        atomic.Store(&l.notify, t+1)

        for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
                if s.ticket == t {
                        n := s.next
                        if p != nil {
                                p.next = n
                        } else {
                                l.head = n
                        }
                        if n == nil {
                                l.tail = p
                        }
                        s.next = nil
                        readyWithTime(s, 4)
                        return
                }
        }
}

runtime.notifyListNotifyAll 會依次通過 runtime.readyWithTime 喚醒鏈表中 Goroutine:

func notifyListNotifyAll(l *notifyList) {
        s := l.head
        l.head = nil
        l.tail = nil

        atomic.Store(&l.notify, atomic.Load(&l.wait))

        for s != nil {
                next := s.next
                s.next = nil
                readyWithTime(s, 4)
                s = next
        }
}

Goroutine 的喚醒順序也是按照加入隊列的先后順序,先加入的會先被喚醒,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度。
在一般情況下,我們都會先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件,當(dāng)滿足喚醒條件時,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個或者全部的 Goroutine。
Cond使用例子

var status int64

func main() {
        c := sync.NewCond(&sync.Mutex{})
        for i := 0; i < 10; i++ {
                go listen(c)
        }
        time.Sleep(1 * time.Second)
        go broadcast(c)

        ch := make(chan os.Signal, 1)
        signal.Notify(ch, os.Interrupt)
        <-ch
}

func broadcast(c *sync.Cond) {
        c.L.Lock()
        atomic.StoreInt64(&status, 1)
        c.Broadcast()
        c.L.Unlock()
}

func listen(c *sync.Cond) {
        c.L.Lock()
        for atomic.LoadInt64(&status) != 1 {
                c.Wait()
        }
        fmt.Println("listen")
        c.L.Unlock()
}

$ go run main.go
listen
...
listen

sync.Cond 不是一個常用的同步機制,但是在條件長時間無法滿足時,與使用 for {} 進(jìn)行忙碌等待相比,sync.Cond 能夠讓出處理器的使用權(quán),提高 CPU 的利用率。使用時我們也需要注意以下問題:

  • sync.Cond.Wait 在調(diào)用之前一定要使用獲取互斥鎖,否則會觸發(fā)程序崩潰;
  • sync.Cond.Signal 喚醒的 Goroutine 都是隊列最前面、等待最久的 Goroutine;
  • sync.Cond.Broadcast 會按照一定順序廣播通知等待的全部 Goroutine;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容