Go - 網(wǎng)絡(luò)輪詢(xún)器

前言

可以從 Go 源碼目錄結(jié)構(gòu)和對(duì)應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)。比如,在 Linux 系統(tǒng)下基于 epoll,freeBSD 系統(tǒng)下基于 kqueue,以及 Windows 系統(tǒng)下基于 iocp。

因?yàn)槲覀兊拇a都是部署在Linux上的,所以本文以epoll封裝實(shí)現(xiàn)為例子來(lái)講解Go語(yǔ)言中I/O多路復(fù)用的源碼實(shí)現(xiàn)。

介紹

I/O多路復(fù)用

所謂 I/O 多路復(fù)用指的就是 select/epoll 這一系列的多路選擇器:支持單一線(xiàn)程同時(shí)監(jiān)聽(tīng)多個(gè)文件描述符(I/O 事件),阻塞等待,并在其中某個(gè)文件描述符可讀寫(xiě)時(shí)收到通知。以防很多同學(xué)對(duì)select或epoll不那么熟悉,所以下面先來(lái)講講這兩個(gè)選擇器。

首先我們先說(shuō)一下什么是文件描述符(File descriptor),根據(jù)它的英文首字母也簡(jiǎn)稱(chēng)FD,它是一個(gè)用于表述指向文件的引用的抽象化概念。它是一個(gè)索引值,指向內(nèi)核為每一個(gè)進(jìn)程所維護(hù)的該進(jìn)程打開(kāi)文件的記錄表。當(dāng)程序打開(kāi)一個(gè)現(xiàn)有文件或者創(chuàng)建一個(gè)新文件時(shí),內(nèi)核向進(jìn)程返回一個(gè)文件描述符。

select
int select(int nfds,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict errorfds,
            struct timeval *restrict timeout);

writefds、readfds、和exceptfds是三個(gè)文件描述符集合。select會(huì)遍歷每個(gè)集合的前nfds個(gè)描述符,分別找到可以讀取、可以寫(xiě)入、發(fā)生錯(cuò)誤的描述符,統(tǒng)稱(chēng)為就緒的描述符。

timeout參數(shù)表示調(diào)用select時(shí)的阻塞時(shí)長(zhǎng)。如果所有文件描述符都未就緒,就阻塞調(diào)用進(jìn)程,直到某個(gè)描述符就緒,或者阻塞超過(guò)設(shè)置的 timeout 后,返回。如果timeout參數(shù)設(shè)為 NULL,會(huì)無(wú)限阻塞直到某個(gè)描述符就緒;如果timeout參數(shù)設(shè)為 0,會(huì)立即返回,不阻塞。

當(dāng)select函數(shù)返回后,可以通過(guò)遍歷fdset,來(lái)找到就緒的描述符。



select的缺點(diǎn)也列舉一下:

  1. select最大的缺陷就是單個(gè)進(jìn)程所打開(kāi)的FD是有一定限制的,它由FD_SETSIZE設(shè)置,默認(rèn)值是1024;
  2. 每次調(diào)用 select,都需要把 fd 集合從用戶(hù)態(tài)拷貝到內(nèi)核態(tài),這個(gè)開(kāi)銷(xiāo)在 fd 很多時(shí)會(huì)很大;
  3. 每次 kernel 都需要線(xiàn)性?huà)呙枵麄€(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長(zhǎng),其 I/O 性能會(huì)線(xiàn)性下降;
epoll

epoll是selec的增強(qiáng)版本,避免了“性能開(kāi)銷(xiāo)大”和“文件描述符數(shù)量少”兩個(gè)缺點(diǎn)。

為方便理解后續(xù)的內(nèi)容,先看一下epoll的用法:

int listenfd = socket(AF_INET, SOCK_STREAM, 0);   
bind(listenfd, ...)
listen(listenfd, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //將所有需要監(jiān)聽(tīng)的fd添加到epfd中

while(1){
    int n = epoll_wait(...)
    for(接收到數(shù)據(jù)的socket){
        //處理
    }
}

先用epoll_create創(chuàng)建一個(gè)epoll對(duì)象實(shí)例epfd,同時(shí)返回一個(gè)引用該實(shí)例的文件描述符,返回的文件描述符僅僅指向?qū)?yīng)的epoll實(shí)例,并不表示真實(shí)的磁盤(pán)文件節(jié)點(diǎn)。

epoll實(shí)例內(nèi)部存儲(chǔ):

  • 監(jiān)聽(tīng)列表:所有要監(jiān)聽(tīng)的文件描述符,使用紅黑樹(shù);
  • 就緒列表:所有就緒的文件描述符,使用鏈表;

