前言
可以從 Go 源碼目錄結(jié)構(gòu)和對(duì)應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)。比如,在 Linux 系統(tǒng)下基于 epoll,freeBSD 系統(tǒng)下基于 kqueue,以及 Windows 系統(tǒng)下基于 iocp。
因?yàn)槲覀兊拇a都是部署在Linux上的,所以本文以epoll封裝實(shí)現(xiàn)為例子來(lái)講解Go語(yǔ)言中I/O多路復(fù)用的源碼實(shí)現(xiàn)。
介紹
I/O多路復(fù)用
所謂 I/O 多路復(fù)用指的就是 select/epoll 這一系列的多路選擇器:支持單一線(xiàn)程同時(shí)監(jiān)聽(tīng)多個(gè)文件描述符(I/O 事件),阻塞等待,并在其中某個(gè)文件描述符可讀寫(xiě)時(shí)收到通知。以防很多同學(xué)對(duì)select或epoll不那么熟悉,所以下面先來(lái)講講這兩個(gè)選擇器。
首先我們先說(shuō)一下什么是文件描述符(File descriptor),根據(jù)它的英文首字母也簡(jiǎn)稱(chēng)FD,它是一個(gè)用于表述指向文件的引用的抽象化概念。它是一個(gè)索引值,指向內(nèi)核為每一個(gè)進(jìn)程所維護(hù)的該進(jìn)程打開(kāi)文件的記錄表。當(dāng)程序打開(kāi)一個(gè)現(xiàn)有文件或者創(chuàng)建一個(gè)新文件時(shí),內(nèi)核向進(jìn)程返回一個(gè)文件描述符。
select
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);
writefds、readfds、和exceptfds是三個(gè)文件描述符集合。select會(huì)遍歷每個(gè)集合的前nfds個(gè)描述符,分別找到可以讀取、可以寫(xiě)入、發(fā)生錯(cuò)誤的描述符,統(tǒng)稱(chēng)為就緒的描述符。
timeout參數(shù)表示調(diào)用select時(shí)的阻塞時(shí)長(zhǎng)。如果所有文件描述符都未就緒,就阻塞調(diào)用進(jìn)程,直到某個(gè)描述符就緒,或者阻塞超過(guò)設(shè)置的 timeout 后,返回。如果timeout參數(shù)設(shè)為 NULL,會(huì)無(wú)限阻塞直到某個(gè)描述符就緒;如果timeout參數(shù)設(shè)為 0,會(huì)立即返回,不阻塞。
當(dāng)select函數(shù)返回后,可以通過(guò)遍歷fdset,來(lái)找到就緒的描述符。

