go源碼解析之TCP連接(四)——Write

go源碼解析之TCP連接系列基于go源碼1.16.5

網(wǎng)絡(luò)數(shù)據(jù)發(fā)送

上一章我們通過跟蹤TCPConn的Read方法,了解了讀取數(shù)據(jù)的過程,本章將通過TCPConn的Write方法的跟蹤來了解數(shù)據(jù)寫入的過程。

1. conn的Write方法

從上一章了解到TCPConn繼承自conn,它的Write方法就是conn的Write,代碼如下:

src/net/net.go

func (c *conn) Write(b []byte) (int, error) {
    ...
    n, err := c.fd.Write(b)
    ...
    return n, err
}

conn的Write方法調(diào)用了netFD的Write方法:

src/net/fd_posix.go

func (fd *netFD) Write(p []byte) (nn int, err error) {
    nn, err = fd.pfd.Write(p)
    runtime.KeepAlive(fd)
    return nn, wrapSyscallError(writeSyscallName, err)
}

pfd則是poll.FD,看一下它的Write方法:

src/internal/poll/fd_unix.go

func (fd *FD) Write(p []byte) (int, error) {
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    ...
    var nn int
    for {
        max := len(p)
        ...
        n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        if n > 0 {
            nn += n
        }
        if nn == len(p) {
            return nn, err
        }
        if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitWrite(fd.isFile); err == nil {
                continue
            }
        }
        if err != nil {
            return nn, err
        }
        if n == 0 {
            return nn, io.ErrUnexpectedEOF
        }
    }
}

和Read方法一樣,當(dāng)遇到EAGAIN錯誤且pollable為true,進行等待。些許不同的是,Write方法的for循環(huán)中會保證傳入的數(shù)據(jù)都寫完才返回。

2. poll.FD的加鎖方法

回到方法開頭的writeLock,其實在第二章和第三章的Accept和Read方法的開頭都有readLock和readUnlock操作,只是當(dāng)時為了減少文章篇幅省略了。下面把這塊給補回來。

看一下poll.FD的writeLock、writeUnlock、readLock、readUnlock代碼:

src/internal/poll/fd_mutex.go

// readLock adds a reference to fd and locks fd for reading.
// It returns an error when fd cannot be used for reading.
func (fd *FD) readLock() error {
    if !fd.fdmu.rwlock(true) {
        return errClosing(fd.isFile)
    }
    return nil
}

// readUnlock removes a reference from fd and unlocks fd for reading.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) readUnlock() {
    if fd.fdmu.rwunlock(true) {
        fd.destroy()
    }
}

// writeLock adds a reference to fd and locks fd for writing.
// It returns an error when fd cannot be used for writing.
func (fd *FD) writeLock() error {
    if !fd.fdmu.rwlock(false) {
        return errClosing(fd.isFile)
    }
    return nil
}

// writeUnlock removes a reference from fd and unlocks fd for writing.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) writeUnlock() {
    if fd.fdmu.rwunlock(false) {
        fd.destroy()
    }
}

每個鎖方法都是調(diào)用了fdmu的rwlock或rwunlock,fdmu則是FD中的一個fdMutex類型的成員變量。先看一下fdMutex的結(jié)構(gòu)定義:

src/internal/poll/fd_mutex.go

// fdMutex是管理fd生命周期以及使得FD的Read、Write和Close方法串行執(zhí)行的同步原語
type fdMutex struct {
    state uint64
    rsema uint32
    wsema uint32
}

// fdMutex.state 不同的字節(jié)代表了不同含義:
// 第1個比特位 - 代表FD是否被關(guān)閉, 如果設(shè)置為1,所有后續(xù)的鎖操作都會失敗.
// 第2個比特位 - 代表鎖定讀.
// 第3個比特位 - 代表鎖定寫.
// 第4至23個比特位 - 代表對FD的總引用數(shù)量,包括讀、寫和其他雜項(例如設(shè)置socketopt).
// 第24至43個比特位 - 等待進行讀操作的等待數(shù)量.
// 第44至63個比特位 - 等待進行寫操作的等待數(shù)量.
const (
    mutexClosed  = 1 << 0
    mutexRLock   = 1 << 1
    mutexWLock   = 1 << 2
    mutexRef     = 1 << 3
    mutexRefMask = (1<<20 - 1) << 3
    mutexRWait   = 1 << 23
    mutexRMask   = (1<<20 - 1) << 23
    mutexWWait   = 1 << 43
    mutexWMask   = (1<<20 - 1) << 43
)

