項(xiàng)目中的消息通知用到了websocket,感覺比http長連接分塊發(fā)送好用,特此記錄一下。
WebSocket協(xié)議用ws表示。此外,還有wss協(xié)議,表示加密的WebSocket協(xié)議,對(duì)應(yīng)HTTPs協(xié)議。
完成握手以后,WebSocket協(xié)議就在TCP協(xié)議之上,開始傳送數(shù)據(jù)
websocket原理及運(yùn)行機(jī)制
WebSocket是HTML5下一種新的協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工通信,能更好的節(jié)省服務(wù)器資源和帶寬并達(dá)到實(shí)時(shí)通訊的目的。它與HTTP一樣通過已建立的TCP連接來傳輸數(shù)據(jù),但是它和HTTP最大不同是:WebSocket是一種雙向通信協(xié)議。在建立連接后,WebSocket服務(wù)器端和客戶端都能主動(dòng)向?qū)Ψ桨l(fā)送或接收數(shù)據(jù),就像Socket一樣;WebSocket需要像TCP一樣,先建立連接,連接成功后才能相互通信。傳統(tǒng)HTTP客戶端與服務(wù)器請(qǐng)求響應(yīng)模式如下圖所示:

WebSocket模式客戶端與服務(wù)器請(qǐng)求響應(yīng)模式如下圖:

上圖對(duì)比可以看出,==相對(duì)于傳統(tǒng)HTTP每次請(qǐng)求-應(yīng)答都需要客戶端與服務(wù)端建立連接的模式,WebSocket是類似Socket的TCP長連接通訊模式。一旦WebSocket連接建立后,后續(xù)數(shù)據(jù)都以幀序列的形式傳輸。在客戶端斷開WebSocket連接或Server端中斷連接前,不需要客戶端和服務(wù)端重新發(fā)起連接請(qǐng)求==。在海量并發(fā)及客戶端與服務(wù)器交互負(fù)載流量大的情況下,極大的節(jié)省了網(wǎng)絡(luò)帶寬資源的消耗,有明顯的性能優(yōu)勢,且客戶端發(fā)送和接受消息是在同一個(gè)持久連接上發(fā)起,實(shí)時(shí)性優(yōu)勢明顯。
相比HTTP長連接,WebSocket有以下特點(diǎn):
是真正的全雙工方式,建立連接后客戶端與服務(wù)器端是完全平等的,可以互相主動(dòng)請(qǐng)求。而HTTP長連接基于HTTP,是傳統(tǒng)的客戶端對(duì)服務(wù)器發(fā)起請(qǐng)求的模式。HTTP長連接中,每次數(shù)據(jù)交換除了真正的數(shù)據(jù)部分外,服務(wù)器和客戶端還要大量交換HTTP header,信息交換效率很低。
Websocket協(xié)議通過第一個(gè)request建立了TCP連接之后,之后交換的數(shù)據(jù)都不需要發(fā)送 HTTP header就能交換數(shù)據(jù),這顯然和原有的HTTP協(xié)議有區(qū)別所以它需要對(duì)服務(wù)器和客戶端都進(jìn)行升級(jí)才能實(shí)現(xiàn)(主流瀏覽器都已支持HTML5)。
此外還有 multiplexing、不同的URL可以復(fù)用同一個(gè)WebSocket連接等功能。這些都是HTTP長連接不能做到的。
連接建立后定期的心跳檢測
在客戶端,new WebSocket實(shí)例化一個(gè)新的WebSocket客戶端對(duì)象,請(qǐng)求類似 ws://yourdomain:port/path 的服務(wù)端WebSocket URL,客戶端WebSocket對(duì)象會(huì)自動(dòng)解析并識(shí)別為WebSocket請(qǐng)求,并連接服務(wù)端端口,執(zhí)行雙方握手過程,客戶端發(fā)送數(shù)據(jù)格式類似:
GET /webfin/websocket/ HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
Origin: http://localhost:8080
Sec-WebSocket-Version: 13
可以看到,客戶端發(fā)起的WebSocket連接報(bào)文類似傳統(tǒng)HTTP報(bào)文
Upgrade:websocket參數(shù)值表明這是WebSocket類型請(qǐng)求,
Sec-WebSocket-Key是WebSocket客戶端發(fā)送的一個(gè) base64編碼的密文,要求服務(wù)端必須返回一個(gè)對(duì)應(yīng)加密的Sec-WebSocket-Accept應(yīng)答,否則客戶端會(huì)拋出Error during WebSocket handshake錯(cuò)誤,并關(guān)閉連接。
Upgrade: websocket
Connection: Upgrade
這個(gè)就是Websocket的核心了,告訴Apache、Nginx等服務(wù)器進(jìn)行協(xié)議轉(zhuǎn)換
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
首先,Sec-WebSocket-Key 是一個(gè)Base64 encode的值,這個(gè)是瀏覽器隨機(jī)生成的,告訴服務(wù)器驗(yàn)證websocket協(xié)議。
然后,Sec_WebSocket-Protocol 是一個(gè)用戶定義的字符串,用來區(qū)分同URL下,不同的服務(wù)所需要的協(xié)議。
服務(wù)端收到報(bào)文后返回的數(shù)據(jù)格式類似:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=
Sec-WebSocket-Accept
的值是服務(wù)端采用與客戶端一致的密鑰計(jì)算出來后返回客戶端的
HTTP/1.1 101 Switching Protocols表示服務(wù)端接受WebSocket協(xié)議的客戶端連接,經(jīng)過這樣的請(qǐng)求-響應(yīng)處理后,兩端的WebSocket連接握手成功, 后續(xù)就可以進(jìn)行TCP通訊了。用戶可以查閱WebSocket協(xié)議棧了解WebSocket客戶端和服務(wù)端更詳細(xì)的交互數(shù)據(jù)格式。
在開發(fā)方面,WebSocket API 也十分簡單:只需要實(shí)例化 WebSocket,創(chuàng)建連接,然后服務(wù)端和客戶端就可以相互發(fā)送和響應(yīng)消息。在WebSocket 實(shí)現(xiàn)及案例分析部分可以看到詳細(xì)的 WebSocket API 及代碼實(shí)現(xiàn)。

