一個(gè)簡單的golang TCP通信例子

前言

新工作接手了公司的一個(gè)使用golang編寫的agent程序,用于采集各個(gè)機(jī)器的性能指標(biāo)和監(jiān)控?cái)?shù)據(jù),之前使用http實(shí)現(xiàn)數(shù)據(jù)的上傳,最近想把它改成tcp上傳的方式,由于是新手上路,順手寫了一個(gè)小demo程序。

這個(gè)程序中包含:

  • 簡單的TcpServer服務(wù)程序:偵聽,數(shù)據(jù)收發(fā)與解析

  • 簡單的客戶端程序:數(shù)據(jù)收發(fā)與解析

服務(wù)器

與正常的其他語言一樣,go中也提供了豐富的網(wǎng)絡(luò)相關(guān)的包,按照正常的套路,它是這樣的:

  1. 綁定端口,初始化套接字

  2. 啟動偵聽,開啟后臺線程接收客戶端請求

  3. 接收請求,針對每個(gè)請求開啟一個(gè)線程來處理通信

  4. 資源回收

golang的套路也是如此,不同的地方在于它可以使用goroutine來替換上面的線程;

整體的代碼很簡單,可以參考文檔和api手冊,示例代碼如下:

package main

import (
    "fmt"
    "net"
    "os"
    "encoding/json"
    "bufio"
    "hash/crc32"
    "io"
)
//數(shù)據(jù)包的類型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET = 0x01
)

var (
    server = "127.0.0.1:8080"
)
//這里是包的結(jié)構(gòu)體,其實(shí)是可以不需要的
type Packet struct {
    PacketType      byte
    PacketContent     []byte
}
//心跳包,這里用了json來序列化,也可以用github上的gogo/protobuf包
//具體見(https://github.com/gogo/protobuf)
type HeartPacket struct {
    Version     string`json:"version"`
    Timestamp   int64`json:"timestamp"`
}
//正式上傳的數(shù)據(jù)包
type ReportPacket struct {
    Content   string`json:"content"`
    Rand         int`json:"rand"`
    Timestamp   int64`json:"timestamp"`
}
//與服務(wù)器相關(guān)的資源都放在這里面
type TcpServer struct {
    listener       *net.TCPListener
    hawkServer  *net.TCPAddr
}

func main() {
    //類似于初始化套接字,綁定端口
    hawkServer, err := net.ResolveTCPAddr("tcp", server)
    checkErr(err)
    //偵聽
    listen, err := net.ListenTCP("tcp", hawkServer)
    checkErr(err)
    //記得關(guān)閉
    defer listen.Close()
    tcpServer := &TcpServer{
        listener:listen,
        hawkServer:hawkServer,
    }
    fmt.Println("start server successful......")
    //開始接收請求
    for {
        conn, err := tcpServer.listener.Accept()
        fmt.Println("accept tcp client %s",conn.RemoteAddr().String())
        checkErr(err)
        // 每次建立一個(gè)連接就放到單獨(dú)的協(xié)程內(nèi)做處理
        go Handle(conn)
    }
}
//處理函數(shù),這是一個(gè)狀態(tài)機(jī)
//根據(jù)數(shù)據(jù)包來做解析
//數(shù)據(jù)包的格式為|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE
//其中l(wèi)en為data的長度,實(shí)際長度為len(高)*256+len(低)
//CRC為32位CRC,取了最高16位共2Bytes
//0xFF|0xFF和0xFF|0xFE類似于前導(dǎo)碼
func Handle(conn net.Conn) {
    // close connection before exit
    defer conn.Close()
    // 狀態(tài)機(jī)狀態(tài)
    state := 0x00
    // 數(shù)據(jù)包長度
    length := uint16(0)
    // crc校驗(yàn)和
    crc16 := uint16(0)
    var recvBuffer []byte
    // 游標(biāo)
    cursor := uint16(0)
    bufferReader := bufio.NewReader(conn)
    //狀態(tài)機(jī)處理數(shù)據(jù)
    for {
        recvByte,err := bufferReader.ReadByte()
        if err != nil {
            //這里因?yàn)樽隽诵奶?,所以就沒有加deadline時(shí)間,如果客戶端斷開連接
            //這里ReadByte方法返回一個(gè)io.EOF的錯(cuò)誤,具體可考慮文檔
            if err == io.EOF {
                fmt.Printf("client %s is close!\n",conn.RemoteAddr().String())
            }
            //在這里直接退出goroutine,關(guān)閉由defer操作完成
            return
        }
        //進(jìn)入狀態(tài)機(jī),根據(jù)不同的狀態(tài)來處理
        switch state {
        case 0x00:
            if recvByte == 0xFF {
                state = 0x01
                //初始化狀態(tài)機(jī)
                recvBuffer = nil
                length = 0
                crc16 = 0
            }else{
                state = 0x00
            }
            break
        case 0x01:
            if recvByte == 0xFF {
                state = 0x02
            }else{
                state = 0x00
            }
            break
        case 0x02:
            length += uint16(recvByte) * 256
            state = 0x03
            break
        case 0x03:
            length += uint16(recvByte)
            // 一次申請緩存,初始化游標(biāo),準(zhǔn)備讀數(shù)據(jù)
            recvBuffer = make([]byte,length)
            cursor = 0
            state = 0x04
            break
        case 0x04:
            //不斷地在這個(gè)狀態(tài)下讀數(shù)據(jù),直到滿足長度為止
            recvBuffer[cursor] = recvByte
            cursor++
            if(cursor == length){
                state = 0x05
            }
            break
        case 0x05:
            crc16 += uint16(recvByte) * 256
            state = 0x06
            break
        case 0x06:
            crc16 += uint16(recvByte)
            state = 0x07
            break
        case 0x07:
            if recvByte == 0xFF {
                state = 0x08
            }else{
                state = 0x00
            }
        case 0x08:
            if recvByte == 0xFE {
                //執(zhí)行數(shù)據(jù)包校驗(yàn)
                if (crc32.ChecksumIEEE(recvBuffer) >> 16) & 0xFFFF == uint32(crc16) {
                    var packet Packet
                    //把拿到的數(shù)據(jù)反序列化出來
                    json.Unmarshal(recvBuffer,&packet)
                    //新開協(xié)程處理數(shù)據(jù)
                    go processRecvData(&packet,conn)
                }else{
                    fmt.Println("丟棄數(shù)據(jù)!")
                }
            }
            //狀態(tài)機(jī)歸位,接收下一個(gè)包
            state = 0x00
        }
    }
}