可以看到fdMutex的state變量的不同位代表了不同的狀態(tài),當(dāng)?shù)?個比特位為1,表示fd已經(jīng)關(guān)閉,加鎖操作會直接返回;當(dāng)?shù)?個比特位為1,表示讀鎖定,需要等待讀鎖釋放,為0則代表可以獲取讀鎖;當(dāng)?shù)?個比特位為1,表示寫鎖定,需要等待寫鎖釋放,為0則表示可以獲取寫鎖。

加鎖方法rwlock的代碼我們通過注釋的方式進行講解:

src/internal/poll/fd_mutex.go

func (mu *fdMutex) rwlock(read bool) bool {
    var mutexBit, mutexWait, mutexMask uint64
    var mutexSema *uint32
    if read {
        mutexBit = mutexRLock
        mutexWait = mutexRWait
        mutexMask = mutexRMask
        mutexSema = &mu.rsema
    } else {
        mutexBit = mutexWLock
        mutexWait = mutexWWait
        mutexMask = mutexWMask
        mutexSema = &mu.wsema
    }
    for {
        //通過原子方法獲取state的值,并判斷關(guān)閉位是否被設(shè)置1
        old := atomic.LoadUint64(&mu.state)
        if old&mutexClosed != 0 {
            return false
        }
       //判斷鎖定位
        var new uint64
        if old&mutexBit == 0 {
            // 沒有被鎖定,設(shè)置state的新值為被鎖定且增加引用計數(shù)
            new = (old | mutexBit) + mutexRef
           // 判斷引用計數(shù)是否溢出
            if new&mutexRefMask == 0 {
                panic(overflowMsg)
            }
        } else {
            // 已經(jīng)被鎖定,增加state中的等待計數(shù)
            new = old + mutexWait
           // 判斷等待計數(shù)是否溢出
            if new&mutexMask == 0 {
                panic(overflowMsg)
            }
        }
       // 通過CAS原子方式設(shè)置state為新值
        if atomic.CompareAndSwapUint64(&mu.state, old, new) {
           // 設(shè)置成功且之前是未鎖定狀態(tài),獲取鎖成功
            if old&mutexBit == 0 {
                return true
            }
           // 之前是鎖定狀態(tài),使用信號量將自身阻塞,等待鎖釋放后的喚醒信號
            runtime_Semacquire(mutexSema)
            // The signaller has subtracted mutexWait.
        }
       // 如果CAS方法沒有成功,則返回循環(huán)其實為止,繼續(xù)獲取最新的state值
    }
}

以上代碼有幾個點需要單獨講解,方便對代碼的理解:

  1. rwlock方法是讀鎖和寫鎖共用的方法,和我們平時接觸到的讀寫鎖有所不同,這里的讀鎖和寫鎖不是互斥的,其實原因也很簡單,socket的讀和寫并不沖突,不需要加鎖形成互斥(其實socket本身的讀寫是多線程安全的,這里的鎖主要是為了鎖定fd.pd.waitWrite操作,pd.waitWrite涉及go語言實現(xiàn)io多路復(fù)用,后續(xù)專門章節(jié)講解)
  2. CompareAndSwapUint64是一個原子方法,它對比state當(dāng)前值與old值是否相同,如相同則將state設(shè)置為new值,且返回true;如不相同則返回false。而for循環(huán)的開頭則調(diào)用了LoadUint64原子方法,將old設(shè)置為state的最新值。在for循環(huán)的最后,通過調(diào)用CompareAndSwapUint64判斷state的值是否已經(jīng)被其他協(xié)程改變,如果沒有改變,可以設(shè)置計算出的新值;如果發(fā)生了變化,則返回for循環(huán)的開頭,繼續(xù)取state的最新值進行計算和判斷。
  3. 可能有的同學(xué)會對for循環(huán)的性能有疑問,可能會覺得鎖這里的并發(fā)高了以后,CAS頻繁失敗,for循環(huán)執(zhí)行次數(shù)太多導(dǎo)致吃cpu。我們從兩方面來打消這種疑慮,一是從并行角度:多cpu每個cpu上運行的線程并行執(zhí)行相同fd的rwlock代碼,這樣的并行數(shù)量取決于cpu數(shù)量,不可能高;二是從并發(fā)角度:例如正在執(zhí)行rwlock的協(xié)程或者線程被搶占,當(dāng)再此被喚醒執(zhí)行時,其他的協(xié)程或者線程已經(jīng)執(zhí)行過相同fd的rwlock代碼,導(dǎo)致state值變了,CAS失敗,返回for循環(huán)開頭繼續(xù)計算,這種情況出現(xiàn)在高并發(fā)且大部分協(xié)程在rwlock代碼執(zhí)行過程中被搶占,不能說不可能出現(xiàn),但是概率很小。