golang中的websokect
github.com/gorilla/websocket
項(xiàng)目中主要使用 github.com/gorilla/websocket 這個(gè)包。
通過上面對(duì)websocket原理的描述可以知道,http到websocket有一個(gè)協(xié)議轉(zhuǎn)換的過程,重點(diǎn)關(guān)注 Upgrade服務(wù)端協(xié)議轉(zhuǎn)換函數(shù)。
// Upgrade upgrades the HTTP server connection to the WebSocket protocol.
//
// The responseHeader is included in the response to the client's upgrade
// request. Use the responseHeader to specify cookies (Set-Cookie) and the
// application negotiated subprotocol (Sec-Websocket-Protocol).
//
// If the upgrade fails, then Upgrade replies to the client with an HTTP error
// response.
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
if r.Method != "GET" {
return u.returnError(w, r, http.StatusMethodNotAllowed, "websocket: not a websocket handshake: request method is not GET")
}
if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {
return u.returnError(w, r, http.StatusInternalServerError, "websocket: application specific 'Sec-Websocket-Extensions' headers are unsupported")
}
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: 'upgrade' token not found in 'Connection' header")
}
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: 'websocket' token not found in 'Upgrade' header")
}
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
return u.returnError(w, r, http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header")
}
checkOrigin := u.CheckOrigin
if checkOrigin == nil {
checkOrigin = checkSameOrigin
}
if !checkOrigin(r) {
return u.returnError(w, r, http.StatusForbidden, "websocket: 'Origin' header value not allowed")
}
challengeKey := r.Header.Get("Sec-Websocket-Key")
if challengeKey == "" {
return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: `Sec-Websocket-Key' header is missing or blank")
}
subprotocol := u.selectSubprotocol(r, responseHeader)
// Negotiate PMCE
var compress bool
if u.EnableCompression {
for _, ext := range parseExtensions(r.Header) {
if ext[""] != "permessage-deflate" {
continue
}
compress = true
break
}
}
var (
netConn net.Conn
err error
)
h, ok := w.(http.Hijacker)
if !ok {
return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
}
var brw *bufio.ReadWriter
netConn, brw, err = h.Hijack()
if err != nil {
return u.returnError(w, r, http.StatusInternalServerError, err.Error())
}
if brw.Reader.Buffered() > 0 {
netConn.Close()
return nil, errors.New("websocket: client sent data before handshake is complete")
}
c := newConnBRW(netConn, true, u.ReadBufferSize, u.WriteBufferSize, brw)
c.subprotocol = subprotocol
if compress {
c.newCompressionWriter = compressNoContextTakeover
c.newDecompressionReader = decompressNoContextTakeover
}
p := c.writeBuf[:0]
p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
p = append(p, computeAcceptKey(challengeKey)...)
p = append(p, "\r\n"...)
if c.subprotocol != "" {
p = append(p, "Sec-Websocket-Protocol: "...)
p = append(p, c.subprotocol...)
p = append(p, "\r\n"...)
}
if compress {
p = append(p, "Sec-Websocket-Extensions: permessage-deflate; server_no_context_takeover; client_no_context_takeover\r\n"...)
}
for k, vs := range responseHeader {
if k == "Sec-Websocket-Protocol" {
continue
}
for _, v := range vs {
p = append(p, k...)
p = append(p, ": "...)
for i := 0; i < len(v); i++ {
b := v[i]
if b <= 31 {
// prevent response splitting.
b = ' '
}
p = append(p, b)
}
p = append(p, "\r\n"...)
}
}
p = append(p, "\r\n"...)
// Clear deadlines set by HTTP server.
netConn.SetDeadline(time.Time{})
if u.HandshakeTimeout > 0 {
netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout))
}
if _, err = netConn.Write(p); err != nil {
netConn.Close()
return nil, err
}
if u.HandshakeTimeout > 0 {
netConn.SetWriteDeadline(time.Time{})
}
return c, nil
}
通過該函數(shù)可以看到大致流程:
- 判斷請(qǐng)求方法是否為GET,不是GET則為非法握手方法
- 根據(jù)client的請(qǐng)求頭信息,確認(rèn)升級(jí)協(xié)議
- 校驗(yàn)跨域
- 填充響應(yīng)頭,響應(yīng)返回客戶端,鏈接建立
具體實(shí)現(xiàn)
Server端
主要采用Upgrade函數(shù)進(jìn)行協(xié)議轉(zhuǎn)換。指定了ReadBufferSize、WriteBufferSize、HandshakeTimeout參數(shù),同時(shí)跨域叫為采用默認(rèn)校驗(yàn)函數(shù),自定義的校驗(yàn)函數(shù)總是返回true跳過了跨域校驗(yàn)
//controller
type MyWebSocketController struct {
beego.Controller
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
HandshakeTimeout: 5 * time.Second,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (c *MyWebSocketController) Get() {
ws, err := upgrader.Upgrade(c.Ctx.ResponseWriter, c.Ctx.Request, nil)
if err != nil {
log.Fatal(err)
}
socket.Clients.Set(ws, true)
_, body, _ := ws.ReadMessage()
msg := socket.Message{Message: string(body)}
socket.Broadcast <- msg
}
消息處理及轉(zhuǎn)發(fā)
var (
Clients = make(map[*websocket.Conn]bool, 1024)
Broadcast = make(chan Message, 1024)
)
type Message struct {
Message string `json:"message"`
}
func init() {
go handleMessages()
}
//廣播發(fā)送至頁面
func handleMessages() {
for {
msg := <-Broadcast
for client := range Clients {
err := client.WriteJSON(msg)
if err != nil {
client.Close()
delete(Clients, client)
}
}
}
}
路由注冊(cè)(采用beego的注解式路由無法完成協(xié)議轉(zhuǎn)換,具體原因還未找到)
beego.Router("/ws", ¬iceMq.MyWebSocketController{})
go client
采用golang自帶的golang.org/x/net/websocket包發(fā)送消息
package websocket
import (
"net/url"
"github.com/astaxie/beego"
"golang.org/x/net/websocket"
)
type Client struct {
Host string
Path string
}
func NewWebsocketClient(host, path string) *Client {
return &Client{
Host: host,
Path: path,
}
}
func (this *Client) SendMessage(body []byte) error {
u := url.URL{Scheme: "ws", Host: this.Host, Path: this.Path}
ws, err := websocket.Dial(u.String(), "", "http://"+this.Host+"/")
defer ws.Close() //關(guān)閉連接
if err != nil {
beego.Error(err)
return err
}
_, err = ws.Write(body)
if err != nil {
beego.Error(err)
return err
}
return nil
}
js client
目前主流瀏覽器都支持WebSocket協(xié)議(包括IE 10+)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<title>Sample of websocket with golang</title>
<script src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>
<script>
$(function() {
var ws = new WebSocket('ws://api.mdevo.com/ws');
ws.onopen = function(e) {
$('<li>').text("connected").appendTo($ul);
}
ws.onmessage = function(e) {
$('<li>').text(event.data).appendTo($ul);
};
var $ul = $('#msg-list');
});
</script>
</head>
<body>
<ul id="msg-list"></ul>
</body>
</html>