nsq 的tcp協(xié)議分裝

試著將nsq的tcp部分提出來,看看它是怎么處理沾包, 協(xié)議分裝

這個過程以后自己寫tcp對外提供服務(wù)應(yīng)該也是可以做到有參考,有借鑒 一下就能上手去做這樣的一件事情了

package main

import (
    "bufio"
    "bytes"
    "encoding/binary"
    "errors"
    "fmt"
    "io"
    "net"
    "sync/atomic"
)





var (
    clientIDSequence int64 = 0
    separatorBytes         = []byte(" ")
    heartbeatBytes         = []byte("_heartbeat_")//心跳
    okBytes                = []byte("OK")
)

const defaultBufferSize = 16 * 1024

type Client interface {
    Close() error
}

type Protocol interface {
    NewClient(net.Conn) Client
    IOLoop(Client) error
}

type clientV2 struct {
    ID     int64 //為每個客戶端連接 定義一個唯一id
    Reader *bufio.Reader
    Writer *bufio.Writer
}

func (c *clientV2) Close() (err error) {
    return
}

type protocolV2 struct {
}

func (p *protocolV2) NewClient(conn net.Conn) Client {
    clientID := atomic.AddInt64(&clientIDSequence, 1)

    return &clientV2{
        ID:     clientID,
        Reader: bufio.NewReaderSize(conn, defaultBufferSize),
        Writer: bufio.NewWriterSize(conn, defaultBufferSize),
    }
}
//PUB <topic_name>\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//
//<topic_name> - a valid string (optionally having #ephemeral suffix)

//TODO 關(guān)于tcp通訊協(xié)議,nsq首先對命令做了區(qū)分,如果本身的命令是 fin 這種通知類型的命令,不存在業(yè)務(wù)數(shù)據(jù),那直接用 readuntil(特殊字符分割方式) 就ok了
//TODO 如果是帶有業(yè)務(wù)數(shù)據(jù)的這種通訊,很明顯特殊字符就不好定義了,就還是先采用 \n 作為特殊字符不停的讀, 讀到命令后發(fā)現(xiàn)是帶業(yè)務(wù)body的,就再按定長讀4字節(jié),讀出來的就是接下來的body數(shù)據(jù)長度指標值了, 再io.ReadFull(這個秒) 連續(xù)讀就行了
//TODO 所以我們?nèi)绻鲆粋€通用的tcp處理handle 倒不如直接就預(yù)定成
//PUB PUB2 PUB3\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//TODO PUB 這個就相當于命令類型,可以多級以空格分開。 這樣設(shè)計的妙處就是哪怕你業(yè)務(wù)數(shù)據(jù)有 \n 這個特殊字符, 我也是優(yōu)先讀的第一段tcp協(xié)議頭,然后再讀4字節(jié)body長度指標,最后再讀body內(nèi)容
//TODO 不好的地方是帶body的是讀了3次, 但nsq根據(jù) tcp協(xié)議頭就斷出了是否為絕對不帶body通知消息,是的話就不需要再讀了,通知消息就一次就讀好了
//TODO 倒是兼容了2情況,如果通知頻繁效率更高,如果全是帶body的那就沒必要這樣了,換一種后面再分析

func (p *protocolV2) IOLoop(c Client) (err error) {

    client, ok := c.(*clientV2)
    if !ok {
        return errors.New("client TYPE error")
    }

    for {

        line, rErr := client.Reader.ReadSlice('\n')
        if rErr != nil && rErr != io.EOF {
            err = fmt.Errorf("failed to read command - %s", rErr)
            break
        }
        line = line[:len(line)-1]
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }

        params := bytes.Split(line, separatorBytes) // 例如: FIN 12 23 body數(shù)據(jù)
        response,exErr:=p.Exec(client,params)  //nsq 將客戶端發(fā)送給服務(wù)端的信息,都是按\n分割,
        checkErr(exErr)

        //TODO 不存在需要返回就讓response等于nil就行了
        if response!=nil{
            _,rErr=client.Writer.Write(response)
            checkErr(rErr)
        }

    }
    return
}

//都歸功于約定好的協(xié)議,所以exec既充當了我們web中的路由分發(fā)
func (p *protocolV2) Exec(client *clientV2,params [][]byte) ([]byte, error) {
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.PUB(client,params)
    }
    return nil, errors.New(fmt.Sprintf("invalid command %s", params[0]))
}


//這個就是真正響應(yīng)指令的handle了
//里面會用到golang的大小端將二進制數(shù)據(jù)轉(zhuǎn)成[]byte
//最終按約定好的message格式(按位讀),將[]byte渲染到message結(jié)構(gòu)體上(可不是json通過 bind 綁定上去的)
//可謂是將數(shù)據(jù)流大小控制到了極致了呀~
func (p *protocolV2) PUB(client *clientV2,params [][]byte) ([]byte, error) {
    lenSlice := make([]byte, 4)
    io.ReadFull(client.Reader, lenSlice) //讀取body的長度
    bodyLen := int32(binary.BigEndian.Uint32(lenSlice))
    body := make([]byte, bodyLen)
    io.ReadFull(client.Reader, body) //讀取后面約定好的body長度
    return nil,nil
}
func checkErr(err error)  {
    //打日志
}

type TCPHandler interface {
    Handle(net.Conn)
}

const (
    PROTOCOlV2 = "  V2"
)

func Handle(conn net.Conn)  {
    defer func() {
        checkErr(conn.Close())
    }()

    //nsq的tcp協(xié)議,conn第一次被accept的時候,服務(wù)端會先讀conn的前4個字節(jié),這前四個字節(jié)代表conn是要走什么通訊協(xié)議
    //這樣拓展性就強了

    buf:=make([]byte,4)
    if _,err:=io.ReadFull(conn,buf);err!=nil{
        checkErr(conn.Close())
        return
    }
    protocolMagic := string(buf)
    var prot Protocol
    switch protocolMagic {
    case PROTOCOlV2:
        prot=&protocolV2{}
    }
    checkErr(prot.IOLoop(conn))
}


// TCPServer 起tcp服務(wù),注意里面是for循環(huán)的,同步則永遠阻塞
func TCPServer(listener net.Listener)  {
    
    for  {
        clientConn, aErr := listener.Accept()
        if aErr!=nil{
            continue
        }

        go func() {
            Handle(clientConn)
        }()

    }
}


func main() {
    listener,_:= net.Listen("tcp","0.0.0.0:8080")
    TCPServer(listener)
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 從大學(xué)就開始接觸到了網(wǎng)絡(luò)協(xié)議,零零散散不成體系,那時也沒有理解的多么透徹,更別談將它應(yīng)用到實際。工作開始漸漸的意識...
    PuHJ閱讀 14,878評論 2 19
  • 用兩張圖告訴你,為什么你的 App 會卡頓? - Android - 掘金 Cover 有什么料? 從這篇文章中你...
    hw1212閱讀 13,965評論 2 59
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,289評論 2 89
  • nsq 需要搞清楚的幾點: nsqd各節(jié)點之間是沒有通訊的,所有nsqd只向nsqlookupd去register...
    Best博客閱讀 1,022評論 0 0
  • nsq官方文檔 項目結(jié)構(gòu)欣賞 1.通訊分析nsq運行起來主要組件有3,分別是nsqlookupd,nsqd,nsq...
    Best博客閱讀 444評論 0 1

友情鏈接更多精彩內(nèi)容