rwunlock的方法就不再詳細講解了,代碼中標(biāo)有部分注釋:

src/internal/poll/fd_mutex.go

func (mu *fdMutex) rwunlock(read bool) bool {
    var mutexBit, mutexWait, mutexMask uint64
    var mutexSema *uint32
    if read {
        mutexBit = mutexRLock
        mutexWait = mutexRWait
        mutexMask = mutexRMask
        mutexSema = &mu.rsema
    } else {
        mutexBit = mutexWLock
        mutexWait = mutexWWait
        mutexMask = mutexWMask
        mutexSema = &mu.wsema
    }
    for {
        // 通過原子方法獲取state的最新值
        old := atomic.LoadUint64(&mu.state)
        // 如果沒有加鎖或者引用數(shù)為0,panic(一般是沒有成對出現(xiàn)rwlock導(dǎo)致)
        if old&mutexBit == 0 || old&mutexRefMask == 0 {
            panic("inconsistent poll.fdMutex")
        }
        // 將新值的鎖標(biāo)記位清空并較少一次引用
        new := (old &^ mutexBit) - mutexRef
       // 減少一個等待數(shù)量
        if old&mutexMask != 0 {
            new -= mutexWait
        }
       // CAS
        if atomic.CompareAndSwapUint64(&mu.state, old, new) {
           // 如果之前有等待者,發(fā)送信號量喚醒等待者
            if old&mutexMask != 0 {
                runtime_Semrelease(mutexSema)
            }
           // 如果已經(jīng)關(guān)閉且沒有其他引用,返回已關(guān)閉
            return new&(mutexClosed|mutexRefMask) == mutexClosed
        }
    }
}

fdMutex除了對fd的讀寫做并發(fā)控制,還控制了fd的生命周期,下面看一下increfAndClose方法:

func (mu *fdMutex) increfAndClose() bool {
    for {
       // 依然是先判斷是否已關(guān)閉
        old := atomic.LoadUint64(&mu.state)
        if old&mutexClosed != 0 {
            return false
        }
        // 設(shè)置state的新值為關(guān)閉狀態(tài)且引用次數(shù)加一
        new := (old | mutexClosed) + mutexRef
        if new&mutexRefMask == 0 {
            panic(overflowMsg)
        }
        // 清空讀等待數(shù)和寫等待數(shù)
        new &^= mutexRMask | mutexWMask
        // CAS
        if atomic.CompareAndSwapUint64(&mu.state, old, new) {
            // 喚醒所有等待者
            for old&mutexRMask != 0 {
                old -= mutexRWait
                runtime_Semrelease(&mu.rsema)
            }
            for old&mutexWMask != 0 {
                old -= mutexWWait
                runtime_Semrelease(&mu.wsema)
            }
            return true
        }
    }
}

rwlock、rwunlock、increfAndClose三個方法看完后,我們接著將state中除了低3位代表的關(guān)閉位、讀鎖位、寫鎖位外的引用計數(shù)、寫等待計數(shù)、讀等待計數(shù)的作用總結(jié)一下:

  1. 引用計數(shù),每個操作fd的方法中都會首先增加引用計數(shù),方法結(jié)束再減少引用。它標(biāo)記了fd正在被引用的次數(shù),在設(shè)置fd關(guān)閉后必須等待不再有引用才能銷毀fd,否則在其他方法還在引用fd的時候,fd被銷毀,將產(chǎn)生不可預(yù)知的錯誤。
  2. 寫等待計數(shù)、讀等待計數(shù),標(biāo)記等待者的數(shù)量,當(dāng)解鎖時如果等待數(shù)量不為0,需要喚醒一個等待者,當(dāng)關(guān)閉fd時,需要將所有等待者喚醒,等待者喚醒后發(fā)現(xiàn)fd被設(shè)置為關(guān)閉,再進入關(guān)閉判斷分支執(zhí)行。

3. 小結(jié)

本章通過跟蹤conn的Write方法,了解了網(wǎng)絡(luò)數(shù)據(jù)寫入的過程及poll.FD的讀寫鎖??偨Y(jié)為以下幾點:

  1. conn的Write方法調(diào)用了netFD的Write,netFD的Write方法又調(diào)用了poll.FD的Write
  2. poll.FD的rwlock通過原子操作更新讀、寫和關(guān)閉狀態(tài),當(dāng)鎖獲取失敗時調(diào)用信號量方法將自身阻塞并等待其他協(xié)程解鎖后釋放信號量。

下一章我們將對TCPConn的Close方法進行跟蹤,來了解連接關(guān)閉的過程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容