//在這里處理收到的包,就和一般的邏輯一樣了,根據(jù)類型進(jìn)行不同的處理,因人而異
//我這里處理了心跳和一個(gè)上報(bào)數(shù)據(jù)包
//服務(wù)器往客戶端的數(shù)據(jù)包很簡單地以\n換行結(jié)束了,偷了一個(gè)懶:),正常情況下也可根據(jù)自己的協(xié)議來封裝好
//然后在客戶端寫一個(gè)狀態(tài)來處理
func processRecvData(packet *Packet,conn net.Conn)  {
    switch packet.PacketType {
    case HEART_BEAT_PACKET:
        var beatPacket HeartPacket
        json.Unmarshal(packet.PacketContent,&beatPacket)
        fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),beatPacket)
        conn.Write([]byte("heartBeat\n"))
        return
    case REPORT_PACKET:
        var reportPacket ReportPacket
        json.Unmarshal(packet.PacketContent,&reportPacket)
        fmt.Printf("recieve report data from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),reportPacket)
        conn.Write([]byte("Report data has recive\n"))
        return
    }
}
//處理錯(cuò)誤,根據(jù)實(shí)際情況選擇這樣處理,還是在函數(shù)調(diào)之后不同的地方不同處理
func checkErr(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }
}

<strong>特別需要注意:</strong>

Handle方法在一個(gè)死循環(huán)中使用了一個(gè)無阻塞的buff來讀取套接字中的數(shù)據(jù),因此當(dāng)客戶端主動關(guān)閉連接時(shí),如果不對這個(gè)io.EOF進(jìn)行處理,會導(dǎo)致這個(gè)goroutine空轉(zhuǎn),瘋狂吃cpu,在這里io.EOF的處理非常重要:)

客戶端

客戶端與一般的TCP通信程序一樣,它需要完成的工作有:

  1. 向服務(wù)器發(fā)送心跳包

  2. 向服務(wù)器發(fā)送數(shù)據(jù)包

  3. 接收服務(wù)器的數(shù)據(jù)包

需要注意的就是客戶端與服務(wù)端的數(shù)據(jù)協(xié)議保持一致,請?jiān)陂_始發(fā)送數(shù)據(jù)之前啟動數(shù)據(jù)接收

上面的3個(gè)工作我分別用了goroutine來做,整體的代碼如下:

package main

import (
    "os"
    "fmt"
    "net"
    "time"
    "math/rand"
    "encoding/json"
    "bufio"
    "hash/crc32"
    "sync"
)
//數(shù)據(jù)包類型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET = 0x01
)
//默認(rèn)的服務(wù)器地址
var (
    server = "127.0.0.1:9876"
)
//數(shù)據(jù)包
type Packet struct {
    PacketType      byte
    PacketContent   []byte
}
//心跳包
type HeartPacket struct {
    Version     string`json:"version"`
    Timestamp   int64`json:"timestamp"`
}
//數(shù)據(jù)包
type ReportPacket struct {
    Content   string`json:"content"`
    Rand         int`json:"rand"`
    Timestamp   int64`json:"timestamp"`
}

//客戶端對象
type TcpClient struct {
    connection     *net.TCPConn
    hawkServer  *net.TCPAddr
    stopChan       chan struct{}
}