再通過(guò)epoll_ctl將需要監(jiān)視的fd添加到epfd中,同時(shí)為fd設(shè)置一個(gè)回調(diào)函數(shù),并監(jiān)聽(tīng)事件event,并添加到監(jiān)聽(tīng)列表中。當(dāng)有事件發(fā)生時(shí),會(huì)調(diào)用回調(diào)函數(shù),并將fd添加到epoll實(shí)例的就緒隊(duì)列上。

最后調(diào)用epoll_wait阻塞監(jiān)聽(tīng) epoll 實(shí)例上所有的fd的 I/O 事件。當(dāng)就緒列表中已有數(shù)據(jù),那么epoll_wait直接返回,解決了select每次都需要輪詢(xún)一遍的問(wèn)題。

epoll的優(yōu)點(diǎn):
epoll的監(jiān)聽(tīng)列表使用紅黑樹(shù)存儲(chǔ),epoll_ctl 函數(shù)添加進(jìn)來(lái)的 fd 都會(huì)被放在紅黑樹(shù)的某個(gè)節(jié)點(diǎn)內(nèi),而紅黑樹(shù)本身插入和刪除性能比較穩(wěn)定,時(shí)間復(fù)雜度 O(logN),并且可以存儲(chǔ)大量的的fd,避免了只能存儲(chǔ)1024個(gè)fd的限制;

epoll_ctl 中為每個(gè)文件描述符指定了回調(diào)函數(shù),并在就緒時(shí)將其加入到就緒列表,因此不需要像select一樣遍歷檢測(cè)每個(gè)文件描述符,只需要判斷就緒列表是否為空即可;

解析

netpoll本質(zhì)上是對(duì) I/O 多路復(fù)用技術(shù)的封裝,所以自然也是和epoll一樣脫離不了下面幾步:

  1. netpoll創(chuàng)建及其初始化;
  2. 向netpoll中加入待監(jiān)控的任務(wù);
  3. 從netpoll獲取觸發(fā)的事件;

在go中對(duì)epoll提供的三個(gè)函數(shù)進(jìn)行了封裝:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

netpollinit函數(shù)負(fù)責(zé)初始化netpoll;

netpollopen負(fù)責(zé)監(jiān)聽(tīng)文件描述符上的事件;

netpoll會(huì)阻塞等待返回一組已經(jīng)準(zhǔn)備就緒的 Goroutine;

下面是Go語(yǔ)言中編寫(xiě)的一個(gè)TCP server:

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    } 
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        } 
        // 創(chuàng)建一個(gè)goroutine來(lái)負(fù)責(zé)處理讀寫(xiě)任務(wù)
        go HandleConn(conn)
    }
} 

下面我們跟著這個(gè)TCP server的源碼一起看看是在哪里使用了netpoll來(lái)完成epoll的調(diào)用。

net.Listen

這個(gè)TCP server中會(huì)調(diào)用net.Listen創(chuàng)建一個(gè)socket同時(shí)返回與之對(duì)應(yīng)的fd,該fd用來(lái)初始化listener的netFD(go層面封裝的網(wǎng)絡(luò)文件描述符),接著調(diào)用 netFD的listenStream方法完成對(duì) socket 的 bind&listen和netFD的初始化。

調(diào)用過(guò)程如下:


func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // 創(chuàng)建一個(gè)socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    ...
    // 創(chuàng)建fd
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    } 
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // 調(diào)用 netFD的listenStream方法完成對(duì) socket 的 bind&listen和netFD的初始化
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            ...
        }
    }
    ...
    return fd, nil
}

func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}

sysSocket方法會(huì)發(fā)起一個(gè)系統(tǒng)調(diào)用創(chuàng)建一個(gè)socket,newFD會(huì)創(chuàng)建一個(gè)netFD,然后調(diào)用netFD的listenStream方法進(jìn)行bind&listen操作,并對(duì)netFD進(jìn)行init。



netFD是一個(gè)文件描述符的封裝,netFD中包含一個(gè)FD數(shù)據(jù)結(jié)構(gòu),F(xiàn)D中包含了Sysfd 和pollDesc兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu),Sysfd是sysSocket返回的socket系統(tǒng)文件描述符,pollDesc用于監(jiān)控文件描述符的可讀或者可寫(xiě)。

我們繼續(xù)看listenStream:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ...
    // 完成綁定操作
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // 進(jìn)行監(jiān)聽(tīng)操作
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // 初始化fd
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

listenStream方法會(huì)調(diào)用Bind方法完成fd的綁定操作,然后調(diào)用listenFunc進(jìn)行監(jiān)聽(tīng),接著調(diào)用fd的init方法,完成FD、pollDesc初始化。

