func serveRequest(s *Server, responsesChan chan<- *serverMessage, stopChan <-chan struct{}, m *serverMessage, workersCh <-chan struct{}) {
request := m.Request
m.Request = nil
clientAddr := m.ClientAddr
m.ClientAddr = ""
skipResponse := (m.ID == 0)
if skipResponse {
m.Response = nil
m.Error = ""
s.Stats.incRPCCalls()
serverMessagePool.Put(m)
}
t := time.Now()
response, err := callHandlerWithRecover(s.LogError, s.Handler, clientAddr, s.Addr, request)
s.Stats.incRPCTime(uint64(time.Since(t).Seconds() * 1000))
if !skipResponse {
m.Response = response
m.Error = err
// Select hack for better performance.
// See https://github.com/valyala/gorpc/pull/1 for details.
select {
case responsesChan <- m:
default:
select {
case responsesChan <- m:
case <-stopChan:
}
}
}
<-workersCh
}
- 消息分為兩種,- ID不為0的時(shí)候,即!skipResponse的時(shí)候,將處理后的消息放入responsesChan交由serverWriter處理,返回給client
- ID為0的標(biāo)記為skipResponse,并放回消息池中。這種類似于不需要回應(yīng)的rpc函數(shù)調(diào)用。
- 具體的處理函數(shù)如下,調(diào)用handler(s.Handler)并附加了一層崩潰記錄恢復(fù)。
- s.Handler可以實(shí)現(xiàn)類似于函數(shù)字典,提供函數(shù)注冊和函數(shù)調(diào)用的功能。
- workersCh實(shí)際上是對worker計(jì)數(shù),所以結(jié)束的時(shí)候計(jì)數(shù)-1
func callHandlerWithRecover(logErrorFunc LoggerFunc, handler HandlerFunc, clientAddr, serverAddr string, request interface{}) (response interface{}, errStr string) {
defer func() {
if x := recover(); x != nil {
stackTrace := make([]byte, 1<<20)
n := runtime.Stack(stackTrace, false)
errStr = fmt.Sprintf("Panic occured: %v\nStack trace: %s", x, stackTrace[:n])
logErrorFunc("gorpc.Server: [%s]->[%s]. %s", clientAddr, serverAddr, errStr)
}
}()
response = handler(clientAddr, request)
return
}
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。