go源碼解析之TCP連接系列基于go源碼1.16.5
網(wǎng)絡(luò)數(shù)據(jù)發(fā)送
上一章我們通過跟蹤TCPConn的Read方法,了解了讀取數(shù)據(jù)的過程,本章將通過TCPConn的Write方法的跟蹤來了解數(shù)據(jù)寫入的過程。
1. conn的Write方法
從上一章了解到TCPConn繼承自conn,它的Write方法就是conn的Write,代碼如下:
src/net/net.go
func (c *conn) Write(b []byte) (int, error) {
...
n, err := c.fd.Write(b)
...
return n, err
}
conn的Write方法調(diào)用了netFD的Write方法:
src/net/fd_posix.go
func (fd *netFD) Write(p []byte) (nn int, err error) {
nn, err = fd.pfd.Write(p)
runtime.KeepAlive(fd)
return nn, wrapSyscallError(writeSyscallName, err)
}
pfd則是poll.FD,看一下它的Write方法:
src/internal/poll/fd_unix.go
func (fd *FD) Write(p []byte) (int, error) {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
...
var nn int
for {
max := len(p)
...
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}
和Read方法一樣,當(dāng)遇到EAGAIN錯誤且pollable為true,進行等待。些許不同的是,Write方法的for循環(huán)中會保證傳入的數(shù)據(jù)都寫完才返回。
2. poll.FD的加鎖方法
回到方法開頭的writeLock,其實在第二章和第三章的Accept和Read方法的開頭都有readLock和readUnlock操作,只是當(dāng)時為了減少文章篇幅省略了。下面把這塊給補回來。
看一下poll.FD的writeLock、writeUnlock、readLock、readUnlock代碼:
src/internal/poll/fd_mutex.go
// readLock adds a reference to fd and locks fd for reading.
// It returns an error when fd cannot be used for reading.
func (fd *FD) readLock() error {
if !fd.fdmu.rwlock(true) {
return errClosing(fd.isFile)
}
return nil
}
// readUnlock removes a reference from fd and unlocks fd for reading.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) readUnlock() {
if fd.fdmu.rwunlock(true) {
fd.destroy()
}
}
// writeLock adds a reference to fd and locks fd for writing.
// It returns an error when fd cannot be used for writing.
func (fd *FD) writeLock() error {
if !fd.fdmu.rwlock(false) {
return errClosing(fd.isFile)
}
return nil
}
// writeUnlock removes a reference from fd and unlocks fd for writing.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) writeUnlock() {
if fd.fdmu.rwunlock(false) {
fd.destroy()
}
}
每個鎖方法都是調(diào)用了fdmu的rwlock或rwunlock,fdmu則是FD中的一個fdMutex類型的成員變量。先看一下fdMutex的結(jié)構(gòu)定義:
src/internal/poll/fd_mutex.go
// fdMutex是管理fd生命周期以及使得FD的Read、Write和Close方法串行執(zhí)行的同步原語
type fdMutex struct {
state uint64
rsema uint32
wsema uint32
}
// fdMutex.state 不同的字節(jié)代表了不同含義:
// 第1個比特位 - 代表FD是否被關(guān)閉, 如果設(shè)置為1,所有后續(xù)的鎖操作都會失敗.
// 第2個比特位 - 代表鎖定讀.
// 第3個比特位 - 代表鎖定寫.
// 第4至23個比特位 - 代表對FD的總引用數(shù)量,包括讀、寫和其他雜項(例如設(shè)置socketopt).
// 第24至43個比特位 - 等待進行讀操作的等待數(shù)量.
// 第44至63個比特位 - 等待進行寫操作的等待數(shù)量.
const (
mutexClosed = 1 << 0
mutexRLock = 1 << 1
mutexWLock = 1 << 2
mutexRef = 1 << 3
mutexRefMask = (1<<20 - 1) << 3
mutexRWait = 1 << 23
mutexRMask = (1<<20 - 1) << 23
mutexWWait = 1 << 43
mutexWMask = (1<<20 - 1) << 43
)
可以看到fdMutex的state變量的不同位代表了不同的狀態(tài),當(dāng)?shù)?個比特位為1,表示fd已經(jīng)關(guān)閉,加鎖操作會直接返回;當(dāng)?shù)?個比特位為1,表示讀鎖定,需要等待讀鎖釋放,為0則代表可以獲取讀鎖;當(dāng)?shù)?個比特位為1,表示寫鎖定,需要等待寫鎖釋放,為0則表示可以獲取寫鎖。
加鎖方法rwlock的代碼我們通過注釋的方式進行講解:
src/internal/poll/fd_mutex.go
func (mu *fdMutex) rwlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
//通過原子方法獲取state的值,并判斷關(guān)閉位是否被設(shè)置1
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
//判斷鎖定位
var new uint64
if old&mutexBit == 0 {
// 沒有被鎖定,設(shè)置state的新值為被鎖定且增加引用計數(shù)
new = (old | mutexBit) + mutexRef
// 判斷引用計數(shù)是否溢出
if new&mutexRefMask == 0 {
panic(overflowMsg)
}
} else {
// 已經(jīng)被鎖定,增加state中的等待計數(shù)
new = old + mutexWait
// 判斷等待計數(shù)是否溢出
if new&mutexMask == 0 {
panic(overflowMsg)
}
}
// 通過CAS原子方式設(shè)置state為新值
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
// 設(shè)置成功且之前是未鎖定狀態(tài),獲取鎖成功
if old&mutexBit == 0 {
return true
}
// 之前是鎖定狀態(tài),使用信號量將自身阻塞,等待鎖釋放后的喚醒信號
runtime_Semacquire(mutexSema)
// The signaller has subtracted mutexWait.
}
// 如果CAS方法沒有成功,則返回循環(huán)其實為止,繼續(xù)獲取最新的state值
}
}
以上代碼有幾個點需要單獨講解,方便對代碼的理解:
- rwlock方法是讀鎖和寫鎖共用的方法,和我們平時接觸到的讀寫鎖有所不同,這里的讀鎖和寫鎖不是互斥的,其實原因也很簡單,socket的讀和寫并不沖突,不需要加鎖形成互斥(其實socket本身的讀寫是多線程安全的,這里的鎖主要是為了鎖定fd.pd.waitWrite操作,pd.waitWrite涉及go語言實現(xiàn)io多路復(fù)用,后續(xù)專門章節(jié)講解)
- CompareAndSwapUint64是一個原子方法,它對比state當(dāng)前值與old值是否相同,如相同則將state設(shè)置為new值,且返回true;如不相同則返回false。而for循環(huán)的開頭則調(diào)用了LoadUint64原子方法,將old設(shè)置為state的最新值。在for循環(huán)的最后,通過調(diào)用CompareAndSwapUint64判斷state的值是否已經(jīng)被其他協(xié)程改變,如果沒有改變,可以設(shè)置計算出的新值;如果發(fā)生了變化,則返回for循環(huán)的開頭,繼續(xù)取state的最新值進行計算和判斷。
- 可能有的同學(xué)會對for循環(huán)的性能有疑問,可能會覺得鎖這里的并發(fā)高了以后,CAS頻繁失敗,for循環(huán)執(zhí)行次數(shù)太多導(dǎo)致吃cpu。我們從兩方面來打消這種疑慮,一是從并行角度:多cpu每個cpu上運行的線程并行執(zhí)行相同fd的rwlock代碼,這樣的并行數(shù)量取決于cpu數(shù)量,不可能高;二是從并發(fā)角度:例如正在執(zhí)行rwlock的協(xié)程或者線程被搶占,當(dāng)再此被喚醒執(zhí)行時,其他的協(xié)程或者線程已經(jīng)執(zhí)行過相同fd的rwlock代碼,導(dǎo)致state值變了,CAS失敗,返回for循環(huán)開頭繼續(xù)計算,這種情況出現(xiàn)在高并發(fā)且大部分協(xié)程在rwlock代碼執(zhí)行過程中被搶占,不能說不可能出現(xiàn),但是概率很小。
rwunlock的方法就不再詳細講解了,代碼中標(biāo)有部分注釋:
src/internal/poll/fd_mutex.go
func (mu *fdMutex) rwunlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
// 通過原子方法獲取state的最新值
old := atomic.LoadUint64(&mu.state)
// 如果沒有加鎖或者引用數(shù)為0,panic(一般是沒有成對出現(xiàn)rwlock導(dǎo)致)
if old&mutexBit == 0 || old&mutexRefMask == 0 {
panic("inconsistent poll.fdMutex")
}
// 將新值的鎖標(biāo)記位清空并較少一次引用
new := (old &^ mutexBit) - mutexRef
// 減少一個等待數(shù)量
if old&mutexMask != 0 {
new -= mutexWait
}
// CAS
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
// 如果之前有等待者,發(fā)送信號量喚醒等待者
if old&mutexMask != 0 {
runtime_Semrelease(mutexSema)
}
// 如果已經(jīng)關(guān)閉且沒有其他引用,返回已關(guān)閉
return new&(mutexClosed|mutexRefMask) == mutexClosed
}
}
}
fdMutex除了對fd的讀寫做并發(fā)控制,還控制了fd的生命周期,下面看一下increfAndClose方法:
func (mu *fdMutex) increfAndClose() bool {
for {
// 依然是先判斷是否已關(guān)閉
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
// 設(shè)置state的新值為關(guān)閉狀態(tài)且引用次數(shù)加一
new := (old | mutexClosed) + mutexRef
if new&mutexRefMask == 0 {
panic(overflowMsg)
}
// 清空讀等待數(shù)和寫等待數(shù)
new &^= mutexRMask | mutexWMask
// CAS
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
// 喚醒所有等待者
for old&mutexRMask != 0 {
old -= mutexRWait
runtime_Semrelease(&mu.rsema)
}
for old&mutexWMask != 0 {
old -= mutexWWait
runtime_Semrelease(&mu.wsema)
}
return true
}
}
}
rwlock、rwunlock、increfAndClose三個方法看完后,我們接著將state中除了低3位代表的關(guān)閉位、讀鎖位、寫鎖位外的引用計數(shù)、寫等待計數(shù)、讀等待計數(shù)的作用總結(jié)一下:
- 引用計數(shù),每個操作fd的方法中都會首先增加引用計數(shù),方法結(jié)束再減少引用。它標(biāo)記了fd正在被引用的次數(shù),在設(shè)置fd關(guān)閉后必須等待不再有引用才能銷毀fd,否則在其他方法還在引用fd的時候,fd被銷毀,將產(chǎn)生不可預(yù)知的錯誤。
- 寫等待計數(shù)、讀等待計數(shù),標(biāo)記等待者的數(shù)量,當(dāng)解鎖時如果等待數(shù)量不為0,需要喚醒一個等待者,當(dāng)關(guān)閉fd時,需要將所有等待者喚醒,等待者喚醒后發(fā)現(xiàn)fd被設(shè)置為關(guān)閉,再進入關(guān)閉判斷分支執(zhí)行。
3. 小結(jié)
本章通過跟蹤conn的Write方法,了解了網(wǎng)絡(luò)數(shù)據(jù)寫入的過程及poll.FD的讀寫鎖??偨Y(jié)為以下幾點:
- conn的Write方法調(diào)用了netFD的Write,netFD的Write方法又調(diào)用了poll.FD的Write
- poll.FD的rwlock通過原子操作更新讀、寫和關(guān)閉狀態(tài),當(dāng)鎖獲取失敗時調(diào)用信號量方法將自身阻塞并等待其他協(xié)程解鎖后釋放信號量。
下一章我們將對TCPConn的Close方法進行跟蹤,來了解連接關(guān)閉的過程。