func (pd *pollDesc) init(fd *FD) error {
    // 調(diào)用到runtime.poll_runtime_pollServerInit
    serverInit.Do(runtime_pollServerInit)
    // 調(diào)用到runtime.poll_runtime_pollOpen
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

runtime_pollServerInit用Once封裝保證只能被調(diào)用一次,這個(gè)函數(shù)在Linux平臺(tái)上會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例;

poll_runtime_pollOpen調(diào)用了netpollopen會(huì)將fd注冊(cè)到 epoll實(shí)例中,并返回一個(gè)pollDesc;

netpollinit初始化

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

netpollGenericInit會(huì)調(diào)用平臺(tái)上特定實(shí)現(xiàn)的netpollinit,在Linux中會(huì)調(diào)用到netpoll_epoll.go的netpollinit方法:

var (
    epfd int32 = -1 // epoll descriptor 
)

func netpollinit() {
    // 創(chuàng)建一個(gè)新的 epoll 文件描述符
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
    // 創(chuàng)建一個(gè)用于通信的管道
    r, w, errno := nonblockingPipe()
    ...
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // 將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽(tīng)
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

調(diào)用epollcreate1方法會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例,需要注意的是epfd是一個(gè)全局的屬性。然后創(chuàng)建一個(gè)用于通信的管道,調(diào)用epollctl將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽(tīng)。

netpollopen加入事件監(jiān)聽(tīng)

下面再看看poll_runtime_pollOpen方法:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

poll_runtime_pollOpen方法會(huì)通過(guò)pollcache.alloc初始化總大小約為 4KB的pollDesc結(jié)構(gòu)體。然后重置pd的屬性,調(diào)用netpollopen向epoll實(shí)例epfd加入新的輪詢(xún)事件監(jiān)聽(tīng)文件描述符的可讀和可寫(xiě)狀態(tài)。

下面我們?cè)倏纯磒ollCache是如何初始化pollDesc的。

type pollCache struct {
    lock  mutex
    first *pollDesc 
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    // 初始化首節(jié)點(diǎn)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        } 
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // 初始化pollDesc鏈表
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}

pollCache的鏈表頭如果為空,那么初始化首節(jié)點(diǎn),首節(jié)點(diǎn)是一個(gè)pollDesc的鏈表頭,每次調(diào)用該結(jié)構(gòu)體都會(huì)返回鏈表頭還沒(méi)有被使用的pollDesc。


到這里就完成了net.Listen的分析,下面我們看看listen.Accept。

net.Accept

Listener.Accept方法最終會(huì)調(diào)用到netFD的accept方法中:

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 調(diào)用netfd.FD的Accept接受新的 socket 連接,返回 socket 的 fd
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // 構(gòu)造一個(gè)新的netfd
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // 調(diào)用 netFD 的 init 方法完成初始化
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

這個(gè)方法首先會(huì)調(diào)用到FD的Accept接受新的 socket 連接,并返回新的socket對(duì)應(yīng)的fd,然后調(diào)用newFD構(gòu)造一個(gè)新的netfd,并通過(guò)init 方法完成初始化。

init方法上面我們已經(jīng)看過(guò)了,下面我們來(lái)看看Accept方法:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // 使用 linux 系統(tǒng)調(diào)用 accept 接收新連接,創(chuàng)建對(duì)應(yīng)的 socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果當(dāng)前沒(méi)有發(fā)生期待的 I/O 事件,那么 waitRead 會(huì)通過(guò) park goroutine 讓邏輯 block 在這里
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED: 
            continue
        }
        return -1, nil, errcall, err
    }
}

FD.Accept方法會(huì)使用 linux 系統(tǒng)調(diào)用 accept 接收新連接,創(chuàng)建對(duì)應(yīng)的 socket,如果沒(méi)有可讀的消息,waitRead會(huì)被阻塞。這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒。

pollWait事件等待

