
大家好!我的名字叫 Sergey Kamardin。我是來自 Mail.Ru 的一名工程師。這篇文章將講述我們是如何用 Go 語言開發(fā)一個(gè)高負(fù)荷的 WebSocket 服務(wù)。即使你對 WebSockets 熟悉但對 Go 語言知之甚少,我還是希望這篇文章里講到的性能優(yōu)化的思路和技術(shù)對你有所啟發(fā)。
介紹
作為全文的鋪墊,我想先講一下我們?yōu)槭裁匆_發(fā)這個(gè)服務(wù)。
Mail.Ru 有許多包含狀態(tài)的系統(tǒng)。用戶的電子郵件存儲(chǔ)是其中之一。有很多辦法來跟蹤這些狀態(tài)的改變。不外乎通過定期的輪詢或者系統(tǒng)通知來得到狀態(tài)的變化。這兩種方法都有它們的優(yōu)缺點(diǎn)。對郵件這個(gè)產(chǎn)品來說,讓用戶盡快收到新的郵件是一個(gè)考量指標(biāo)。郵件的輪詢會(huì)產(chǎn)生大概每秒5萬個(gè) HTTP 請求,其中60%的請求會(huì)返回304狀態(tài)(表示郵箱沒有變化)。因此,為了減少服務(wù)器的負(fù)荷并加速郵件的接收,我們決定重寫一個(gè) publisher-subscriber 服務(wù)(這個(gè)服務(wù)通常也會(huì)稱作 bus,message broker 或者 event-channel)。這個(gè)服務(wù)負(fù)責(zé)接收狀態(tài)更新的通知,然后還處理對這些更新的訂閱。
重寫 publisher-subscriber 服務(wù)之前:
現(xiàn)在:
上面第一個(gè)圖為舊的架構(gòu)。瀏覽器(Browser)會(huì)定期輪詢 API 服務(wù)來獲得郵件存儲(chǔ)服務(wù)(Storage)的更新。
第二張圖展示的是新的架構(gòu)。瀏覽器(Browser)和通知 API 服務(wù)(notificcation API)建立一個(gè) WebSocket 連接。通知 API 服務(wù)會(huì)發(fā)送相關(guān)的訂閱到 Bus 服務(wù)上。當(dāng)收到新的電子郵件時(shí),存儲(chǔ)服務(wù)(Storage)向Bus(1)發(fā)送一個(gè)通知,Bus 又將通知發(fā)送給相應(yīng)的訂閱者(2)。API 服務(wù)為收到的通知找到相應(yīng)的連接,然后把通知推送到用戶的瀏覽器(3)。
我們今天就來討論一下這個(gè) API 服務(wù)(也可以叫做 WebSocket 服務(wù))。在開始之前,我想提一下這個(gè)在線服務(wù)處理將近3百萬個(gè)連接。
慣用的做法(The idiomatic way )
首先,我們看一下不做任何優(yōu)化會(huì)如何用 Go 來實(shí)現(xiàn)這個(gè)服務(wù)的部分功能。在使用 net/http 實(shí)現(xiàn)具體功能前,讓我們先討論下我們將如何發(fā)送和接收數(shù)據(jù)。這些數(shù)據(jù)是定義在 WebSocket 協(xié)議之上的(例如 JSON 對象)。我們在下文中會(huì)成他們?yōu)?packet。
我們先來實(shí)現(xiàn) Channel 結(jié)構(gòu)。它包含相應(yīng)的邏輯來通過 WebScoket 連接發(fā)送和接收 packet。
◆ Channel 結(jié)構(gòu)
// Packet represents application level data.
type Packet struct {
...
}
// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
這里我要強(qiáng)調(diào)的是讀和寫這兩個(gè) goroutines。每個(gè) goroutine 都需要各自的內(nèi)存棧。棧的初始大小由操作系統(tǒng)和 Go 的版本決定,通常在 2KB 到 8KB 之間。我們之前提到有3百萬個(gè)在線連接,如果每個(gè) goroutine 棧需要 4KB 的話,所有連接就需要 24GB 的內(nèi)存。這還沒算上給 Channel 結(jié)構(gòu),發(fā)送 packet 用的 ch.send 和其它一些內(nèi)部字段分配的內(nèi)存空間。
◆ I/O goroutines
接下來看一下“reader”的實(shí)現(xiàn):
func (c *Channel) reader() {
// We make a buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}
這里我們使用了 bufio.Reader。每次都會(huì)在 buf 大小允許的范圍內(nèi)盡量讀取多的字節(jié),從而減少 read() 系統(tǒng)調(diào)用的次數(shù)。在無限循環(huán)中,我們期望會(huì)接收到新的數(shù)據(jù)。請記住之前這句話:期望接收到新的數(shù)據(jù)。我們之后會(huì)討論到這一點(diǎn)。
我們把 packet 的解析和處理邏輯都忽略掉了,因?yàn)樗鼈兒臀覀円懻摰膬?yōu)化不相關(guān)。不過 buf 值得我們的關(guān)注:它的缺省大小是4KB。這意味著所有連接將消耗掉額外的12 GB內(nèi)存?!皐riter”也是類似的情況:
func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}
我們在待發(fā)送 packet 的 c.send channel 上循環(huán)將 packet 寫到緩存(buffer)里。細(xì)心的讀者肯定已經(jīng)發(fā)現(xiàn),這又是額外的4KB內(nèi)存。3百萬個(gè)連接會(huì)占用12GB的內(nèi)存。
◆ HTTP
我們已經(jīng)有了一個(gè)簡單的 Channel 實(shí)現(xiàn)?,F(xiàn)在我們需要一個(gè) WebSocket 連接。因?yàn)檫€在通常做法(Idiomatic Way)的標(biāo)題下,那么就先來看看通常是如何實(shí)現(xiàn)的。
注:如果你不知道 WebSocket 是怎么工作的,那么這里值得一提的是客戶端是通過一個(gè)叫升級(Upgrade)請求的特殊 HTTP 機(jī)制來建立 WebSocket的。在成功處理升級請求以后,服務(wù)端和客戶端使用 TCP 連接來交換二進(jìn)制的 WebSocket 幀(frames)。這里有關(guān)于幀結(jié)構(gòu)的描述。
import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})
請注意這里的 http.ResponseWriter 結(jié)構(gòu)包含 bufio.Reader 和bufio.Writer(各自分別包含4KB的緩存)。它們用于 *http.Request 初始化和返回結(jié)果。
不管是哪個(gè) WebSocket,在成功回應(yīng)一個(gè)升級請求之后,服務(wù)端在調(diào)用responseWriter.Hijack() 之后會(huì)接收到一個(gè) I/O 緩存和對應(yīng)的 TCP 連接。
注:有時(shí)候我們可以通過 net/http.putBufio{Reader,Writer} 調(diào)用把緩存釋放回 net/http 里的 sync.Pool。
這樣,這3百萬個(gè)連接又需要額外的24 GB內(nèi)存。
所以,為了這個(gè)什么都不干的程序,我們已經(jīng)占用了72 GB的內(nèi)存!
優(yōu)化
我們來回顧一下前面介紹的用戶連接的工作流程。在建立 WebSocket 之后,客戶端會(huì)發(fā)送請求訂閱相關(guān)事件(我們這里忽略類似 ping/pong 的請求)。接下來,在整個(gè)連接的生命周期里,客戶端可能就不會(huì)發(fā)送任何其它數(shù)據(jù)了。
連接的生命周期可能會(huì)持續(xù)幾秒鐘到幾天。
所以在大部分時(shí)間里,Channel.reader() 和 Channel.writer() 都在等待接收和發(fā)送數(shù)據(jù)。與它們一起等待的是各自分配的4 KB的I/O緩存。
現(xiàn)在,我們發(fā)現(xiàn)有些地方是可以做進(jìn)一步優(yōu)化的,對吧?
◆ Netpoll
你還記得 Channel.reader() 的實(shí)現(xiàn)使用了 bufio.Reader.Read() 嗎?bufio.Reader.Read() 又會(huì)調(diào)用 conn.Read()。這個(gè)調(diào)用會(huì)被阻塞以等待接收連接上的新數(shù)據(jù)。如果連接上有新的數(shù)據(jù),Go 的運(yùn)行環(huán)境(runtime)就會(huì)喚醒相應(yīng)的 goroutine 讓它去讀取下一個(gè) packet。之后,goroutine 會(huì)被再次阻塞來等待新的數(shù)據(jù)。我們來研究下 Go 的運(yùn)行環(huán)境是怎么知道 goroutine需要被喚醒的。
如果我們看一下 conn.Read() 的實(shí)現(xiàn),就會(huì)看到它調(diào)用 net.netFD.Read():
// net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}
Go 使用了 sockets 的非阻塞模式。EAGAIN 表示 socket 里沒有數(shù)據(jù)了但不會(huì)阻塞在空的 socket 上,OS 會(huì)把控制權(quán)返回給用戶進(jìn)程。
這里它首先對連接文件描述符進(jìn)行 read() 系統(tǒng)調(diào)用。如果 read() 返回的是EAGAIN 錯(cuò)誤,運(yùn)行環(huán)境就是調(diào)用 pollDesc.waitRead():
// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}
如果繼續(xù)深挖,我們可以看到 netpoll 的實(shí)現(xiàn)在 Linux 里用的是 epoll 而在 BSD 里用的是 kqueue。我們的這些連接為什么不采用類似的方式呢?只有在 socket 上有可讀數(shù)據(jù)時(shí),才分配緩存空間并啟用讀數(shù)據(jù)的 goroutine。
在 github.com/golang/go 上,有一個(gè)關(guān)于開放(exporting)netpoll 函數(shù)的問題。
◆ 干掉 goroutines
假設(shè)我們用 Go 語言實(shí)現(xiàn)了 netpoll。我們現(xiàn)在可以避免創(chuàng) Channel.reader() 的 goroutine,取而代之的是從訂閱連接里收到新數(shù)據(jù)的事件。
ch := NewChannel(conn)
// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go ch.Receive()
})
// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}
Channel.writer() 相對容易一點(diǎn),因?yàn)槲覀冎恍柙诎l(fā)送 packet 的時(shí)候創(chuàng)建 goroutine 并分配緩存。
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}
注意,這里我們沒有處理 write() 系統(tǒng)調(diào)用時(shí)返回的 EAGAIN。我們依賴 Go 運(yùn)行環(huán)境去處理它。這種情況很少發(fā)生。如果需要的話我們還是可以像之前那樣來處理。
從 ch.send 讀取待發(fā)送的 packets 之后,ch.writer() 會(huì)完成它的操作,最后釋放 goroutine 的棧和用于發(fā)送的緩存。
很不錯(cuò)!通過避免這兩個(gè)連續(xù)運(yùn)行的 goroutine 所占用的 I/O 緩存和棧內(nèi)存,我們已經(jīng)節(jié)省了48 GB。
◆ 控制資源
大量的連接不僅僅會(huì)造成大量的內(nèi)存消耗。在開發(fā)服務(wù)端的時(shí)候,我們還不停地遇到競爭條件(race conditions)和死鎖(deadlocks)。隨之而來的是所謂的自我分布式阻斷攻擊(self-DDOS)。在這種情況下,客戶端會(huì)悍然地嘗試重新連接服務(wù)端而把情況搞得更加糟糕。
舉個(gè)例子,如果因?yàn)槟撤N原因我們突然無法處理 ping/pong 消息,這些空閑連接就會(huì)不斷地被關(guān)閉(它們會(huì)以為這些連接已經(jīng)無效因此不會(huì)收到數(shù)據(jù))。然后客戶端每N秒就會(huì)以為失去了連接并嘗試重新建立連接,而不是繼續(xù)等待服務(wù)端發(fā)來的消息。
在這種情況下,比較好的辦法是讓負(fù)載過重的服務(wù)端停止接受新的連接,這樣負(fù)載均衡器(例如nginx)就可以把請求轉(zhuǎn)到其它的服務(wù)端上去。
撇開服務(wù)端的負(fù)載不說,如果所有的客戶端突然(很可能是因?yàn)槟硞€(gè)bug)向服務(wù)端發(fā)送一個(gè) packet,我們之前節(jié)省的 48 GB 內(nèi)存又將會(huì)被消耗掉。因?yàn)檫@時(shí)我們又會(huì)和開始一樣給每個(gè)連接創(chuàng)建 goroutine 并分配緩存。
Goroutine 池
可以用一個(gè) goroutine 池來限制同時(shí)處理 packets 的數(shù)目。下面的代碼是一個(gè)簡單的實(shí)現(xiàn):
package gopool
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
我們使用 netpoll 的代碼就變成下面這樣:
pool := gopool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
ch.Receive()
})
})
現(xiàn)在我們不僅要等可讀的數(shù)據(jù)出現(xiàn)在 socket 上才能讀 packet,還必須等到從池里獲取到空閑的 goroutine。
同樣的,我們修改下 Send() 的代碼:
pool := gopool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
這里我們沒有調(diào)用 go ch.writer(),而是想重復(fù)利用池里 goroutine 來發(fā)送數(shù)據(jù)。 所以,如果一個(gè)池有 N 個(gè) goroutines 的話,我們可以保證有 N 個(gè)請求被同時(shí)處理。而 N + 1 個(gè)請求不會(huì)分配 N + 1 個(gè)緩存。goroutine 池允許我們限制對新連接的 Accept() 和 Upgrade(),這樣就避免了大部分 DDoS 的情況。
◆ 零拷貝升級(Zero-copy upgrade)
之前已經(jīng)提到,客戶端通過 HTTP 升級(Upgrade)請求切換到 WebSocket協(xié)議。下面顯示的是一個(gè)升級請求:
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
我們接收 HTTP 請求和它的頭部只是為了切換到 WebSocket 協(xié)議,而 http.Request 里保存了所有頭部的數(shù)據(jù)。從這里可以得到啟發(fā),如果是為了優(yōu)化,我們可以放棄使用標(biāo)準(zhǔn)的 net/http 服務(wù)并在處理 HTTP 請求的時(shí)候避免無用的內(nèi)存分配和拷貝。
舉個(gè)例子,http.Request 包含了一個(gè)叫做 Header 的字段。標(biāo)準(zhǔn) net/http 服務(wù)會(huì)將請求里的所有頭部數(shù)據(jù)全部無條件地拷貝到 Header 字段里。你可以想象這個(gè)字段會(huì)保存許多冗余的數(shù)據(jù),例如一個(gè)包含很長 cookie 的頭部。
我們?nèi)绾蝸韮?yōu)化呢?
WebSocket 實(shí)現(xiàn)
不幸的是,在我們優(yōu)化服務(wù)端的時(shí)候所有能找到的庫只支持對標(biāo)準(zhǔn) net/http 服務(wù)做升級。而且沒有一個(gè)庫允許我們實(shí)現(xiàn)上面提到的讀和寫的優(yōu)化。為了使這些優(yōu)化成為可能,我們必須有一套底層的 API 來操作 WebSocket。為了重用緩存,我們需要類似下面這樣的協(xié)議函數(shù):
func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error
如果我們有一個(gè)包含這樣 API 的庫,我們就按照下面的方式從連接上讀取 packets:
// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}
簡而言之,我們需要自己寫一個(gè)庫。
github.com/gobwas/ws
ws 庫的主要設(shè)計(jì)思想是不將協(xié)議的操作邏輯暴露給用戶。所有讀寫函數(shù)都接受通用的 io.Reader 和 io.Writer 接口。因此它可以隨意搭配是否使用緩存以及其它 I/O 的庫。
除了標(biāo)準(zhǔn)庫 net/http 里的升級請求,ws 還支持零拷貝升級。它能夠處理升級請求并切換到 WebSocket 模式而不產(chǎn)生任何內(nèi)存分配或者拷貝。
ws.Upgrade() 接受 io.ReadWriter(net.Conn 實(shí)現(xiàn)了這個(gè)接口)。換句話說,我們可以使用標(biāo)準(zhǔn)的 net.Listen() 函數(shù)然后把從 ln.Accept() 收到的連接馬上交給 ws.Upgrade() 去處理。庫也允許拷貝任何請求數(shù)據(jù)來滿足將來應(yīng)用的需求(舉個(gè)例子,拷貝 Cookie 來驗(yàn)證一個(gè) session)。
下面是處理升級請求的性能測試:標(biāo)準(zhǔn) net/http 庫的實(shí)現(xiàn)和使用零拷貝升級的 net.Listen():
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9
allocs/opBenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
使用 ws 以及零拷貝升級為我們節(jié)省了24 GB的空間。這些空間原本被用做 net/http 里處理請求的 I/O 緩存。
◆ 回顧
讓我們來回顧一下之前提到過的優(yōu)化:
一個(gè)包含緩存的讀 goroutine 會(huì)占用很多內(nèi)存。方案: netpoll(epoll, kqueue);重用緩存。
一個(gè)包含緩存的寫 goroutine 會(huì)占用很多內(nèi)存。方案: 在需要的時(shí)候創(chuàng)建goroutine;重用緩存。
存在大量連接請求的時(shí)候,netpoll 不能很好的限制連接數(shù)。方案: 重用 goroutines 并且限制它們的數(shù)目。
net/http 對升級到 WebSocket 請求的處理不是最高效的。方案: 在 TCP 連接上實(shí)現(xiàn)零拷貝升級。
下面是服務(wù)端的大致實(shí)現(xiàn)代碼:
import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}
結(jié)論
在程序設(shè)計(jì)時(shí),過早優(yōu)化是萬惡之源。Donald Knuth
上面的優(yōu)化是有意義的,但不是所有情況都適用。舉個(gè)例子,如果空閑資源(內(nèi)存,CPU)與在線連接數(shù)之間的比例很高的話,優(yōu)化就沒有太多意義。當(dāng)然,知道什么地方可以優(yōu)化以及如何優(yōu)化總是有幫助的。
引用
- https://github.com/mailru/easygo
- https://github.com/gobwas/ws
- https://github.com/gobwas/ws-...
- https://github.com/gobwas/htt...
轉(zhuǎn)載|segmentfault
原文鏈接:https://segmentfault.com/a/1190000011162605
Golang 實(shí)戰(zhàn)班第2期火熱報(bào)名進(jìn)行中
招生要求:
有Linux基礎(chǔ),有志于使用 Go 語言做分布式系統(tǒng)編程的人員,想往系統(tǒng)架構(gòu)師方向發(fā)展的同學(xué)。BAT 架構(gòu)師帶你一起飛。
課程內(nèi)容:
Golang入門
Golang程序結(jié)構(gòu)
Golang的基礎(chǔ)數(shù)據(jù)類型
Golang復(fù)合數(shù)據(jù)類型
Golang的函數(shù)
Golang的方法
Golang的接口
Golang的協(xié)程和Channel
Golang基于共享變量的并發(fā)
Golang包和工具
上課模式:網(wǎng)絡(luò)直播班 線下面授班
咨詢報(bào)名聯(lián)系:
QQ(1):979950755 小月
QQ(2):279312229 ada
WeChat : 1902433859 小月
WeChat : 1251743084 小單
開課時(shí)間:10月14日(周六)