試著將nsq的tcp部分提出來,看看它是怎么處理沾包, 協(xié)議分裝
這個過程以后自己寫tcp對外提供服務(wù)應(yīng)該也是可以做到有參考,有借鑒 一下就能上手去做這樣的一件事情了
package main
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync/atomic"
)
var (
clientIDSequence int64 = 0
separatorBytes = []byte(" ")
heartbeatBytes = []byte("_heartbeat_")//心跳
okBytes = []byte("OK")
)
const defaultBufferSize = 16 * 1024
type Client interface {
Close() error
}
type Protocol interface {
NewClient(net.Conn) Client
IOLoop(Client) error
}
type clientV2 struct {
ID int64 //為每個客戶端連接 定義一個唯一id
Reader *bufio.Reader
Writer *bufio.Writer
}
func (c *clientV2) Close() (err error) {
return
}
type protocolV2 struct {
}
func (p *protocolV2) NewClient(conn net.Conn) Client {
clientID := atomic.AddInt64(&clientIDSequence, 1)
return &clientV2{
ID: clientID,
Reader: bufio.NewReaderSize(conn, defaultBufferSize),
Writer: bufio.NewWriterSize(conn, defaultBufferSize),
}
}
//PUB <topic_name>\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//
//<topic_name> - a valid string (optionally having #ephemeral suffix)
//TODO 關(guān)于tcp通訊協(xié)議,nsq首先對命令做了區(qū)分,如果本身的命令是 fin 這種通知類型的命令,不存在業(yè)務(wù)數(shù)據(jù),那直接用 readuntil(特殊字符分割方式) 就ok了
//TODO 如果是帶有業(yè)務(wù)數(shù)據(jù)的這種通訊,很明顯特殊字符就不好定義了,就還是先采用 \n 作為特殊字符不停的讀, 讀到命令后發(fā)現(xiàn)是帶業(yè)務(wù)body的,就再按定長讀4字節(jié),讀出來的就是接下來的body數(shù)據(jù)長度指標值了, 再io.ReadFull(這個秒) 連續(xù)讀就行了
//TODO 所以我們?nèi)绻鲆粋€通用的tcp處理handle 倒不如直接就預(yù)定成
//PUB PUB2 PUB3\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//TODO PUB 這個就相當于命令類型,可以多級以空格分開。 這樣設(shè)計的妙處就是哪怕你業(yè)務(wù)數(shù)據(jù)有 \n 這個特殊字符, 我也是優(yōu)先讀的第一段tcp協(xié)議頭,然后再讀4字節(jié)body長度指標,最后再讀body內(nèi)容
//TODO 不好的地方是帶body的是讀了3次, 但nsq根據(jù) tcp協(xié)議頭就斷出了是否為絕對不帶body通知消息,是的話就不需要再讀了,通知消息就一次就讀好了
//TODO 倒是兼容了2情況,如果通知頻繁效率更高,如果全是帶body的那就沒必要這樣了,換一種后面再分析
func (p *protocolV2) IOLoop(c Client) (err error) {
client, ok := c.(*clientV2)
if !ok {
return errors.New("client TYPE error")
}
for {
line, rErr := client.Reader.ReadSlice('\n')
if rErr != nil && rErr != io.EOF {
err = fmt.Errorf("failed to read command - %s", rErr)
break
}
line = line[:len(line)-1]
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes) // 例如: FIN 12 23 body數(shù)據(jù)
response,exErr:=p.Exec(client,params) //nsq 將客戶端發(fā)送給服務(wù)端的信息,都是按\n分割,
checkErr(exErr)
//TODO 不存在需要返回就讓response等于nil就行了
if response!=nil{
_,rErr=client.Writer.Write(response)
checkErr(rErr)
}
}
return
}
//都歸功于約定好的協(xié)議,所以exec既充當了我們web中的路由分發(fā)
func (p *protocolV2) Exec(client *clientV2,params [][]byte) ([]byte, error) {
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.PUB(client,params)
}
return nil, errors.New(fmt.Sprintf("invalid command %s", params[0]))
}
//這個就是真正響應(yīng)指令的handle了
//里面會用到golang的大小端將二進制數(shù)據(jù)轉(zhuǎn)成[]byte
//最終按約定好的message格式(按位讀),將[]byte渲染到message結(jié)構(gòu)體上(可不是json通過 bind 綁定上去的)
//可謂是將數(shù)據(jù)流大小控制到了極致了呀~
func (p *protocolV2) PUB(client *clientV2,params [][]byte) ([]byte, error) {
lenSlice := make([]byte, 4)
io.ReadFull(client.Reader, lenSlice) //讀取body的長度
bodyLen := int32(binary.BigEndian.Uint32(lenSlice))
body := make([]byte, bodyLen)
io.ReadFull(client.Reader, body) //讀取后面約定好的body長度
return nil,nil
}
func checkErr(err error) {
//打日志
}
type TCPHandler interface {
Handle(net.Conn)
}
const (
PROTOCOlV2 = " V2"
)
func Handle(conn net.Conn) {
defer func() {
checkErr(conn.Close())
}()
//nsq的tcp協(xié)議,conn第一次被accept的時候,服務(wù)端會先讀conn的前4個字節(jié),這前四個字節(jié)代表conn是要走什么通訊協(xié)議
//這樣拓展性就強了
buf:=make([]byte,4)
if _,err:=io.ReadFull(conn,buf);err!=nil{
checkErr(conn.Close())
return
}
protocolMagic := string(buf)
var prot Protocol
switch protocolMagic {
case PROTOCOlV2:
prot=&protocolV2{}
}
checkErr(prot.IOLoop(conn))
}
// TCPServer 起tcp服務(wù),注意里面是for循環(huán)的,同步則永遠阻塞
func TCPServer(listener net.Listener) {
for {
clientConn, aErr := listener.Accept()
if aErr!=nil{
continue
}
go func() {
Handle(clientConn)
}()
}
}
func main() {
listener,_:= net.Listen("tcp","0.0.0.0:8080")
TCPServer(listener)
}