func main()  {
    //拿到服務(wù)器地址信息
    hawkServer,err := net.ResolveTCPAddr("tcp", server)
    if err != nil {
        fmt.Printf("hawk server [%s] resolve error: [%s]",server,err.Error())
        os.Exit(1)
    }
    //連接服務(wù)器
    connection,err := net.DialTCP("tcp",nil,hawkServer)
    if err != nil {
        fmt.Printf("connect to hawk server error: [%s]",err.Error())
        os.Exit(1)
    }
    client := &TcpClient{
        connection:connection,
        hawkServer:hawkServer,
        stopChan:make(chan struct{}),
    }
    //啟動接收
    go client.receivePackets()
    
    //發(fā)送心跳的goroutine
    go func() {
        heartBeatTick := time.Tick(2 * time.Second)
        for{
            select {
            case <-heartBeatTick:
                client.sendHeartPacket()
            case <-client.stopChan:
                return
            }
        }
    }()
    
    //測試用的,開300個(gè)goroutine每秒發(fā)送一個(gè)包
    for i:=0;i<300;i++ {
        go func() {
            sendTimer := time.After(1 * time.Second)
            for{
                select {
                case <-sendTimer:
                    client.sendReportPacket()
                    sendTimer = time.After(1 * time.Second)
                case <-client.stopChan:
                    return
                }
            }
        }()
    }
    //等待退出
    <-client.stopChan
}

// 接收數(shù)據(jù)包
func (client *TcpClient)receivePackets()  {
    reader := bufio.NewReader(client.connection)
    for {
    //承接上面說的服務(wù)器端的偷懶,我這里讀也只是以\n為界限來讀區(qū)分包
        msg, err := reader.ReadString('\n')
        if err != nil {
            //在這里也請?zhí)幚砣绻?wù)器關(guān)閉時(shí)的異常
            close(client.stopChan)
            break
        }
        fmt.Print(msg)
    }
}
//發(fā)送數(shù)據(jù)包
//仔細(xì)看代碼其實(shí)這里做了兩次json的序列化,有一次其實(shí)是不需要的
func (client *TcpClient)sendReportPacket()  {
    reportPacket := ReportPacket{
        Content:getRandString(),
        Timestamp:time.Now().Unix(),
        Rand:rand.Int(),
    }
    packetBytes,err := json.Marshal(reportPacket)
    if err!=nil{
        fmt.Println(err.Error())
    }
        //這一次其實(shí)可以不需要,在封包的地方把類型和數(shù)據(jù)傳進(jìn)去即可
    packet := Packet{
        PacketType:REPORT_PACKET,
        PacketContent:packetBytes,
    }
    sendBytes,err := json.Marshal(packet)
    if err!=nil{
        fmt.Println(err.Error())
    }
    //發(fā)送
    client.connection.Write(EnPackSendData(sendBytes))
    fmt.Println("Send metric data success!")
}

//使用的協(xié)議與服務(wù)器端保持一致
func EnPackSendData(sendBytes []byte) []byte {
    packetLength := len(sendBytes) + 8
    result := make([]byte,packetLength)
    result[0] = 0xFF
    result[1] = 0xFF
    result[2] = byte(uint16(len(sendBytes)) >> 8)
    result[3] = byte(uint16(len(sendBytes)) & 0xFF)
    copy(result[4:],sendBytes)
    sendCrc := crc32.ChecksumIEEE(sendBytes)
    result[packetLength-4] = byte(sendCrc >> 24)
    result[packetLength-3] = byte(sendCrc >> 16 & 0xFF)
    result[packetLength-2] = 0xFF
    result[packetLength-1] = 0xFE
    fmt.Println(result)
    return result
}
//發(fā)送心跳包,與發(fā)送數(shù)據(jù)包一樣
func (client *TcpClient)sendHeartPacket() {
    heartPacket := HeartPacket{
        Version:"1.0",
        Timestamp:time.Now().Unix(),
    }
    packetBytes,err := json.Marshal(heartPacket)
    if err!=nil{
        fmt.Println(err.Error())
    }
    packet := Packet{
        PacketType:HEART_BEAT_PACKET,
        PacketContent:packetBytes,
    }
    sendBytes,err := json.Marshal(packet)
    if err!=nil{
        fmt.Println(err.Error())
    }
    client.connection.Write(EnPackSendData(sendBytes))
    fmt.Println("Send heartbeat data success!")
}
//拿一串隨機(jī)字符
func getRandString()string  {
    length := rand.Intn(50)
    strBytes := make([]byte,length)
    for i:=0;i<length;i++ {
        strBytes[i] = byte(rand.Intn(26) + 97)
    }
    return string(strBytes)
}

后記

測試過程中,一共開了7個(gè)client,共計(jì)2100個(gè)goroutine,本機(jī)啟動服務(wù)器端,機(jī)器配置為i-5/8G的情況下,整體的資源使用情況如下:

測試結(jié)果.png

需要改進(jìn)的地方,也是后兩篇的主題:

  • 引入內(nèi)存池
  • 服務(wù)無縫重啟
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容