http服務(wù)端是從*http.Server.ListenAndServe()開(kāi)始的。Server數(shù)據(jù)結(jié)構(gòu)定義了Handler字段,聲明了http服務(wù)器如何處理(w,*r)
type Server struct {
Handler Handle
}
Handler接口定義:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
Handler可以自定義實(shí)現(xiàn),或是由上層框架實(shí)現(xiàn)。服務(wù)端監(jiān)聽(tīng)工作,已由net/http標(biāo)準(zhǔn)庫(kù)完成。
首先是外層的監(jiān)聽(tīng)函數(shù)Serve(l)用來(lái)監(jiān)聽(tīng)tcp連接,有連接建立時(shí)accepted,開(kāi)一個(gè)協(xié)程處理請(qǐng)求。服務(wù)器用一個(gè)線程處理多個(gè)客戶端請(qǐng)求,是典型的異步應(yīng)用場(chǎng)景。由于go語(yǔ)言支持協(xié)程,代替了這樣的應(yīng)用場(chǎng)景,go serve(ctx)簡(jiǎn)單的一行代碼,就完成了復(fù)雜的異步調(diào)用完成的功能。并且業(yè)務(wù)邏輯不分節(jié),不需要關(guān)心線程池的調(diào)參。
協(xié)程中執(zhí)行的閉包,主要由以下三塊邏輯:
- 從TCP連接的buf緩沖中讀取請(qǐng)求字節(jié)流,并構(gòu)建Request請(qǐng)求。
- 調(diào)用http.Server.Handler處理請(qǐng)求。
- fnishRequest刷新TCP連接的緩存,完成http響應(yīng)。
Serve(l)獲取連接
sync.Once
Java的兩種實(shí)現(xiàn)
舉一個(gè)例子,JDK7中的concurrentHashmap在構(gòu)建segment時(shí),會(huì)保證構(gòu)建唯一的Segment。是通過(guò)CAS無(wú)鎖操作實(shí)現(xiàn)的:volatile判空,線程中構(gòu)建,CAS賦值。常規(guī)的方法是添加的全局的volatile bool,在構(gòu)造函數(shù)的最后一行利用volatile禁止重排序的語(yǔ)義,標(biāo)記構(gòu)造已完成。并且要對(duì)構(gòu)造語(yǔ)句的集合加鎖,保證其原子性。
//解釋java的思想,用go語(yǔ)言實(shí)現(xiàn)
var done bool = false
func setup(){
a = "hello"
done = true
}
func doprint(){
if !done{
setup()
}
print(a)
}
而go中也有類似的功能,sync.Once,通過(guò)Mutex實(shí)現(xiàn)。功能為保證閉包全局的唯一一次執(zhí)行。
var once sync.Once
once.Do()
lintener
在TCP監(jiān)聽(tīng)中,監(jiān)聽(tīng)線程只有一個(gè)listener,用sync.Once包裹。第個(gè)Serve(l)方法只管理一個(gè)監(jiān)聽(tīng)器。當(dāng)接受到并建立連接之用,為每個(gè)連接單開(kāi)一個(gè)協(xié)程處理。并調(diào)用trackListener方法,將其加入到Server.listeners。如果*Server正在執(zhí)行關(guān)閉,則不進(jìn)行添加。
Serve(l)方法結(jié)束之前,將連接移除。
s.shuttingDown()是一個(gè)標(biāo)志位方法,如果將其置為true,所有新建連接、處理請(qǐng)求等操作,在執(zhí)行之前都會(huì)檢查這個(gè)標(biāo)志位。類似于java的volatile并發(fā)讀可見(jiàn)性語(yǔ)義。s.shutdown標(biāo)志位采用了atomic.LoadInt32(&s.inShutdown)的方式確保其并發(fā)讀一致性。
Context配置
由于go沒(méi)有繼承,常用第一形參來(lái)代替被繼承的對(duì)象。所以這樣go可以利用形參,實(shí)現(xiàn)“多繼承”??偨Y(jié)golang的形參特性:
- 第一形參包含一個(gè)父類實(shí)例,代替繼承
- 支持閉包傳遞,減少繼承使用
- 引用傳遞修改結(jié)果,代替返回值。由于Go本身支持多返回值,這個(gè)特性效果不明顯
- 支持不定參,切片打散傳入
TCP服務(wù)端,通過(guò)向ctx加入“http-serve”:*Server的KV對(duì)來(lái)標(biāo)注。
循還體內(nèi)持續(xù)監(jiān)聽(tīng)
調(diào)用l.Accept()阻塞等待監(jiān)聽(tīng),當(dāng)有連接建立,將返回net.Conn實(shí)例。
檢測(cè)doneChan通道,是一個(gè)結(jié)束監(jiān)聽(tīng)的開(kāi)關(guān)。使能之后遇到,處理完已連接請(qǐng)求之后,將立即結(jié)束監(jiān)聽(tīng)。
TCP連接具有超時(shí)重試機(jī)制,邏輯為:
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
//未連接成功,等待5ms 10ms 20ms...1s不斷重試。
總結(jié)未正確得到TCP連接的情況:
- doneChan開(kāi)關(guān)使能,終止程序。
- 超時(shí)重試。
- 其它錯(cuò)誤,終止程序
最后新服務(wù)包裹net.Conn代表的TCP連接 和 服務(wù)端實(shí)例*Server。并新建一個(gè)協(xié)程處理TCP連接的數(shù)據(jù)包請(qǐng)求。
由于并發(fā)線程非常有限,使用并發(fā)地方式,一個(gè)線程處理一個(gè)連接請(qǐng)求,會(huì)極大限制服務(wù)器處理連接的數(shù)量(個(gè)人測(cè)試,簡(jiǎn)單業(yè)務(wù)邏輯,16G內(nèi)存最大約開(kāi)4000個(gè)線程)。因而產(chǎn)生了異步式編程,只用一個(gè)監(jiān)聽(tīng),處理更大量的連接請(qǐng)求,由于請(qǐng)求是IO密集型操作,通過(guò)異步編程可以有效提高其并發(fā)度。具體執(zhí)行實(shí)際仍交給線程池執(zhí)行。不過(guò)異步編程的缺點(diǎn)也十分明顯,程序邏輯要截成兩段,拿到異步結(jié)果之后的處理邏輯還要再寫一段,同時(shí)程序還需要用并發(fā)哈希臨時(shí)保存未完成的請(qǐng)求,以及異步結(jié)果提前初始化好對(duì)象,等待接收。
有了協(xié)程和閉包,go語(yǔ)言的處理連接請(qǐng)求方法結(jié)構(gòu)十分清晰,只有一行:
go c.serve(ctx)
在TCP層面,為每個(gè)連接單獨(dú)創(chuàng)建協(xié)程處理。在讀請(qǐng)求的完整包并作http協(xié)議層解析,會(huì)使用對(duì)象池,減少GC壓力,提高處理并發(fā)度。
c.serve(ctx)監(jiān)聽(tīng)連接
net.Conn裝飾
連接的數(shù)據(jù)結(jié)構(gòu):
type conn struct {
server *Server
rwc net.Conn
remoteAddr string
tlsState *tls.ConnectionState
cancelCtx context.CancelFunc
werr error
r *connReader
bufr *bufio.Reader
bufw *bufio.Writer
lastMethod string
mu sync.Mutex
hijackedv bool
}
HTTP連接共有三塊內(nèi)容:-
- 上下層:上層Server服務(wù)端,下層的TCP連接 客戶端地址 TLS信息
- 讀寫緩沖區(qū)
- 狀態(tài)信息:最后http請(qǐng)求方式,當(dāng)前請(qǐng)求,當(dāng)前狀態(tài),是否被handler劫持。
注意到讀緩沖區(qū)有兩種數(shù)據(jù)結(jié)構(gòu) 常規(guī)的bufio.Reader和net/http包實(shí)現(xiàn)的connReader:
type connReader struct {
conn *conn
mu sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond *sync.Cond
inRead bool
aborted bool // set true before conn.rwc deadline is set to past
remain int64 // bytes remaining
}
除了bufio.Reader緩沖區(qū)之外,net/http實(shí)現(xiàn)的connReader是net.Conn的包裹,為bufio.Reader提供緩沖數(shù)據(jù)。為何實(shí)現(xiàn)兩個(gè)io.Reader?后面的內(nèi)容會(huì)詳述。
serve(ctx)方法,首先在ctx中保存客戶端地址c.rwc.LocalAddr()。
ctx裝飾類emptyCtx cancelCtx timerCtx
Context是一個(gè)可配置生命周期的,被多個(gè)協(xié)程并發(fā)訪問(wèn)的KV存儲(chǔ)結(jié)構(gòu)??稍O(shè)置生命周期,或主動(dòng)釋放(可以被GC)。
其接口為:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
對(duì)于連接的ctx采用如下裝飾,擴(kuò)充其原有的功能
- emptyCtx int類型而非struct{},僅實(shí)現(xiàn)Context接口,供context.Background()調(diào)用創(chuàng)建空ctx。同樣功能的還有context.Todo()
- valueCtx 用于ctx的PUT/GET操作
- cancelCtx,帶有cancelFunc的ctx。
- timerCtx,具有超時(shí)機(jī)制的ctx。
先看ctx最基本的Put Get操作。
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
type valueCtx struct {
Context
key, val interface{}
}
ctx添加的KV值,直接存在繼承類的新字段中。反射Type有Comparable()方法,可用來(lái)檢查類型是否可以被用來(lái)作key。不同于哈希,加參數(shù)并不檢查key是否已存在。
獲取字段用遞歸查找其所有的KV。有一個(gè)即返回,不檢查全部。
到這里看似這個(gè)數(shù)據(jù)結(jié)構(gòu)和map[]功能并沒(méi)有區(qū)別,實(shí)現(xiàn)卻坡為復(fù)雜。單獨(dú)寫一個(gè)這樣的數(shù)據(jù)結(jié)構(gòu),同樣的KV對(duì)之間多了一層父子關(guān)系。也就是說(shuō)參數(shù)是像壓棧一樣被壓入的,當(dāng)需要一個(gè)元素出棧,所有后壓入的元素也會(huì)跟著出棧。由于KV對(duì)之間存在父子關(guān)系,所以允許相同的key存在。意義在于其concelFunc:
Calling the CancelFunc cancels the child and its children, removes the parent's reference to the child, and stops any associated timers.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
canceler接口的實(shí)現(xiàn)類就是*cancelCtx and *timerCtx。
創(chuàng)建方法:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
其中Canceled是個(gè)構(gòu)建error對(duì)象,為打印的信息。方法返回cancelCtx和取消ctx的閉包。propagateCancel方法功能:向上面的parent找一個(gè)cancelCtx,如果這個(gè)cancelCtx已經(jīng)取消,則當(dāng)前ctx也取消。否則加入p.children[]添加關(guān)聯(lián),等待被取消??傊δ転椋蛏辖⒏缸雨P(guān)系,這種關(guān)系是cancelCtx之間的。
cancelCtx.Done()功能:加鎖創(chuàng)建cancelCtx.done通道
cancel.calcel()功能:關(guān)閉c.done通道,遞歸調(diào)用所有c.children.cancel(),釋放c.children = nil
c.child的父子關(guān)系,并不是相鄰的valueCtx繼承關(guān)系,而是相鄰的cancelCtx之間的關(guān)系。
再來(lái)看看timerCtx:
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
可以配置截止日期。使用了time.AfterFunc(dur, func() )到期后,自動(dòng)清理。
AfterFunc waits for the duration to elapse and then calls f in its own goroutine. It returns a Timer that can be used to cancel the call using its Stop method.
最后利用以上代碼,直接實(shí)現(xiàn)最常用的為ctx添加超時(shí)機(jī)制。
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
總結(jié):這種數(shù)據(jù)結(jié)構(gòu)為大量的KV配置信息,提供批量釋放的功能。功能類似于etcd為一組相似的配置信息,配置相同的租約。最終使用GC更為高效,提高內(nèi)存利用率。
connReader bufio.Reader bufio.Writer
c.bufw是池化的,且指定了c.bufw.buf的數(shù)組大小為4<<10。并且根據(jù)size大小,分為bufioWriter2kPool和bufioWriter4kPool。
bufio.Reader的最小池大小為4M字節(jié)。
讀取并解析請(qǐng)求
完成以上配置之后循還獲取數(shù)據(jù),并解析為空的Response:
w, err := c.readRequest(ctx)
遵守RFC 7231 5.1.1作Expect校驗(yàn)
如果請(qǐng)求Header中有"Expect":"100-continue"則繼續(xù),否則關(guān)閉http連接。
For now we'll just obey RFC 7231 5.1.1 which says "A server that receives an Expect field-value other than 100-continue MAY respond with a 417 (Expectation Failed) status code to indicate that the unexpected expectation cannot be met."
結(jié)束請(qǐng)求的工作:
w.Header().Set("Connection", "close")
w.WriteHeader(StatusExpectationFailed) //返回417狀態(tài)碼
w.finishRequest() //刷新緩存相關(guān)操作
注意到req.Body是包裹了Response的。
為req.Body添加hitEof的后續(xù)處理函數(shù)
先上body數(shù)據(jù)結(jié)構(gòu):
type body struct {
src io.Reader
hdr interface{} // non-nil (Response or Request) value means read trailer
r *bufio.Reader // underlying wire-format reader for the trailer
closing bool // is the connection to be closed after reading body?
doEarlyClose bool // whether Close should stop early
mu sync.Mutex // guards following, and calls to Read and Close
sawEOF bool
closed bool
earlyClose bool // Close called and we didn't read to the end of src
onHitEOF func() // if non-nil, func to call when EOF is Read
}
再看connReader的數(shù)據(jù)結(jié)構(gòu):
type connReader struct {
conn *conn
mu sync.Mutex // guards following
hasByte bool
byteBuf [1]byte
cond *sync.Cond
inRead bool
aborted bool // set true before conn.rwc deadline is set to past
remain int64 // bytes remaining
}
先通過(guò)b.sawEOF判斷是否讀完了req.Body。讀完req.Body之后,調(diào)用w.conn.r.startBackgroundRead函數(shù),需要加鎖。
cr.inRead標(biāo)志位判斷body是否被并發(fā)讀,原則上不支持并發(fā)讀。
cr.aborted終止讀取,并設(shè)置連接截止時(shí)間為很久以前aLongTimeAgo,意圖終止連接。
startBackgroundRead方法僅讀取1個(gè)byte,忽略掉連接超時(shí)、網(wǎng)絡(luò)錯(cuò)誤、aborted==true的錯(cuò)誤,報(bào)錯(cuò)則刪除終止請(qǐng)求,即向res.closeNotifyCh寫入true。
調(diào)用handler處理請(qǐng)求
釋放ctx
調(diào)用cancelCtx的方法,釋放ctx及其子節(jié)點(diǎn)的空間。
重用連接的處理
重用連接有兩個(gè)條件:s.disableKeepAlive==0 并且s.inShutdown==0,通過(guò)在*Server服務(wù)端參數(shù)設(shè)置。
還可以在請(qǐng)求中配置重用連接,與上一個(gè)條件都達(dá)到連接才是可重用的。同時(shí)滿足的條件為:
- 響應(yīng)頭"Connection: keep-alive"
- 如果有響應(yīng)body,必須要寫完。即請(qǐng)求方法非HEAD,且響應(yīng)contentLength非-1,且寫入body的類型符合的情況下,響應(yīng)contentLength寫入的字節(jié)數(shù)與resp.written相同
- 沒(méi)有發(fā)生body.earlyClose錯(cuò)誤,也主是未讀完req.Body的情況
- 沒(méi)有發(fā)生寫入響應(yīng)錯(cuò)誤
所以綜上所述,重用連接共有1,2,3共計(jì)6個(gè)條件,一個(gè)是通過(guò)客戶端在Header中配置,其中兩個(gè)是服務(wù)端的配置參數(shù),其余三個(gè)條件是確保沒(méi)有傳輸錯(cuò)誤。
配置idleTimeout
將Server.idelTimeout()配置到conn.SetReadDeadline()。如果因超時(shí)未完成讀取req.Body,或向連接寫入response。其處理邏輯定位至上文“重用連接的處理”。
至此完成了一次http的請(qǐng)求處理流程。其中readRequest讀包并解析 finishRequest刷緩存寫包在后面繼續(xù)更新。而serverHandler.ServeHTTP()是一上層mvc框架的復(fù)雜邏輯實(shí)現(xiàn),單獨(dú)寫一個(gè)集合更新。