注:本文分析基于Go1.14
簡(jiǎn)單實(shí)例
一個(gè)簡(jiǎn)單的tcp server例子
package main
import (
"context"
"fmt"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "0.0.0.0:7777")
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer listener.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
conn, err := listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
fmt.Println("Accept temp error:", ne)
time.Sleep(time.Second)
continue
}
fmt.Println("Accept error:", err)
break
}
go handleConn(ctx, conn)
}
}
func handleConn(ctx context.Context, conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
}
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Read error:", err)
}
_, err = conn.Write(buf[:n])
if err != nil {
fmt.Println("Write error:", err)
}
}
}
完整實(shí)現(xiàn)
上面是簡(jiǎn)單的TCP Server使用示例,如果想獲取完整的服務(wù)實(shí)現(xiàn),可以使用或者參考github上的一個(gè)go net 開(kāi)源項(xiàng)目:
https://github.com/izhw/gnet
該項(xiàng)目使用go原生net package實(shí)現(xiàn),不依賴(lài)其他第三方庫(kù),可以幫助開(kāi)發(fā)者快速搭建一個(gè)net服務(wù)(TCP Server or Client)。
比如,開(kāi)發(fā)者只要實(shí)現(xiàn)gnet.NetEventHandler事件回調(diào)(也可內(nèi)置默認(rèn)實(shí)現(xiàn)),使用可選的Functional options,就能快速搭建一個(gè)TCP Server,簡(jiǎn)單使用代碼如下:
package main
import (
"fmt"
"log"
"github.com/izhw/gnet"
"github.com/izhw/gnet/tcp/tcpserver"
)
type ServerHandler struct {
*gnet.NetEventHandler
}
func (h *ServerHandler) OnReadMsg(c gnet.Conn, data []byte) error {
fmt.Println("read msg:", string(data))
c.Write(data)
return nil
}
func main() {
s := tcpserver.NewServer("0.0.0.0:7777", &ServerHandler{})
log.Fatal("Exit:", s.Serve())
}
Go net源碼分析 (go1.14)
開(kāi)始的簡(jiǎn)單示例中,我們看到幾個(gè)關(guān)鍵的調(diào)用:net.Listen()、listener.Accept()、conn.Read()、conn.Write,下面分別進(jìn)行源碼分析。
Listen
-
net.Listen返回的是實(shí)現(xiàn)了net.Listener接口的*TCPListener。 - 其中,
Listen方法內(nèi),生成系統(tǒng)文件描述符sysfd,使用該sysfd設(shè)置參數(shù)、調(diào)用syscall.Bind、syscall.Listen完成綁定、監(jiān)聽(tīng),并初始化一些重要的結(jié)構(gòu)信息,創(chuàng)建epoll句柄、注冊(cè)epoll事件,然后構(gòu)造返回TCPListener。
調(diào)用鏈源碼如下(只貼了關(guān)鍵代碼)。
// `net/net.go`
// Listener接口
type Listener interface {
// 阻塞等待,有新連接事件的時(shí)候,返回一個(gè)net.Conn
Accept() (Conn, error)
// 關(guān)閉listener,任何阻塞的Accept操作變?yōu)椴蛔枞?,并返回錯(cuò)誤
Close() error
// 返回listener的網(wǎng)絡(luò)地址
Addr() Addr
}
// `net/tcpsock.go`
type TCPListener struct {
// Go封裝的網(wǎng)絡(luò)描述符,后面會(huì)具體講
fd *netFD
// Listen配置
lc ListenConfig
}
// `net/dial.go`
// 根據(jù)不同的 'network'和'address'構(gòu)建相應(yīng)的'Listener'
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// resolve解析addrs
// ...
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
// 根據(jù)不同的addrs類(lèi)型調(diào)用不同的listen
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
// return error...
}
// return error...
return l, nil
}
// `net/tcpsock_posix.go`
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
// internetSocket內(nèi)部解析AddrFamily并調(diào)用socket()
// socket內(nèi)部創(chuàng)建關(guān)鍵的結(jié)構(gòu)對(duì)象netFD,并初始化綁定、監(jiān)聽(tīng)等
// 注意這里的syscall.SOCK_STREAM,后面會(huì)用到
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
// 用上面創(chuàng)建并初始化好的fd構(gòu)造TCPListener
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
// `net/sock_posix.go`
// 返回一個(gè)初始化好的異步I/O `netFD` 網(wǎng)絡(luò)描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// `sysSocket`內(nèi)部調(diào)用syscall.Socket,
// 并置為非阻塞和close-on-exec (Syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC),
// 返回系統(tǒng)描述符: s
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// ...
// 用`sysSocket`創(chuàng)建的`s`創(chuàng)建`netFD`
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// ...
// 上面提到過(guò),在listenTCP中傳入的參數(shù)為syscall.SOCK_STREAM
// 此處判斷該sotype類(lèi)型,調(diào)用fd.listenStream()
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
// ...
return fd, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
// 設(shè)置默認(rèn)參數(shù) SO_REUSEADDR
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
// ...
}
// syscall.Bind 綁定
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// syscall.Listen 監(jiān)聽(tīng)
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// netFD初始化,fd.init()->fd.pfd.Init()->fd.pfd.pd.init()`
// 最終調(diào)用的是`runtime_pollServerInit`、`runtime_pollOpen`
// netFD初始化部分后面會(huì)接著講
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
為了更好的理解,需要講幾個(gè)重要的struct:
-
netFD是網(wǎng)絡(luò)描述符,其結(jié)構(gòu)體中有一個(gè)poll.FD對(duì)象。 -
poll.FD是文件描述符,表示一個(gè)網(wǎng)絡(luò)連接或者OS文件。 -
poll.FD結(jié)構(gòu)中主要看Sysfd int和pd pollDesc兩個(gè)變量,前者是系統(tǒng)返回的文件描述符,后者其內(nèi)部封裝了運(yùn)行時(shí)上下文,包括讀、寫(xiě)goroutine及其狀態(tài),讀寫(xiě)超時(shí)等基本信息。通過(guò)將pollDesc指針信息存入epollevent.data(8字節(jié)數(shù)組)中,然后調(diào)用epollctl(epoll_ctl)將fd和epollevent信息注冊(cè)到epoll實(shí)例上,實(shí)現(xiàn)epoll事件回調(diào)和用戶(hù)態(tài)協(xié)程調(diào)用的關(guān)聯(lián)。
相關(guān)結(jié)構(gòu)源碼如下:
// `net/fd_unix.go`
type netFD struct {
pfd poll.FD
// Close前不可變
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// `internal/poll/fd_unix.go`
type FD struct {
// sysfd和Read/Write方法鎖
fdmu fdMutex
// 系統(tǒng)文件描述符
Sysfd int
// I/O poller,封裝了運(yùn)行時(shí)上下文
pd pollDesc
// Writev 緩存.
iovecs *[]syscall.Iovec
// 文件關(guān)閉的時(shí)候發(fā)送信號(hào)量
csema uint32
// 非阻塞模式時(shí)非零
isBlocking uint32
// 是streaming還是packet-based UDP
IsStream bool
// 讀到零字節(jié)是否表示EOF,對(duì)于基于消息的套接字連接為false
ZeroReadIsEOF bool
// 是一個(gè)file而并非network socket
isFile bool
}
// internal/poll/fd_poll_runtime.go
// `pollDesc`結(jié)構(gòu)中只有一個(gè)`uintptr`變量,`runtimeCtx`封裝了運(yùn)行時(shí)上下文,其具體信息后面會(huì)講
type pollDesc struct {
runtimeCtx uintptr
}
接著看netFD初始化源碼:
// `net/fd_unix.go`
// netFD初始化
func (fd *netFD) init() error {
// 調(diào)用 pfd (poll.FD) 的Init方法
return fd.pfd.Init(fd.net, true)
}
// `internal/poll/fd_unix.go`
// FD初始化
func (fd *FD) Init(net string, pollable bool) error {
// ...
// 調(diào)用 pd (pollDesc) 的init方法
err := fd.pd.init(fd)
// ...
return err
}
// internal/poll/fd_poll_runtime.go
// serverInit全局變量,只執(zhí)行一次runtime_pollServerInit,
// 并在其內(nèi)部調(diào)用runtime.netpollinit()創(chuàng)建epoll實(shí)例;
// runtime_pollOpen內(nèi)部調(diào)用runtime.netpollopen,
// 將listener fd注冊(cè)到epoll實(shí)例中,初始化pollDesc并返回ctx,賦值runtimeCtx
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
上面pollDesc的init()方法中,runtime_pollServerInit和runtime_pollOpen實(shí)際link的是runtime包下的poll_runtime_pollServerInit和poll_runtime_pollOpen函數(shù),具體實(shí)現(xiàn)在runtime/netpoll.go中。
首先,看一下pollDesc在runtime包下的具體封裝信息:
// `runtime/netpoll.go`
// Network poller descriptor.
// No heap pointers.
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
pollDesc結(jié)構(gòu)中的重要變量:
-
lock鎖 防止內(nèi)部成員變量并發(fā)讀寫(xiě)問(wèn)題 -
fd為文件描述符 -
rt和wt分別表示讀寫(xiě)定時(shí)器,用來(lái)防止讀寫(xiě)超時(shí) -
rg和wg分別保存了用戶(hù)態(tài)操作pollDesc的讀、寫(xiě)goroutine地址,以及goroutine的ready/wait狀態(tài),用于goroutine讀寫(xiě)阻塞時(shí)掛起、就緒時(shí)恢復(fù)運(yùn)行
接著看一下runtime包下面的Init和Open,Init全局只初始化一次。
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 判斷netpoll是否已經(jīng)初始化過(guò)
if atomic.Load(&netpollInited) == 0 {
// 全局鎖
lock(&netpollInitLock)
if netpollInited == 0 {
// 調(diào)用netpollinit()
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
type epollevent struct {
// 事件
events uint32
data [8]byte // unaligned uintptr
}
// `runtime/netpoll_epoll.go`
func netpollinit() {
// 調(diào)用 OS epoll_create,創(chuàng)建一個(gè)epoll實(shí)例,
// 把生成的epoll fd賦值給全局變量 `epfd`
// 后續(xù)listener以及accept的所有sockets相關(guān)的epoll操作都是基于這個(gè)`epfd`
epfd = epollcreate1(_EPOLL_CLOEXEC)
// ...
ev := epollevent{
// 讀事件
events: _EPOLLIN,
}
// netpollBreakRd: for netpollBreak
// 在后面有事件回調(diào)時(shí)會(huì)用到,判斷是否為netpollBreakRd
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 系統(tǒng)調(diào)用 epoll_ctl,注冊(cè)epoll事件
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
// ...
}
接著看Open
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// pollcache為全局pollDesc鏈表緩存,調(diào)用 alloc()獲取一個(gè)*pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
// 鎖住,初始化、賦值成員變量
// ...
unlock(&pd.lock)
var errno int32
// 調(diào)用 netpollopen(),實(shí)現(xiàn)見(jiàn)下面
errno = netpollopen(fd, pd)
return pd, int(errno)
}
// alloc 如果鏈表頭`first`為空,則分配內(nèi)存并初始化n個(gè)`*pollDesc`節(jié)點(diǎn)鏈表,然后pop出頭節(jié)點(diǎn);
// 如果`first`不為空則直接pop出頭部節(jié)點(diǎn)。
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
unlock(&c.lock)
return pd
}
// netpollopen
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 觸發(fā)事件,讀、寫(xiě)、掛起關(guān)閉、邊緣觸發(fā)
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 調(diào)用`epollctl`注冊(cè)fd到epoll實(shí)例
// 同時(shí)把`*pollDesc`保存到`epollevent.data`里傳入內(nèi)核
// 實(shí)現(xiàn)內(nèi)核態(tài)事件和用戶(hù)態(tài)協(xié)程的關(guān)聯(lián)
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
從上面的源碼可以看到,Listen實(shí)現(xiàn)了TCP Server的綁定、監(jiān)聽(tīng),通過(guò)調(diào)用epoll_create、epoll_ctl 創(chuàng)建epoll句柄、注冊(cè)epoll事件,并將goroutine信息與回調(diào)事件相關(guān)聯(lián)。
Accept
listener.Accept是Listener的接口方法,TCPListener實(shí)現(xiàn)了該方法,它阻塞等待下一次調(diào)用并返回一個(gè)Conn。
// `net/tcpsock.go`
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
// `net/tcpsock_posix.go`
func (ln *TCPListener) accept() (*TCPConn, error) {
// 關(guān)注1:
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
// 關(guān)注2:
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// `net/fd_unix.go`
func (fd *netFD) accept() (netfd *netFD, err error) {
// 調(diào)用`poll.FD`的`Accept`方法接受新的socket連接,返回socket的fd
d, rsa, errcall, err := fd.pfd.Accept()
// ...
// 用上面返回的fd(d)創(chuàng)建一個(gè)netFD
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
// ...
// 調(diào)用`netFD`的`init`方法完成`pollDesc`初始化,并將事件加入epoll實(shí)例
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// ...
return netfd, nil
}
// `internal/poll/fd_unix.go`
// Accept 封裝了accept網(wǎng)絡(luò)調(diào)用
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// ...
for {
// 其內(nèi)調(diào)用syscall.Accept4/syscall.Accept,設(shè)置為syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC
s, rsa, errcall, err := accept(fd.Sysfd)
// listener fd在創(chuàng)建的時(shí)候設(shè)置為非阻塞模式,accept()會(huì)立即返回,
// 判斷err,為nil則說(shuō)明有新連接事件,直接return
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果err為syscall.EAGAIN,并且pollDesc的runtimeCtx不為空,則調(diào)用pollDesc.waitRead,
// 其中調(diào)用了`runtime_pollWait`,實(shí)際連接調(diào)用的是`runtime.poll_runtime_pollWait`
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
-
Accept接收客戶(hù)端連接請(qǐng)求建立新連接,通過(guò)netFD的accept()創(chuàng)建系統(tǒng)fd,將socket設(shè)置為非阻塞I/O模式。 - 后面邏輯與前面講的一樣,創(chuàng)建并初始化
netFD,其內(nèi)完成pollDesc初始化、調(diào)用runtime.netpollopen將fd、epollevent添加到epoll實(shí)例。 - 因?yàn)槭欠亲枞J?,?dāng)
accept()返回err為syscall.EAGAIN時(shí),若pollDesc的runtimeCtx不為空,則調(diào)用pollDesc.waitRead,其中調(diào)用了runtime_pollWait。 -
runtime_pollWait實(shí)際link的是runtime.poll_runtime_pollWait,其中調(diào)用netpollblock,源碼如下:
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// netpollcheckerr and check GOOS
///...
// for循環(huán)等待 IO ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
// IO reday返回true,超時(shí)或者關(guān)閉返回false
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp根據(jù)mode的值取rg或者wg,后面調(diào)用gopark時(shí),會(huì)把當(dāng)前的goroutine的g結(jié)構(gòu)指針存入gpp
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 判斷狀態(tài)為 pdReady 則 return, 否則設(shè)置為 pdWait
for {
old := *gpp
// old == pdReady 表示此時(shí)已有 I/O 事件發(fā)生,
// 直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行相應(yīng)的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// CAS原子操作將gpp置為`pdWait`
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// recheck error states
if waitio || netpollcheckerr(pd, mode) == 0 {
// 防止此時(shí)可能會(huì)有其他的并發(fā)操作修改pd里的內(nèi)容,所以需要再次檢查錯(cuò)誤狀態(tài)。
// gopark將當(dāng)前goroutine置于等待狀態(tài)并等待下一次的調(diào)度
// `netpollblockcommit`回調(diào)函數(shù)在gopark內(nèi)部回調(diào)時(shí),CAS將當(dāng)前goroutine指針存到傳入的參數(shù)`gpp`
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 通過(guò)原子操作將gpp的值設(shè)置為0,返回修改前的值并判斷是否pdReady
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
關(guān)于Go的調(diào)度,此處不深入,簡(jiǎn)單講一下。
gopark調(diào)用mcall(park_m),mcall是通過(guò)匯編實(shí)現(xiàn)的,其函數(shù)原型及作用為:
func mcall(fn func(*g))
- 從當(dāng)前g棧切換到g0棧;
- 在g0棧上執(zhí)行函數(shù)fn(g),此處為
park_m; - 保存當(dāng)前g的信息,將PC/SP存儲(chǔ)到
g.sched中,當(dāng)被重新調(diào)度時(shí)能夠獲取相關(guān)信息繼續(xù)執(zhí)行。
在park_m()中將當(dāng)前goroutine狀態(tài)由_GrunningCAS為_Gwaiting、與當(dāng)前的m解綁,并回調(diào)netpollblockcommit將gr/gw由pdWaitCAS為goroutine指針,然后調(diào)用schedule()。
schedule函數(shù)永遠(yuǎn)不會(huì)返回,其調(diào)用邏輯為:schedule() -> execute() -> gogo() -> goroutine 任務(wù) -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()。
當(dāng)goroutine對(duì)應(yīng)的fd上發(fā)生期望的事件時(shí),它就會(huì)被重新調(diào)度,從g.sched中獲取之前保存的信息,繼續(xù)執(zhí)行后面的邏輯,此時(shí)gpp為pdReady狀態(tài)。
Read、Write
TCPListener的accept()中創(chuàng)建并初始化netFD后,會(huì)調(diào)用newTCPConn()創(chuàng)建并返回*TCPConn,它實(shí)現(xiàn)了net.Conn接口,我們主要看Read和Write方法。
Read調(diào)用鏈源碼:
// `net/tcpsock.go`
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
type TCPConn struct {
conn
}
// `net/net.go`
type conn struct {
fd *netFD
}
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// `net/fd_unix.go`
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// `internal/poll/fd_unix.go`
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
// syscall.Read系統(tǒng)調(diào)用從socket讀取數(shù)據(jù),因?yàn)?socket在被accept的時(shí)候設(shè)置為非阻塞 I/O,不會(huì)阻塞
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
// 當(dāng)err為syscall.EAGAIN,調(diào)用waitRead,
// 從上面的分析知道,其內(nèi)通過(guò)gopark將當(dāng)前goroutine掛起
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
conn.Write與conn.Read邏輯類(lèi)似,通過(guò)FD.Write調(diào)用syscall.Write,因?yàn)闉榉亲枞?I/O,如果返回err為syscall.EAGAIN,也會(huì)類(lèi)似Read,調(diào)用pollDesc.waitWrite,
執(zhí)行runtime_pollWait->netpollblock,gopark住當(dāng)前goroutine,直到有事件發(fā)生被重新調(diào)度。
netpoll
通過(guò)前面的分析,我們了解了Go通過(guò)gopark住 goroutine 達(dá)到阻塞 Accept/Read/Write 的效果?,F(xiàn)在會(huì)有兩個(gè)疑問(wèn):
- 當(dāng)相應(yīng)的 I/O 事件發(fā)生之后,如何喚醒這些
gopark住的goroutine從而繼續(xù)調(diào)度執(zhí)行呢? - 我們前面講到了跟epoll相關(guān)的兩個(gè)調(diào)用
epoll_create、epoll_ctl,還有一個(gè)重要的epoll_wait在哪里調(diào)用的呢?
通過(guò)源碼,可以發(fā)現(xiàn),在runtime/netpoll_epoll.go中有一個(gè)netpoll()方法,它內(nèi)部調(diào)用 epollwait(epoll_wait) 獲取就緒的fd事件epollevent列表,然后將每個(gè)epollevent.data值取出轉(zhuǎn)化為*pollDesc,并調(diào)用netpollready->netpollunblock, 將rg/wg的狀態(tài)轉(zhuǎn)化為pdReady(ioready),同時(shí)將讀、寫(xiě)g指針添加到goroutine列表gList返回。
相關(guān)源碼如下:
// runtime/proc.go
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
head guintptr
}
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
// epfd、delay判斷waitms賦值
// ...
var events [128]epollevent
retry:
// 獲取就緒的fd事件列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// ...
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// 判斷是否為netpollinit注冊(cè)epoll實(shí)例時(shí)設(shè)置的netpollBreakRd
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
// ...
continue
}
var mode int32
// 讀事件
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
// 寫(xiě)事件
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在epollevent.data中的pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 調(diào)用netpollready,傳入就緒fd的pollDesc
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready 調(diào)用netpollunblock,把pollDesc中相應(yīng)的可讀、寫(xiě)goroutine取出
// 并將pollDesc.rg/wg轉(zhuǎn)換狀態(tài)為pdReady,然后將取出的goroutine push到鏈表 toRun 中
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// gpp CAS 為 pdReady
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 將之前存入pollDesc的 g結(jié)構(gòu)指針 old 轉(zhuǎn)換為 *g
return (*g)(unsafe.Pointer(old))
}
}
}
我們看到Go 在netpoll()中獲取觸發(fā)讀寫(xiě)事件的goroutine列表,而netpoll()會(huì)在多處被調(diào)用(runtime/proc.go):
- startTheWorldWithSema()
StartTheWorld時(shí)會(huì)調(diào)用netpoll獲取gList進(jìn)行調(diào)度 - findrunnable()
該方法會(huì)在Go scheduler的核心方法schedule()中被調(diào)用,從而調(diào)用netpoll獲取gList - pollWork()
后臺(tái)工作循環(huán)(比如idle GC)檢查時(shí)會(huì)調(diào)用netpoll獲取gList - sysmon()
在程序啟動(dòng)時(shí)調(diào)用,不需要P,使用獨(dú)立的M作為監(jiān)控線程,sysmon每 20us~10ms運(yùn)行一次,調(diào)用netpoll獲取gList
當(dāng)上面這些調(diào)用獲取gList后,都會(huì)調(diào)用injectglist()方法(findrunnable()中會(huì)先pop出一個(gè)g,將g狀態(tài)由_GwaitingCAS為_Grunnable,然后再調(diào)用injectglist()),injectglist方法會(huì)將gList中的g的狀態(tài)由_GwaitingCAS為_Grunnable,然后放入全局運(yùn)行隊(duì)列(globrunqput(gp)),從而被重新調(diào)度,當(dāng)goroutine被重新調(diào)度時(shí),會(huì)從g.sched中取出PC/SP信息,繼續(xù)執(zhí)行之前的邏輯。
// Injects the list of runnable G's into the scheduler and clears glist.
// Can run concurrently with GC.
func injectglist(glist *gList) {
if glist.empty() {
return
}
if trace.enabled {
for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
// 從glist中循環(huán)取出gp
for n = 0; !glist.empty(); n++ {
gp := glist.pop()
// 由 _Gwaiting 變?yōu)?_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 放入全局運(yùn)行隊(duì)列
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
// 循環(huán)獲取空閑P,并調(diào)度 M 去運(yùn)行 P
startm(nil, false)
}
*glist = gList{}
}
Go基于epoll/kqueue/iocp和自身的運(yùn)行時(shí)調(diào)度機(jī)制,實(shí)現(xiàn)了自己的I/O多路復(fù)用netpoll網(wǎng)絡(luò)模型,從上面源碼可以看到,Accept/Read/Write等方法其底層實(shí)現(xiàn)均采用非阻塞方式,而我們?cè)陂_(kāi)發(fā)過(guò)程中調(diào)用的方式很顯然是同步模式,這就大大降低了網(wǎng)絡(luò)開(kāi)發(fā)難度,從這也可以看出Go語(yǔ)言設(shè)計(jì)的初衷之一:簡(jiǎn)單而高效。
至此,本文關(guān)于Go net(TCP service)相關(guān)內(nèi)容就介紹完了,總結(jié)一下涉及的內(nèi)容:
- TCP Server的簡(jiǎn)單使用示例
- 簡(jiǎn)單介紹了一個(gè)可使用(或作為參考)的、基于Go原生net package實(shí)現(xiàn)的go net項(xiàng)目(github開(kāi)源項(xiàng)目)
- 主要進(jìn)行了
netpoll相關(guān)源碼的分析(基于Go1.14)