理論知識(shí)可以參考
網(wǎng)絡(luò)信息怎么在網(wǎng)線中傳播的 (轉(zhuǎn)載自知乎)
Android 網(wǎng)絡(luò)(一) 概念 TCP/IP Socket Http Restful
腦殘式網(wǎng)絡(luò)編程入門(一):跟著動(dòng)畫來學(xué)TCP三次握手和四次揮手
腦殘式網(wǎng)絡(luò)編程入門(二):我們?cè)谧x寫Socket時(shí),究竟在讀寫什么?
TCP 粘包問題淺析及其解決方案,這個(gè)帖子里大家一頓噴粘包這個(gè)叫法
我工作五年的時(shí)候也不知道 “TCP 粘包”,繼續(xù)吐槽
一、API
1.服務(wù)端通過Listen加Accept
package main
import (
"fmt"
"net"
"os"
"time"
)
func main() {
//通過 ResolveTCPAddr 獲取一個(gè) TCPAddr
//ResolveTCPAddr(net, addr string) (*TCPAddr, os.Error)
//net參數(shù)是"tcp4"、"tcp6"、"tcp"中的任意一個(gè),
//分別表示 TCP(IPv4-only),TCP(IPv6-only)
//或者 TCP(IPv4,IPv6 的任意一個(gè))
//addr 表示域名或者IP地址,
//例如"www.google.com:80" 或者"127.0.0.1:22".
service := ":7777"
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
checkError(err)
//ListenTCP(net string, laddr *TCPAddr) (l *TCPListener, err os.Error)
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)
//func (l *TCPListener) Accept() (c Conn, err os.Error)
for {
conn, err := listener.Accept()
if err != nil {
continue
}
daytime := time.Now().String()
// don't care about return value
conn.Write([]byte(daytime))
// we're finished with this client
conn.Close()
}
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
上面的服務(wù)跑起來之后,它將會(huì)一直在那里等待,直到有新的客戶端請(qǐng)求到達(dá)。當(dāng)有新的客戶端請(qǐng)求到達(dá)并同意接受 Accept 該請(qǐng)求的時(shí)候他會(huì)反饋當(dāng)前的時(shí)間信息。值得注意的是,在代碼中 for 循環(huán)里,當(dāng)有錯(cuò)誤發(fā)生時(shí),直接 continue而不是退出,是因?yàn)樵诜?wù)器端跑代碼的時(shí)候,當(dāng)有錯(cuò)誤發(fā)生的情況下最好是由服務(wù)端記錄錯(cuò)誤,然后當(dāng)前連接的客戶端直接報(bào)錯(cuò)而退出,從而不會(huì)影響到當(dāng)前服務(wù)端運(yùn)行的整個(gè)服務(wù)。
上面的代碼有個(gè)缺點(diǎn),執(zhí)行的時(shí)候是單任務(wù)的,不能同時(shí)接收多個(gè)請(qǐng)求,那么該如何改造以使它支持多并發(fā)呢?
...
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go handlerClient(conn)
}
...
func handleClient(conn net.Conn) {
defer conn.Close()
daytime := time.Now().String()
// don't care about return value
conn.Write([]byte(daytime))
// we're finished with this client
}
...
2.客戶端直接調(diào)用 Dial
package main
import (
"fmt"
"io/ioutil"
"net"
"os"
)
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s host:port ", os.Args[0])
os.Exit(1)
}
service := os.Args[1]
tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
checkError(err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err)
_, err = conn.Write([]byte("HEAD / HTTP/1.0\r\n\r\n"))
checkError(err)
result, err := ioutil.ReadAll(conn)
checkError(err)
fmt.Println(string(result))
os.Exit(0)
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
首先程序?qū)⒂脩舻妮斎胱鳛閰?shù) service 傳入net.ResolveTCPAddr 獲取一個(gè) tcpAddr,然后把 tcpAddr 傳入 DialTCP 后創(chuàng)建了一個(gè) TCP連接 conn ,通過 conn 來發(fā)送請(qǐng)求信息,最后通過 ioutil.ReadAll 從 conn 中讀取全部的文本,也就是服務(wù)端響應(yīng)反饋的信息。
二、實(shí)現(xiàn)一個(gè)可以接受不同命令的服務(wù)端
參考使用 Go 進(jìn)行 Socket 編程
我們實(shí)現(xiàn)一個(gè)服務(wù)端, 它可以接受下面這些命令:
- ping 探活的命令, 服務(wù)端會(huì)返回 “pong”
- echo 服務(wù)端會(huì)返回收到的字符串
- quit 服務(wù)端收到這個(gè)命令后就會(huì)關(guān)閉連接
具體的服務(wù)端代碼如下所示:
package main
import (
"fmt"
"net"
"strings"
)
func connHandler(c net.Conn) {
if c == nil {
return
}
buf := make([]byte, 4096)
for {
cnt, err := c.Read(buf)
if err != nil || cnt == 0 {
c.Close()
break
}
inStr := strings.TrimSpace(string(buf[0:cnt]))
inputs := strings.Split(inStr, " ")
switch inputs[0] {
case "ping":
c.Write([]byte("pong\n"))
case "echo":
echoStr := strings.Join(inputs[1:], " ") + "\n"
c.Write([]byte(echoStr))
case "quit":
c.Close()
break
default:
fmt.Printf("Unsupported command: %s\n", inputs[0])
}
}
fmt.Printf("Connection from %v closed. \n", c.RemoteAddr())
}
func main() {
server, err := net.Listen("tcp", ":1208")
if err != nil {
fmt.Printf("Fail to start server, %s\n", err)
}
fmt.Println("Server Started ...")
for {
conn, err := server.Accept()
if err != nil {
fmt.Printf("Fail to connect, %s\n", err)
break
}
go connHandler(conn)
}
}
客戶端的實(shí)現(xiàn)
package main
import (
"bufio"
"fmt"
"net"
"os"
"strings"
)
func connHandler(c net.Conn) {
defer c.Close()
reader := bufio.NewReader(os.Stdin)
buf := make([]byte, 1024)
for {
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "quit" {
return
}
c.Write([]byte(input))
cnt, err := c.Read(buf)
if err != nil {
fmt.Printf("Fail to read data, %s\n", err)
continue
}
fmt.Print(string(buf[0:cnt]))
}
}
func main() {
conn, err := net.Dial("tcp", "localhost:1208")
if err != nil {
fmt.Printf("Fail to connect, %s\n", err)
return
}
connHandler(conn)
}
三、解決golang開發(fā)socket服務(wù)時(shí)粘包半包bug
基礎(chǔ)知識(shí)可以參考tcp是流的一些思考--拆包和粘包
tcp中有一個(gè)negal算法,用途是這樣的:通信兩端有很多小的數(shù)據(jù)包要發(fā)送,雖然傳送的數(shù)據(jù)很少,但是流程一點(diǎn)沒少,也需要tcp的各種確認(rèn),校驗(yàn)。這樣小的數(shù)據(jù)包如果很多,會(huì)造成網(wǎng)絡(luò)資源很大的浪費(fèi),negal算法做了這樣一件事,當(dāng)來了一個(gè)很小的數(shù)據(jù)包,我不急于發(fā)送這個(gè)包,而是等來了更多的包,將這些小包組合成大包之后一并發(fā)送,不就提高了網(wǎng)絡(luò)傳輸?shù)男实穆?。這個(gè)想法收到了很好的效果,但是我們想一下,如果是分屬于兩個(gè)不同頁面的包,被合并在了一起,那客戶那邊如何區(qū)分它們呢?
這就是粘包問題。從粘包問題我們更可以看出為什么tcp被稱為流協(xié)議,因?yàn)樗透饕粯?,是沒有邊界的,沒有消息的邊界保護(hù)機(jī)制,所以tcp只有流的概念,沒有包的概念。
解決tcp粘包的方法:
客戶端會(huì)定義一個(gè)標(biāo)示,比如數(shù)據(jù)的前4位是數(shù)據(jù)的長(zhǎng)度,后面才是數(shù)據(jù)。那么客戶端只需發(fā)送 ( 數(shù)據(jù)長(zhǎng)度+數(shù)據(jù) ) 的格式數(shù)據(jù)就可以了,接收方根據(jù)包頭信息里的數(shù)據(jù)長(zhǎng)度讀取buffer.
客戶端:
//客戶端發(fā)送封包
package main
import (
"fmt"
"math/rand"
"net"
"os"
"strconv"
"strings"
"time"
)
func main() {
server := "127.0.0.1:5000"
tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
defer conn.Close()
for i := 0; i < 50; i++ {
//msg := strconv.Itoa(i)
msg := RandString(i)
msgLen := fmt.Sprintf("%03s", strconv.Itoa(len(msg)))
//fmt.Println(msg, msgLen)
words := "aaaa" + msgLen + msg
//words := append([]byte("aaaa"), []byte(msgLen), []byte(msg))
fmt.Println(len(words), words)
conn.Write([]byte(words))
}
}
/**
*生成隨機(jī)字符
**/
func RandString(length int) string {
rand.Seed(time.Now().UnixNano())
rs := make([]string, length)
for start := 0; start < length; start++ {
t := rand.Intn(3)
if t == 0 {
rs = append(rs, strconv.Itoa(rand.Intn(10)))
} else if t == 1 {
rs = append(rs, string(rand.Intn(26)+65))
} else {
rs = append(rs, string(rand.Intn(26)+97))
}
}
return strings.Join(rs, "")
}
服務(wù)端實(shí)例代碼:
package main
import (
"fmt"
"io"
"net"
"os"
"strconv"
)
func main() {
netListen, err := net.Listen("tcp", ":5000")
CheckError(err)
defer netListen.Close()
for {
conn, err := netListen.Accept()
if err != nil {
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
allbuf := make([]byte, 0)
buffer := make([]byte, 1024)
for {
readLen, err := conn.Read(buffer)
//fmt.Println("readLen: ", readLen, len(allbuf))
if err == io.EOF {
break
}
if err != nil {
fmt.Println("read error")
return
}
if len(allbuf) != 0 {
allbuf = append(allbuf, buffer...)
} else {
allbuf = buffer[:]
}
var readP int = 0
for {
//fmt.Println("allbuf content:", string(allbuf))
//buffer長(zhǎng)度小于7
if readLen-readP < 7 {
allbuf = buffer[readP:]
break
}
msgLen, _ := strconv.Atoi(string(allbuf[readP+4 : readP+7]))
logLen := 7 + msgLen
//fmt.Println(readP, readP+logLen)
//buffer剩余長(zhǎng)度>將處理的數(shù)據(jù)長(zhǎng)度
if len(allbuf[readP:]) >= logLen {
//fmt.Println(string(allbuf[4:7]))
fmt.Println(string(allbuf[readP : readP+logLen]))
readP += logLen
//fmt.Println(readP, readLen)
if readP == readLen {
allbuf = nil
break
}
} else {
allbuf = buffer[readP:]
break
}
}
}
}
func CheckError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
四、io包的ReadFull
對(duì)于第三部分的解決golang開發(fā)socket服務(wù)時(shí)粘包半包bug,有作者認(rèn)為太復(fù)雜了,參見golang tcp拆包的正確姿勢(shì),他提出可以用ReadFull來簡(jiǎn)化。
關(guān)于io包基礎(chǔ)知識(shí),參考Golang io reader writer
關(guān)于ReadFull,可以參考達(dá)達(dá)的博客系列:
Go語言小貼士1 - io包
Go語言小貼士2 - 協(xié)議解析
Go語言小貼士3 - bufio包
原文不再轉(zhuǎn)述,現(xiàn)在引用一下重點(diǎn):
io.Reader的定義如下:
type Reader interface {
Read(p []byte) (n int, err error)
}
其中文檔的說明非常重要,文檔中詳細(xì)描述了Read方法的各種返回可能性。
文檔描述中有一個(gè)要點(diǎn),就是n可能小于等于len(p),也就是說Go在讀IO的時(shí)候,是不會(huì)保證一次讀取預(yù)期的所有數(shù)據(jù)的。如果我們要確保一次讀取我們所需的所有數(shù)據(jù),就需要在一個(gè)循環(huán)里調(diào)用Read,累加每次返回的n并小心設(shè)置下次Read時(shí)p的偏移量,直到n的累加值達(dá)到我們的預(yù)期。
因?yàn)樯鲜鲂枨髮?shí)在太常見了,所以Go在io包中提供了一個(gè)ReadFull函數(shù)來做到一次讀取要求的所有數(shù)據(jù),通過閱讀ReadFull函數(shù)的代碼,也可以反過來幫助大家理解io.Reader是怎么運(yùn)作的。
//io.go源碼
func ReadFull(r Reader, buf []byte) (n int, err error) {
return ReadAtLeast(r, buf, len(buf))
}
func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
} else if n > 0 && err == EOF {
err = ErrUnexpectedEOF
}
return
}
在很多應(yīng)用場(chǎng)景中,消息包的長(zhǎng)度是不固定的,就像上面的字符串字段一樣。我們一樣可以用開頭固定的幾個(gè)字節(jié)來存放消息長(zhǎng)度,在解析通訊協(xié)議的時(shí)候就可以從字節(jié)流中截出一個(gè)個(gè)的消息包了,這樣的操作通常叫做協(xié)議分包或者粘包處理。
貼個(gè)從Socket讀取消息包的偽代碼(沒編譯):
func ReadPacket(conn net.Conn) ([]byte, error) {
var head [2]byte
if _, err := io.ReadFull(conn, head[:]); err != nil {
return err
}
size := binary.BigEndian.Uint16(head)
packet := make([]byte, size)
if _, err := io.ReadFull(conn, packet); err != nil {
return err
}
return packet
}
上面的代碼就用到了前一個(gè)小貼士中說到的io.ReadFull來確保一次讀取完整數(shù)據(jù)。
要注意,這段代碼不是線程安全的,如果有兩個(gè)線程同時(shí)對(duì)一個(gè)net.Conn進(jìn)行ReadPacket操作,很可能會(huì)發(fā)生嚴(yán)重錯(cuò)誤,具體邏輯請(qǐng)自行分析。
從上面結(jié)構(gòu)體序列化和反序列化的代碼中,大家不難看出,實(shí)現(xiàn)一個(gè)二進(jìn)制協(xié)議是挺繁瑣和容易出BUG的,只要稍微有一個(gè)數(shù)值計(jì)算錯(cuò)就解析出錯(cuò)了。所以在工程實(shí)踐中,不推薦大家手寫二進(jìn)制協(xié)議的解析代碼,項(xiàng)目中通常會(huì)用自動(dòng)化的工具來輔助生成代碼。
在Leaf 游戲服務(wù)器框架簡(jiǎn)介的tcp_msg.go中,Read方法也使用了ReadFull這種方式來處理。
五、WebSocket
參考封裝golang websocket
websocket是個(gè)二進(jìn)制協(xié)議,需要先通過Http協(xié)議進(jìn)行握手,從而協(xié)商完成從Http協(xié)議向websocket協(xié)議的轉(zhuǎn)換。一旦握手結(jié)束,當(dāng)前的TCP連接后續(xù)將采用二進(jìn)制websocket協(xié)議進(jìn)行雙向雙工交互,自此與Http協(xié)議無關(guān)。
可以通過這篇知乎了解一下websocket協(xié)議的基本原理:《WebSocket 是什么原理?為什么可以實(shí)現(xiàn)持久連接?》。
1.粘包
我們開發(fā)過TCP服務(wù)的都知道,需要通過協(xié)議decode從TCP字節(jié)流中解析出一個(gè)一個(gè)請(qǐng)求,那么websocket又怎么樣呢?
websocket以message為單位進(jìn)行通訊,本身就是一個(gè)在TCP層上的一個(gè)分包協(xié)議,其實(shí)并不需要我們?cè)龠M(jìn)行粘包處理。但是因?yàn)閱蝹€(gè)message可能很大很大(比如一個(gè)視頻文件),那么websocket顯然不適合把一個(gè)視頻作為一個(gè)message傳輸(中途斷了前功盡棄),所以websocket協(xié)議其實(shí)是支持1個(gè)message分多個(gè)frame幀傳輸?shù)摹?/p>
我們的瀏覽器提供的編程API都是message粒度的,把frame拆幀的細(xì)節(jié)對(duì)開發(fā)者隱蔽了,而服務(wù)端websocket框架一般也做了同樣的隱藏,會(huì)自動(dòng)幫我們收集所有的frame后拼成messasge再回調(diào),所以結(jié)論就是:
websocket以message為單位通訊,不需要開發(fā)者自己處理粘包問題。
更多參考Websocket需要像TCP Socket那樣進(jìn)行邏輯數(shù)據(jù)包的分包與合包嗎?
2.golang實(shí)現(xiàn)
golang官方標(biāo)準(zhǔn)庫里有一個(gè)websocket的包,但是它提供的就是frame粒度的API,壓根不能用。
不過官方其實(shí)已經(jīng)認(rèn)可了一個(gè)準(zhǔn)標(biāo)準(zhǔn)庫實(shí)現(xiàn),它實(shí)現(xiàn)了message粒度的API,讓開發(fā)者不需要關(guān)心websocket協(xié)議細(xì)節(jié),開發(fā)起來非常方便,其文檔地址:https://godoc.org/github.com/gorilla/websocket。
開發(fā)websocket服務(wù)時(shí),首先要基于http庫對(duì)外暴露接口,然后由websocket庫接管TCP連接進(jìn)行協(xié)議升級(jí),然后進(jìn)行websocket協(xié)議的數(shù)據(jù)交換,所以開發(fā)時(shí)總是要用到http庫和websocket庫。
上述websocket文檔中對(duì)開發(fā)websocket服務(wù)有明確的注意事項(xiàng)要求,主要是指:
- 讀和寫API不是并發(fā)安全的,需要啟動(dòng)單個(gè)goroutine串行處理。
- 關(guān)閉API是線程安全的,一旦調(diào)用則阻塞的讀和寫API會(huì)出錯(cuò)返回,從而終止處理。
六、心跳實(shí)現(xiàn)
Golang 心跳的實(shí)現(xiàn)
在多客戶端同時(shí)訪問服務(wù)器的工作模式下,首先要保證服務(wù)器的運(yùn)行正常。因此,Server和Client建立通訊后,確保連接的及時(shí)斷開就非常重要。否則,多個(gè)客戶端長(zhǎng)時(shí)間占用著連接不關(guān)閉,是非??膳碌姆?wù)器資源浪費(fèi)。會(huì)使得服務(wù)器可服務(wù)的客戶端數(shù)量大幅度減少。因此,針對(duì)短鏈接和長(zhǎng)連接,根據(jù)業(yè)務(wù)的需求,配套不同的處理機(jī)制。
- 短連接:一般建立完連接,就立刻傳輸數(shù)據(jù)。傳輸完數(shù)據(jù),連接就關(guān)閉。服務(wù)端根據(jù)需要,設(shè)定連接的時(shí)長(zhǎng)。超過時(shí)間長(zhǎng)度,就算客戶端超時(shí)。立刻關(guān)閉連接。
- 長(zhǎng)連接:建立連接后,傳輸數(shù)據(jù),然后要保持連接,然后再次傳輸數(shù)據(jù)。直到連接關(guān)閉。
socket讀寫可以通過 SetDeadline、SetReadDeadline、SetWriteDeadline設(shè)置阻塞的時(shí)間。
func (*IPConn) SetDeadline
func (c *IPConn) SetDeadline(t time.Time) error
func (*IPConn) SetReadDeadline
func (c *IPConn) SetReadDeadline(t time.Time) error
func (*IPConn) SetWriteDeadline
func (c *IPConn) SetWriteDeadline(t time.Time) error
如果做短連接,直接在Server端的連接上設(shè)置SetReadDeadline。當(dāng)你設(shè)置的時(shí)限到達(dá),無論客戶端是否還在繼續(xù)傳遞消息,服務(wù)端都不會(huì)再接收。并且已經(jīng)關(guān)閉連接。
func main() {
server := ":7373"
netListen, err := net.Listen("tcp", server)
if err != nil{
Log("connect error: ", err)
os.Exit(1)
}
Log("Waiting for Client ...")
for{
conn, err := netListen.Accept()
if err != nil{
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
continue
}
//設(shè)置短連接(10秒)
conn.SetReadDeadline(time.Now().Add(time.Duration(10)*time.Second))
Log(conn.RemoteAddr().String(), "connect success!")
...
}
}
這就可以了。在這段代碼中,每當(dāng)10秒中的時(shí)限一道,連接就終止了。
根據(jù)業(yè)務(wù)需要,客戶端可能需要長(zhǎng)時(shí)間保持連接。但是服務(wù)端不能無限制的保持。這就需要一個(gè)機(jī)制,如果超過某個(gè)時(shí)間長(zhǎng)度,服務(wù)端沒有獲得客戶端的數(shù)據(jù),就判定客戶端已經(jīng)不需要連接了(比如客戶端掛掉了)。做到這個(gè),需要一個(gè)心跳機(jī)制。在限定的時(shí)間內(nèi),客戶端給服務(wù)端發(fā)送一個(gè)指定的消息,以便服務(wù)端知道客戶端還活著。
func sender(conn *net.TCPConn) {
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+" Hello I'm MyHeartbeat Client."
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
os.Exit(1)
}
Log("服務(wù)端接收了", msg)
time.Sleep(2 * time.Second)
}
for i := 0; i < 2 ; i++ {
time.Sleep(12 * time.Second)
}
for i := 0; i < 10; i++{
words := strconv.Itoa(i)+" Hi I'm MyHeartbeat Client."
msg, err := conn.Write([]byte(words))
if err != nil {
Log(conn.RemoteAddr().String(), "Fatal error: ", err)
os.Exit(1)
}
Log("服務(wù)端接收了", msg)
time.Sleep(2 * time.Second)
}
}
這段客戶端代碼,實(shí)現(xiàn)了兩個(gè)相同的信息發(fā)送頻率給服務(wù)端。兩個(gè)頻率中間,我們讓運(yùn)行休息了12秒。然后,我們?cè)诜?wù)端的對(duì)應(yīng)機(jī)制是這樣的。
func HeartBeating(conn net.Conn, bytes chan byte, timeout int) {
select {
case fk := <- bytes:
Log(conn.RemoteAddr().String(), "心跳:第", string(fk), "times")
conn.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second))
break
case <- time.After(5 * time.Second):
Log("conn dead now")
conn.Close()
}
}
每次接收到心跳數(shù)據(jù)就 SetDeadline 延長(zhǎng)一個(gè)時(shí)間段 timeout。如果沒有接到心跳數(shù)據(jù),5秒后連接關(guān)閉。