select的缺點(diǎn)也列舉一下:
- select最大的缺陷就是單個(gè)進(jìn)程所打開(kāi)的FD是有一定限制的,它由FD_SETSIZE設(shè)置,默認(rèn)值是1024;
- 每次調(diào)用 select,都需要把 fd 集合從用戶(hù)態(tài)拷貝到內(nèi)核態(tài),這個(gè)開(kāi)銷(xiāo)在 fd 很多時(shí)會(huì)很大;
- 每次 kernel 都需要線(xiàn)性?huà)呙枵麄€(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長(zhǎng),其 I/O 性能會(huì)線(xiàn)性下降;
epoll
epoll是selec的增強(qiáng)版本,避免了“性能開(kāi)銷(xiāo)大”和“文件描述符數(shù)量少”兩個(gè)缺點(diǎn)。
為方便理解后續(xù)的內(nèi)容,先看一下epoll的用法:
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, ...)
listen(listenfd, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //將所有需要監(jiān)聽(tīng)的fd添加到epfd中
while(1){
int n = epoll_wait(...)
for(接收到數(shù)據(jù)的socket){
//處理
}
}
先用epoll_create創(chuàng)建一個(gè)epoll對(duì)象實(shí)例epfd,同時(shí)返回一個(gè)引用該實(shí)例的文件描述符,返回的文件描述符僅僅指向?qū)?yīng)的epoll實(shí)例,并不表示真實(shí)的磁盤(pán)文件節(jié)點(diǎn)。
epoll實(shí)例內(nèi)部存儲(chǔ):
- 監(jiān)聽(tīng)列表:所有要監(jiān)聽(tīng)的文件描述符,使用紅黑樹(shù);
- 就緒列表:所有就緒的文件描述符,使用鏈表;
再通過(guò)epoll_ctl將需要監(jiān)視的fd添加到epfd中,同時(shí)為fd設(shè)置一個(gè)回調(diào)函數(shù),并監(jiān)聽(tīng)事件event,并添加到監(jiān)聽(tīng)列表中。當(dāng)有事件發(fā)生時(shí),會(huì)調(diào)用回調(diào)函數(shù),并將fd添加到epoll實(shí)例的就緒隊(duì)列上。
最后調(diào)用epoll_wait阻塞監(jiān)聽(tīng) epoll 實(shí)例上所有的fd的 I/O 事件。當(dāng)就緒列表中已有數(shù)據(jù),那么epoll_wait直接返回,解決了select每次都需要輪詢(xún)一遍的問(wèn)題。
epoll的優(yōu)點(diǎn):
epoll的監(jiān)聽(tīng)列表使用紅黑樹(shù)存儲(chǔ),epoll_ctl 函數(shù)添加進(jìn)來(lái)的 fd 都會(huì)被放在紅黑樹(shù)的某個(gè)節(jié)點(diǎn)內(nèi),而紅黑樹(shù)本身插入和刪除性能比較穩(wěn)定,時(shí)間復(fù)雜度 O(logN),并且可以存儲(chǔ)大量的的fd,避免了只能存儲(chǔ)1024個(gè)fd的限制;
epoll_ctl 中為每個(gè)文件描述符指定了回調(diào)函數(shù),并在就緒時(shí)將其加入到就緒列表,因此不需要像select一樣遍歷檢測(cè)每個(gè)文件描述符,只需要判斷就緒列表是否為空即可;
解析
netpoll本質(zhì)上是對(duì) I/O 多路復(fù)用技術(shù)的封裝,所以自然也是和epoll一樣脫離不了下面幾步:
- netpoll創(chuàng)建及其初始化;
- 向netpoll中加入待監(jiān)控的任務(wù);
- 從netpoll獲取觸發(fā)的事件;
在go中對(duì)epoll提供的三個(gè)函數(shù)進(jìn)行了封裝:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList
netpollinit函數(shù)負(fù)責(zé)初始化netpoll;
netpollopen負(fù)責(zé)監(jiān)聽(tīng)文件描述符上的事件;
netpoll會(huì)阻塞等待返回一組已經(jīng)準(zhǔn)備就緒的 Goroutine;
下面是Go語(yǔ)言中編寫(xiě)的一個(gè)TCP server:
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}
// 創(chuàng)建一個(gè)goroutine來(lái)負(fù)責(zé)處理讀寫(xiě)任務(wù)
go HandleConn(conn)
}
}
下面我們跟著這個(gè)TCP server的源碼一起看看是在哪里使用了netpoll來(lái)完成epoll的調(diào)用。
net.Listen
這個(gè)TCP server中會(huì)調(diào)用net.Listen創(chuàng)建一個(gè)socket同時(shí)返回與之對(duì)應(yīng)的fd,該fd用來(lái)初始化listener的netFD(go層面封裝的網(wǎng)絡(luò)文件描述符),接著調(diào)用 netFD的listenStream方法完成對(duì) socket 的 bind&listen和netFD的初始化。
調(diào)用過(guò)程如下:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 創(chuàng)建一個(gè)socket
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
...
// 創(chuàng)建fd
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 調(diào)用 netFD的listenStream方法完成對(duì) socket 的 bind&listen和netFD的初始化
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
...
}
}
...
return fd, nil
}
func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
sysSocket方法會(huì)發(fā)起一個(gè)系統(tǒng)調(diào)用創(chuàng)建一個(gè)socket,newFD會(huì)創(chuàng)建一個(gè)netFD,然后調(diào)用netFD的listenStream方法進(jìn)行bind&listen操作,并對(duì)netFD進(jìn)行init。