pollDesc.waitRead實(shí)際上是調(diào)用了runtime.poll_runtime_pollWait

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    // 進(jìn)入 netpollblock 并且判斷是否有期待的 I/O 事件發(fā)生
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // 這個(gè) for 循環(huán)是為了等待 io ready 或者 io wait
    for {
        old := *gpp
        // gpp == pdReady 表示此時(shí)已有期待的 I/O 事件發(fā)生,
        // 可以直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行響應(yīng)的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // 如果沒(méi)有期待的 I/O 事件發(fā)生,則通過(guò)原子操作把 gpp 的值置為 pdWait 并退出 for 循環(huán)
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 讓出當(dāng)前線(xiàn)程,將 Goroutine 轉(zhuǎn)換到休眠狀態(tài)并等待運(yùn)行時(shí)的喚醒
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait會(huì)用for循環(huán)調(diào)用netpollblock函數(shù)判斷是否有期待的 I/O 事件發(fā)生,直到netpollblock返回true表示io ready才會(huì)走出循環(huán)。

netpollblock方法會(huì)判斷當(dāng)前的狀態(tài)是不是處于pdReady,如果是那么直接返回true;如果不是,那么將gpp通過(guò)CAS設(shè)置為pdWait并退出 for 循環(huán)。通過(guò)gopark 把當(dāng)前 goroutine 給 park 住,直到對(duì)應(yīng)的 fd 上發(fā)生可讀/可寫(xiě)或者其他I/O 事件為止。

這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒。

netpoll輪詢(xún)等待

runtime.netpoll的核心邏輯是: 根據(jù)入?yún)?delay設(shè)置調(diào)用 epoll_wait 的 timeout 值,調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist雙向列表中獲取IO就緒的fd列表,遍歷epoll_wait 返回的fd列表, 根據(jù)調(diào)用epoll_ctl注冊(cè)fd時(shí)封裝的上下文信息組裝可運(yùn)行的 goroutine 并返回。

執(zhí)行完 netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對(duì)應(yīng)的 goroutine 列表,接下來(lái)將就緒的 goroutine 加入到調(diào)度隊(duì)列中,等待調(diào)度運(yùn)行。

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // 因?yàn)閭魅雂elay單位是納秒,下面將納秒轉(zhuǎn)換成毫秒
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // 等待文件描述符轉(zhuǎn)換成可讀或者可寫(xiě)
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // 返回負(fù)值,那么重新調(diào)用epollwait進(jìn)行等待
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    // 意味著被監(jiān)控的文件描述符出現(xiàn)了待處理的事件
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        } 
        ...
        // 判斷發(fā)生的事件類(lèi)型,讀類(lèi)型或者寫(xiě)類(lèi)型
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出保存在 epollevent 里的 pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 調(diào)用 netpollready,傳入就緒 fd 的 pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

netpoll會(huì)調(diào)用epollwait獲取就緒的 fd 列表,對(duì)應(yīng)的epoll函數(shù)是epoll_wait。toRun是一個(gè) g 的鏈表,存儲(chǔ)要恢復(fù)的 goroutines,最后返回給調(diào)用方。如果epollwait返回的n大于零,那么表示被監(jiān)控的文件描述符出現(xiàn)了待處理的事件,那么需要調(diào)用for循環(huán)進(jìn)行處理。循環(huán)里面會(huì)根據(jù)時(shí)間類(lèi)型設(shè)置mode,然后拿出對(duì)應(yīng)的pollDesc,調(diào)用netpollready方法。

下面我們?cè)倏匆幌耼etpollready:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    // 獲取對(duì)應(yīng)的g的指針
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // 將對(duì)應(yīng)的g加入到toRun列表中
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    // 根據(jù)傳入的mode判斷事件類(lèi)型
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // 取出 gpp 存儲(chǔ)的 g
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // cas 將讀或者寫(xiě)信號(hào)量轉(zhuǎn)換成 pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            // 返回對(duì)應(yīng)的 g指針
            return (*g)(unsafe.Pointer(old))
        }
    }
}

講完了runtime.netpoll的源碼有個(gè)需要注意的地方,調(diào)用runtime.netpoll的地方有兩處:

在調(diào)度器中執(zhí)行runtime.schedule(),該方法中會(huì)執(zhí)行runtime.findrunable(),在runtime.findrunable()中調(diào)用了runtime.netpoll獲取待執(zhí)行的goroutine;
Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的sysmon監(jiān)控線(xiàn)程,sysmon 每 20us~10ms 運(yùn)行一次,每次運(yùn)行會(huì)檢查距離上一次執(zhí)行netpoll是否超過(guò)10ms,如果是則會(huì)調(diào)用一次runtime.netpoll;

這些入口的調(diào)用感興趣的可以自己去看看。

總結(jié)

本文從I/O多路復(fù)用開(kāi)始講解select以及epoll,然后再回到go語(yǔ)言中去看它是如何實(shí)現(xiàn)多路復(fù)用這樣的結(jié)構(gòu)的。通過(guò)追蹤源碼可以發(fā)現(xiàn),其實(shí)go也是根據(jù)epoll來(lái)封裝自己的函數(shù):

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

通過(guò)這三個(gè)函數(shù)來(lái)實(shí)現(xiàn)對(duì)epoll的創(chuàng)建實(shí)例、注冊(cè)、事件等待操作。

轉(zhuǎn)自

https://www.cnblogs.com/luozhiyun/p/14390824.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容