前言
新工作接手了公司的一個(gè)使用golang編寫的agent程序,用于采集各個(gè)機(jī)器的性能指標(biāo)和監(jiān)控?cái)?shù)據(jù),之前使用http實(shí)現(xiàn)數(shù)據(jù)的上傳,最近想把它改成tcp上傳的方式,由于是新手上路,順手寫了一個(gè)小demo程序。
這個(gè)程序中包含:
簡單的TcpServer服務(wù)程序:偵聽,數(shù)據(jù)收發(fā)與解析
簡單的客戶端程序:數(shù)據(jù)收發(fā)與解析
服務(wù)器
與正常的其他語言一樣,go中也提供了豐富的網(wǎng)絡(luò)相關(guān)的包,按照正常的套路,它是這樣的:
綁定端口,初始化套接字
啟動偵聽,開啟后臺線程接收客戶端請求
接收請求,針對每個(gè)請求開啟一個(gè)線程來處理通信
資源回收
golang的套路也是如此,不同的地方在于它可以使用goroutine來替換上面的線程;
整體的代碼很簡單,可以參考文檔和api手冊,示例代碼如下:
package main
import (
"fmt"
"net"
"os"
"encoding/json"
"bufio"
"hash/crc32"
"io"
)
//數(shù)據(jù)包的類型
const (
HEART_BEAT_PACKET = 0x00
REPORT_PACKET = 0x01
)
var (
server = "127.0.0.1:8080"
)
//這里是包的結(jié)構(gòu)體,其實(shí)是可以不需要的
type Packet struct {
PacketType byte
PacketContent []byte
}
//心跳包,這里用了json來序列化,也可以用github上的gogo/protobuf包
//具體見(https://github.com/gogo/protobuf)
type HeartPacket struct {
Version string`json:"version"`
Timestamp int64`json:"timestamp"`
}
//正式上傳的數(shù)據(jù)包
type ReportPacket struct {
Content string`json:"content"`
Rand int`json:"rand"`
Timestamp int64`json:"timestamp"`
}
//與服務(wù)器相關(guān)的資源都放在這里面
type TcpServer struct {
listener *net.TCPListener
hawkServer *net.TCPAddr
}
func main() {
//類似于初始化套接字,綁定端口
hawkServer, err := net.ResolveTCPAddr("tcp", server)
checkErr(err)
//偵聽
listen, err := net.ListenTCP("tcp", hawkServer)
checkErr(err)
//記得關(guān)閉
defer listen.Close()
tcpServer := &TcpServer{
listener:listen,
hawkServer:hawkServer,
}
fmt.Println("start server successful......")
//開始接收請求
for {
conn, err := tcpServer.listener.Accept()
fmt.Println("accept tcp client %s",conn.RemoteAddr().String())
checkErr(err)
// 每次建立一個(gè)連接就放到單獨(dú)的協(xié)程內(nèi)做處理
go Handle(conn)
}
}
//處理函數(shù),這是一個(gè)狀態(tài)機(jī)
//根據(jù)數(shù)據(jù)包來做解析
//數(shù)據(jù)包的格式為|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE
//其中l(wèi)en為data的長度,實(shí)際長度為len(高)*256+len(低)
//CRC為32位CRC,取了最高16位共2Bytes
//0xFF|0xFF和0xFF|0xFE類似于前導(dǎo)碼
func Handle(conn net.Conn) {
// close connection before exit
defer conn.Close()
// 狀態(tài)機(jī)狀態(tài)
state := 0x00
// 數(shù)據(jù)包長度
length := uint16(0)
// crc校驗(yàn)和
crc16 := uint16(0)
var recvBuffer []byte
// 游標(biāo)
cursor := uint16(0)
bufferReader := bufio.NewReader(conn)
//狀態(tài)機(jī)處理數(shù)據(jù)
for {
recvByte,err := bufferReader.ReadByte()
if err != nil {
//這里因?yàn)樽隽诵奶?,所以就沒有加deadline時(shí)間,如果客戶端斷開連接
//這里ReadByte方法返回一個(gè)io.EOF的錯(cuò)誤,具體可考慮文檔
if err == io.EOF {
fmt.Printf("client %s is close!\n",conn.RemoteAddr().String())
}
//在這里直接退出goroutine,關(guān)閉由defer操作完成
return
}
//進(jìn)入狀態(tài)機(jī),根據(jù)不同的狀態(tài)來處理
switch state {
case 0x00:
if recvByte == 0xFF {
state = 0x01
//初始化狀態(tài)機(jī)
recvBuffer = nil
length = 0
crc16 = 0
}else{
state = 0x00
}
break
case 0x01:
if recvByte == 0xFF {
state = 0x02
}else{
state = 0x00
}
break
case 0x02:
length += uint16(recvByte) * 256
state = 0x03
break
case 0x03:
length += uint16(recvByte)
// 一次申請緩存,初始化游標(biāo),準(zhǔn)備讀數(shù)據(jù)
recvBuffer = make([]byte,length)
cursor = 0
state = 0x04
break
case 0x04:
//不斷地在這個(gè)狀態(tài)下讀數(shù)據(jù),直到滿足長度為止
recvBuffer[cursor] = recvByte
cursor++
if(cursor == length){
state = 0x05
}
break
case 0x05:
crc16 += uint16(recvByte) * 256
state = 0x06
break
case 0x06:
crc16 += uint16(recvByte)
state = 0x07
break
case 0x07:
if recvByte == 0xFF {
state = 0x08
}else{
state = 0x00
}
case 0x08:
if recvByte == 0xFE {
//執(zhí)行數(shù)據(jù)包校驗(yàn)
if (crc32.ChecksumIEEE(recvBuffer) >> 16) & 0xFFFF == uint32(crc16) {
var packet Packet
//把拿到的數(shù)據(jù)反序列化出來
json.Unmarshal(recvBuffer,&packet)
//新開協(xié)程處理數(shù)據(jù)
go processRecvData(&packet,conn)
}else{
fmt.Println("丟棄數(shù)據(jù)!")
}
}
//狀態(tài)機(jī)歸位,接收下一個(gè)包
state = 0x00
}
}
}
//在這里處理收到的包,就和一般的邏輯一樣了,根據(jù)類型進(jìn)行不同的處理,因人而異
//我這里處理了心跳和一個(gè)上報(bào)數(shù)據(jù)包
//服務(wù)器往客戶端的數(shù)據(jù)包很簡單地以\n換行結(jié)束了,偷了一個(gè)懶:),正常情況下也可根據(jù)自己的協(xié)議來封裝好
//然后在客戶端寫一個(gè)狀態(tài)來處理
func processRecvData(packet *Packet,conn net.Conn) {
switch packet.PacketType {
case HEART_BEAT_PACKET:
var beatPacket HeartPacket
json.Unmarshal(packet.PacketContent,&beatPacket)
fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),beatPacket)
conn.Write([]byte("heartBeat\n"))
return
case REPORT_PACKET:
var reportPacket ReportPacket
json.Unmarshal(packet.PacketContent,&reportPacket)
fmt.Printf("recieve report data from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),reportPacket)
conn.Write([]byte("Report data has recive\n"))
return
}
}
//處理錯(cuò)誤,根據(jù)實(shí)際情況選擇這樣處理,還是在函數(shù)調(diào)之后不同的地方不同處理
func checkErr(err error) {
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
}
<strong>特別需要注意:</strong>
Handle方法在一個(gè)死循環(huán)中使用了一個(gè)無阻塞的buff來讀取套接字中的數(shù)據(jù),因此當(dāng)客戶端主動關(guān)閉連接時(shí),如果不對這個(gè)io.EOF進(jìn)行處理,會導(dǎo)致這個(gè)goroutine空轉(zhuǎn),瘋狂吃cpu,在這里io.EOF的處理非常重要:)
客戶端
客戶端與一般的TCP通信程序一樣,它需要完成的工作有:
向服務(wù)器發(fā)送心跳包
向服務(wù)器發(fā)送數(shù)據(jù)包
接收服務(wù)器的數(shù)據(jù)包
需要注意的就是客戶端與服務(wù)端的數(shù)據(jù)協(xié)議保持一致,請?jiān)陂_始發(fā)送數(shù)據(jù)之前啟動數(shù)據(jù)接收
上面的3個(gè)工作我分別用了goroutine來做,整體的代碼如下:
package main
import (
"os"
"fmt"
"net"
"time"
"math/rand"
"encoding/json"
"bufio"
"hash/crc32"
"sync"
)
//數(shù)據(jù)包類型
const (
HEART_BEAT_PACKET = 0x00
REPORT_PACKET = 0x01
)
//默認(rèn)的服務(wù)器地址
var (
server = "127.0.0.1:9876"
)
//數(shù)據(jù)包
type Packet struct {
PacketType byte
PacketContent []byte
}
//心跳包
type HeartPacket struct {
Version string`json:"version"`
Timestamp int64`json:"timestamp"`
}
//數(shù)據(jù)包
type ReportPacket struct {
Content string`json:"content"`
Rand int`json:"rand"`
Timestamp int64`json:"timestamp"`
}
//客戶端對象
type TcpClient struct {
connection *net.TCPConn
hawkServer *net.TCPAddr
stopChan chan struct{}
}
func main() {
//拿到服務(wù)器地址信息
hawkServer,err := net.ResolveTCPAddr("tcp", server)
if err != nil {
fmt.Printf("hawk server [%s] resolve error: [%s]",server,err.Error())
os.Exit(1)
}
//連接服務(wù)器
connection,err := net.DialTCP("tcp",nil,hawkServer)
if err != nil {
fmt.Printf("connect to hawk server error: [%s]",err.Error())
os.Exit(1)
}
client := &TcpClient{
connection:connection,
hawkServer:hawkServer,
stopChan:make(chan struct{}),
}
//啟動接收
go client.receivePackets()
//發(fā)送心跳的goroutine
go func() {
heartBeatTick := time.Tick(2 * time.Second)
for{
select {
case <-heartBeatTick:
client.sendHeartPacket()
case <-client.stopChan:
return
}
}
}()
//測試用的,開300個(gè)goroutine每秒發(fā)送一個(gè)包
for i:=0;i<300;i++ {
go func() {
sendTimer := time.After(1 * time.Second)
for{
select {
case <-sendTimer:
client.sendReportPacket()
sendTimer = time.After(1 * time.Second)
case <-client.stopChan:
return
}
}
}()
}
//等待退出
<-client.stopChan
}
// 接收數(shù)據(jù)包
func (client *TcpClient)receivePackets() {
reader := bufio.NewReader(client.connection)
for {
//承接上面說的服務(wù)器端的偷懶,我這里讀也只是以\n為界限來讀區(qū)分包
msg, err := reader.ReadString('\n')
if err != nil {
//在這里也請?zhí)幚砣绻?wù)器關(guān)閉時(shí)的異常
close(client.stopChan)
break
}
fmt.Print(msg)
}
}
//發(fā)送數(shù)據(jù)包
//仔細(xì)看代碼其實(shí)這里做了兩次json的序列化,有一次其實(shí)是不需要的
func (client *TcpClient)sendReportPacket() {
reportPacket := ReportPacket{
Content:getRandString(),
Timestamp:time.Now().Unix(),
Rand:rand.Int(),
}
packetBytes,err := json.Marshal(reportPacket)
if err!=nil{
fmt.Println(err.Error())
}
//這一次其實(shí)可以不需要,在封包的地方把類型和數(shù)據(jù)傳進(jìn)去即可
packet := Packet{
PacketType:REPORT_PACKET,
PacketContent:packetBytes,
}
sendBytes,err := json.Marshal(packet)
if err!=nil{
fmt.Println(err.Error())
}
//發(fā)送
client.connection.Write(EnPackSendData(sendBytes))
fmt.Println("Send metric data success!")
}
//使用的協(xié)議與服務(wù)器端保持一致
func EnPackSendData(sendBytes []byte) []byte {
packetLength := len(sendBytes) + 8
result := make([]byte,packetLength)
result[0] = 0xFF
result[1] = 0xFF
result[2] = byte(uint16(len(sendBytes)) >> 8)
result[3] = byte(uint16(len(sendBytes)) & 0xFF)
copy(result[4:],sendBytes)
sendCrc := crc32.ChecksumIEEE(sendBytes)
result[packetLength-4] = byte(sendCrc >> 24)
result[packetLength-3] = byte(sendCrc >> 16 & 0xFF)
result[packetLength-2] = 0xFF
result[packetLength-1] = 0xFE
fmt.Println(result)
return result
}
//發(fā)送心跳包,與發(fā)送數(shù)據(jù)包一樣
func (client *TcpClient)sendHeartPacket() {
heartPacket := HeartPacket{
Version:"1.0",
Timestamp:time.Now().Unix(),
}
packetBytes,err := json.Marshal(heartPacket)
if err!=nil{
fmt.Println(err.Error())
}
packet := Packet{
PacketType:HEART_BEAT_PACKET,
PacketContent:packetBytes,
}
sendBytes,err := json.Marshal(packet)
if err!=nil{
fmt.Println(err.Error())
}
client.connection.Write(EnPackSendData(sendBytes))
fmt.Println("Send heartbeat data success!")
}
//拿一串隨機(jī)字符
func getRandString()string {
length := rand.Intn(50)
strBytes := make([]byte,length)
for i:=0;i<length;i++ {
strBytes[i] = byte(rand.Intn(26) + 97)
}
return string(strBytes)
}
后記
測試過程中,一共開了7個(gè)client,共計(jì)2100個(gè)goroutine,本機(jī)啟動服務(wù)器端,機(jī)器配置為i-5/8G的情況下,整體的資源使用情況如下:

需要改進(jìn)的地方,也是后兩篇的主題:
- 引入內(nèi)存池
- 服務(wù)無縫重啟