netFD是一個(gè)文件描述符的封裝,netFD中包含一個(gè)FD數(shù)據(jù)結(jié)構(gòu),F(xiàn)D中包含了Sysfd 和pollDesc兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu),Sysfd是sysSocket返回的socket系統(tǒng)文件描述符,pollDesc用于監(jiān)控文件描述符的可讀或者可寫(xiě)。
我們繼續(xù)看listenStream:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
// 完成綁定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 進(jìn)行監(jiān)聽(tīng)操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化fd
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
listenStream方法會(huì)調(diào)用Bind方法完成fd的綁定操作,然后調(diào)用listenFunc進(jìn)行監(jiān)聽(tīng),接著調(diào)用fd的init方法,完成FD、pollDesc初始化。
func (pd *pollDesc) init(fd *FD) error {
// 調(diào)用到runtime.poll_runtime_pollServerInit
serverInit.Do(runtime_pollServerInit)
// 調(diào)用到runtime.poll_runtime_pollOpen
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
return nil
}
runtime_pollServerInit用Once封裝保證只能被調(diào)用一次,這個(gè)函數(shù)在Linux平臺(tái)上會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例;
poll_runtime_pollOpen調(diào)用了netpollopen會(huì)將fd注冊(cè)到 epoll實(shí)例中,并返回一個(gè)pollDesc;
netpollinit初始化
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
netpollGenericInit會(huì)調(diào)用平臺(tái)上特定實(shí)現(xiàn)的netpollinit,在Linux中會(huì)調(diào)用到netpoll_epoll.go的netpollinit方法:
var (
epfd int32 = -1 // epoll descriptor
)
func netpollinit() {
// 創(chuàng)建一個(gè)新的 epoll 文件描述符
epfd = epollcreate1(_EPOLL_CLOEXEC)
...
// 創(chuàng)建一個(gè)用于通信的管道
r, w, errno := nonblockingPipe()
...
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽(tīng)
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
...
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
調(diào)用epollcreate1方法會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例,需要注意的是epfd是一個(gè)全局的屬性。然后創(chuàng)建一個(gè)用于通信的管道,調(diào)用epollctl將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽(tīng)。
netpollopen加入事件監(jiān)聽(tīng)
下面再看看poll_runtime_pollOpen方法:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
pd.self = pd
unlock(&pd.lock)
var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
poll_runtime_pollOpen方法會(huì)通過(guò)pollcache.alloc初始化總大小約為 4KB的pollDesc結(jié)構(gòu)體。然后重置pd的屬性,調(diào)用netpollopen向epoll實(shí)例epfd加入新的輪詢(xún)事件監(jiān)聽(tīng)文件描述符的可讀和可寫(xiě)狀態(tài)。
下面我們?cè)倏纯磒ollCache是如何初始化pollDesc的。
type pollCache struct {
lock mutex
first *pollDesc
}
const pollBlockSize = 4 * 1024
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
// 初始化首節(jié)點(diǎn)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
// 初始化pollDesc鏈表
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}
pollCache的鏈表頭如果為空,那么初始化首節(jié)點(diǎn),首節(jié)點(diǎn)是一個(gè)pollDesc的鏈表頭,每次調(diào)用該結(jié)構(gòu)體都會(huì)返回鏈表頭還沒(méi)有被使用的pollDesc。

到這里就完成了net.Listen的分析,下面我們看看listen.Accept。
net.Accept
Listener.Accept方法最終會(huì)調(diào)用到netFD的accept方法中:

func (fd *netFD) accept() (netfd *netFD, err error) {
// 調(diào)用netfd.FD的Accept接受新的 socket 連接,返回 socket 的 fd
d, rsa, errcall, err := fd.pfd.Accept()
...
// 構(gòu)造一個(gè)新的netfd
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 調(diào)用 netFD 的 init 方法完成初始化
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
這個(gè)方法首先會(huì)調(diào)用到FD的Accept接受新的 socket 連接,并返回新的socket對(duì)應(yīng)的fd,然后調(diào)用newFD構(gòu)造一個(gè)新的netfd,并通過(guò)init 方法完成初始化。
init方法上面我們已經(jīng)看過(guò)了,下面我們來(lái)看看Accept方法:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// 使用 linux 系統(tǒng)調(diào)用 accept 接收新連接,創(chuàng)建對(duì)應(yīng)的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果當(dāng)前沒(méi)有發(fā)生期待的 I/O 事件,那么 waitRead 會(huì)通過(guò) park goroutine 讓邏輯 block 在這里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
FD.Accept方法會(huì)使用 linux 系統(tǒng)調(diào)用 accept 接收新連接,創(chuàng)建對(duì)應(yīng)的 socket,如果沒(méi)有可讀的消息,waitRead會(huì)被阻塞。這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒。
pollWait事件等待
pollDesc.waitRead實(shí)際上是調(diào)用了runtime.poll_runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
// 進(jìn)入 netpollblock 并且判斷是否有期待的 I/O 事件發(fā)生
for !netpollblock(pd, int32(mode), false) {
...
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 這個(gè) for 循環(huán)是為了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此時(shí)已有期待的 I/O 事件發(fā)生,
// 可以直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行響應(yīng)的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果沒(méi)有期待的 I/O 事件發(fā)生,則通過(guò)原子操作把 gpp 的值置為 pdWait 并退出 for 循環(huán)
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
// 讓出當(dāng)前線(xiàn)程,將 Goroutine 轉(zhuǎn)換到休眠狀態(tài)并等待運(yùn)行時(shí)的喚醒
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
poll_runtime_pollWait會(huì)用for循環(huán)調(diào)用netpollblock函數(shù)判斷是否有期待的 I/O 事件發(fā)生,直到netpollblock返回true表示io ready才會(huì)走出循環(huán)。
netpollblock方法會(huì)判斷當(dāng)前的狀態(tài)是不是處于pdReady,如果是那么直接返回true;如果不是,那么將gpp通過(guò)CAS設(shè)置為pdWait并退出 for 循環(huán)。通過(guò)gopark 把當(dāng)前 goroutine 給 park 住,直到對(duì)應(yīng)的 fd 上發(fā)生可讀/可寫(xiě)或者其他I/O 事件為止。
這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒。
netpoll輪詢(xún)等待
runtime.netpoll的核心邏輯是: 根據(jù)入?yún)?delay設(shè)置調(diào)用 epoll_wait 的 timeout 值,調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist雙向列表中獲取IO就緒的fd列表,遍歷epoll_wait 返回的fd列表, 根據(jù)調(diào)用epoll_ctl注冊(cè)fd時(shí)封裝的上下文信息組裝可運(yùn)行的 goroutine 并返回。
執(zhí)行完 netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對(duì)應(yīng)的 goroutine 列表,接下來(lái)將就緒的 goroutine 加入到調(diào)度隊(duì)列中,等待調(diào)度運(yùn)行。
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
// 因?yàn)閭魅雂elay單位是納秒,下面將納秒轉(zhuǎn)換成毫秒
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
// 等待文件描述符轉(zhuǎn)換成可讀或者可寫(xiě)
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
// 返回負(fù)值,那么重新調(diào)用epollwait進(jìn)行等待
if n < 0 {
...
goto retry
}
var toRun gList
// 意味著被監(jiān)控的文件描述符出現(xiàn)了待處理的事件
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
...
// 判斷發(fā)生的事件類(lèi)型,讀類(lèi)型或者寫(xiě)類(lèi)型
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 調(diào)用 netpollready,傳入就緒 fd 的 pollDesc
netpollready(&toRun, pd, mode)
}
}
return toRun
}
netpoll會(huì)調(diào)用epollwait獲取就緒的 fd 列表,對(duì)應(yīng)的epoll函數(shù)是epoll_wait。toRun是一個(gè) g 的鏈表,存儲(chǔ)要恢復(fù)的 goroutines,最后返回給調(diào)用方。如果epollwait返回的n大于零,那么表示被監(jiān)控的文件描述符出現(xiàn)了待處理的事件,那么需要調(diào)用for循環(huán)進(jìn)行處理。循環(huán)里面會(huì)根據(jù)時(shí)間類(lèi)型設(shè)置mode,然后拿出對(duì)應(yīng)的pollDesc,調(diào)用netpollready方法。
下面我們?cè)倏匆幌耼etpollready:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
// 獲取對(duì)應(yīng)的g的指針
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 將對(duì)應(yīng)的g加入到toRun列表中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
// 根據(jù)傳入的mode判斷事件類(lèi)型
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存儲(chǔ)的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// cas 將讀或者寫(xiě)信號(hào)量轉(zhuǎn)換成 pdReady
if atomic.Casuintptr(gpp, old, new) {
if old == pdWait {
old = 0
}
// 返回對(duì)應(yīng)的 g指針
return (*g)(unsafe.Pointer(old))
}
}
}
講完了runtime.netpoll的源碼有個(gè)需要注意的地方,調(diào)用runtime.netpoll的地方有兩處:
在調(diào)度器中執(zhí)行runtime.schedule(),該方法中會(huì)執(zhí)行runtime.findrunable(),在runtime.findrunable()中調(diào)用了runtime.netpoll獲取待執(zhí)行的goroutine;
Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的sysmon監(jiān)控線(xiàn)程,sysmon 每 20us~10ms 運(yùn)行一次,每次運(yùn)行會(huì)檢查距離上一次執(zhí)行netpoll是否超過(guò)10ms,如果是則會(huì)調(diào)用一次runtime.netpoll;
這些入口的調(diào)用感興趣的可以自己去看看。
總結(jié)
本文從I/O多路復(fù)用開(kāi)始講解select以及epoll,然后再回到go語(yǔ)言中去看它是如何實(shí)現(xiàn)多路復(fù)用這樣的結(jié)構(gòu)的。通過(guò)追蹤源碼可以發(fā)現(xiàn),其實(shí)go也是根據(jù)epoll來(lái)封裝自己的函數(shù):
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
通過(guò)這三個(gè)函數(shù)來(lái)實(shí)現(xiàn)對(duì)epoll的創(chuàng)建實(shí)例、注冊(cè)、事件等待操作。