本項目地址:gof 一個支持百萬連接的websocket框架
本文提及的內(nèi)容包含在:conn.go
一、新連接內(nèi)容接收的流程
1、通過epoll對象獲取新連接
我們需要通過epoll對象的wait方法來獲取到客戶端的連接,在第一次獲取的時候,其內(nèi)容是通過GlobalFd傳輸?shù)?,只有我們獲取到連接句柄,并加入到epoll對象中之后,以后才可以接收到該連接自己發(fā)送的消息。
2、解析頭消息內(nèi)容
websocket連接在獲取到連接之后,其首先發(fā)送的一定是頭信息,如果沒有頭信息,我們可以直接關(guān)閉socket連接,拒絕客戶端的訪問。
3、回應(yīng)客戶端頭消息
接收到客戶端的header頭信息之后,我們應(yīng)該對其進行解析,并判斷該header是否有用,并返回相應(yīng)的數(shù)據(jù),從而建立連接,進而通過連接去收發(fā)消息。
二、接收句柄消息
我們通過實例化EpollObj對象,可以得到一個Epoll模型,然后我們通過其中的wait方法可以接收句柄消息。
下面我們說一下第一種情況,也就是如果Epoll模型給我們的消息內(nèi)容是GlobalFd傳送過來的,這時候我們的整個接收流程。也就是我們上一章節(jié)中提到的,eWait方法中的handle
1、判斷是否有新連接加入
// @Author WangKan
// @Description //當(dāng)wait方法取到內(nèi)容后,會回調(diào)此方法,對fd進行處理
// @Date 2021/2/2 21:39
func (s *Server) handler(fd int, connType ConnStatus) {
switch connType {
case CONN_NEW:
newFd := s.addConn(fd)
//Upgrader to http header
s.handShaker(newFd)
//s.messageChan<-newFd
case CONN_MESSAGE:
Log.Info("接收到描述符為%v的消息", fd)
c, ok := s.conns.Load(fd)
if !ok {
Log.Info("描述符fd 為 %d 的s.conns 不存在!", fd)
return
}
s.receiveFdBytes <- c.(*Conn)
default:
panic("no connType")
}
}
我們只需要判斷connType 就可以看到是新連接加入,還是之前的連接再次發(fā)送的消息。
如果是新連接的加入,我們首先應(yīng)該做的處理就是從系統(tǒng)中讀取到該連接的句柄,并取出其中的內(nèi)容,進行解析。也就是 s.addConn方法。
// @Author WangKan
// @Description //如果有新的連接,就取出系統(tǒng)中的fd,添加到當(dāng)前的conns中。
// @Date 2021/2/2 21:37
func (s *Server) addConn(fd int) (newFd int) {
newFd, _, err := syscall.Accept(fd)
fmt.Printf("系統(tǒng)描述符新建的鏈接:%+v\n", newFd)
if err != nil {
fmt.Println("accept err: ", err)
return
}
//設(shè)置fd為非阻塞
if err := syscall.SetNonblock(newFd, true); err != nil {
os.Exit(1)
}
//把這個鏈接加入到epoll中
s.ep.eAdd(newFd)
return
}
在該方法中,傳入的參數(shù)是GlobalFd,這個時候首先我們應(yīng)該調(diào)用syscall.Accept方法找到新加入的句柄,然后將該句柄設(shè)置為非阻塞模式,從而添加到epoll模型中去進行管理。
2、讀取新連接中的Header
當(dāng)我們解析到了新的連接句柄之后,我們需要對該句柄中的內(nèi)容進行解析,首先我們應(yīng)該獲取其中的內(nèi)容:
// utils.go
// @Author WangKan
// @Description //獲取客戶端發(fā)送的Header頭
// @Date 2021/2/24 15:13
// @Param
// @return
func GetHeaderContent(fd int) (string,int) {
for{
var buf [1024]byte
nbytes, _ := syscall.Read(fd, buf[:])
if nbytes > 0 {
return string(buf[:]),nbytes
}
}
}
通過調(diào)用syscall.Read()方法,我們可以將連接中的內(nèi)容取出,取出之后是一個[]byte類型,然后我們轉(zhuǎn)換為string,并將該內(nèi)容返回給調(diào)用方。
3、為Header進行解析
在獲取到Header頭之后,我們需要解析header頭的內(nèi)容。在本項目中,是先將Header信息變成一個Map,然后遍歷該map進行判斷的。
具體代碼為:
// @Author WangKan
// @Description //將Header頭轉(zhuǎn)換為map
// @Date 2021/2/24 15:13
// @Param
// @return
func FormatHeader(buf string,length int)(map[string]string){
var header =make(map[string]string)
str:=""
index:=0
for i:=0;i<length;i++{
if buf[i]==13{
if index == 0 {
arr:=strings.Split(str," ")
fmt.Println(arr)
header["Method"]=arr[0]
str=""
index++
continue
}
if str != ""{
arr:=strings.Split(str,": ")
header[arr[0]]=arr[1]
str=""
}
}else if buf[i]==10{
continue
}else{
str += string(buf[i])
}
}
return header
}
通過上述的代碼,我們最終返回給調(diào)用方一個字符串類型的map,然后,我們對其中的關(guān)鍵信息進行判斷,判斷的代碼來源于:gorilla/websocket/server.go
func (u *Upgrader) Upgrade(fd int, header map[string]string, s *Server) (*Conn, error) {
const badHandshake = "websocket: the client is not using the websocket protocol: "
if header["Connection"] != "Upgrade" && header["Connection"] != "upgrade" {
return u.returnError(http.StatusBadRequest, badHandshake+"'upgrade' token not found in 'Connection' header")
}
if header["Upgrade"] != "websocket" {
return u.returnError(http.StatusBadRequest, badHandshake+"'websocket' token not found in 'Upgrade' header")
}
if header["Method"] != "GET" {
return u.returnError(http.StatusMethodNotAllowed, badHandshake+"request method is not GET")
}
if header["Sec-WebSocket-Version"] != "13" {
return u.returnError(http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header")
}
//if _, ok := header["Sec-WebSocket-Extensions"]; ok {
// return u.returnError(http.StatusInternalServerError, "websocket: application specific 'Sec-WebSocket-Extensions' headers are unsupported")
//}
challengeKey := header["Sec-WebSocket-Key"]
if challengeKey == "" {
return u.returnError(http.StatusBadRequest, "websocket: not a websocket handshake: 'Sec-WebSocket-Key' header is missing or blank")
}
c := newConn(fd, s)
// Use larger of hijacked buffer and connection write buffer for header.
wf := []byte{}
wf = append(wf, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
wf = append(wf, computeAcceptKey(challengeKey)...)
wf = append(wf, "\r\n"...)
wf = append(wf, "\r\n"...)
c.handShake <- Message{
MessageType: -1,
Content: wf,
}
return c, nil
}
其中精簡了一部分代碼,是由于一些配置項我們還沒有用到。
通過這個方法,我們最終返回的是wf,就是需要傳送給客戶端的頭信息。
然后我們通過調(diào)用 syscall.Write()方法,就可以將這些內(nèi)容寫入對應(yīng)的客戶端句柄中,系統(tǒng)會將內(nèi)容自動推送給客戶端。
//server.go
n, err := syscall.Write(fd, heade.Content)