zinx V0.7 讀寫分離

七、Zinx的讀寫分離模型 · 語雀 (yuque.com)

  1. 對框架進行讀寫分離 我們要實現(xiàn)的是客戶端發(fā)送消息給客戶端 客戶端的Reader對消息進行接收 然后Reader通過管道將消息發(fā)送給Writer而不是直接發(fā)回給客戶端 最后Writer將消息發(fā)回給客戶端
    因為之前沒有管道服務端不知道該什么時候將消息發(fā)回 可能客戶端發(fā)送了兩次 服務端才將這兩次消息一并返回
    因此我們需要實現(xiàn)讀寫分離
消息處理
  1. 因為我們之前的Reader是在Connection模塊中實現(xiàn)的 所以我們在Connection結構體中新增添1個Channel
type Connection struct {
  ···
  // 無緩沖的管道 用于讀寫的Goroutine之間的消息通信
    MsgChan chan []byte
}

這里是channel存的是字節(jié)切片而不是Message 因為Writer要發(fā)送給客戶端的消息是已經序列化好的二進制流
這里就是我們的Writer模塊

/*
    主要是寫消息的Goroutine 專門發(fā)送給客戶端的模塊
*/
func (c *Connection) StartWriter() {
    fmt.Println("[Writer Goroutinr is running]")
    defer fmt.Println(c.GetRemoteAddr().String(), " [conn Writer exit!]")
    defer fmt.Println("Writer is exit!")

    // 不斷阻塞等待Msgchan的消息
    for {
        select {
        // data就是已經序列化好的Message 二進制流的結構MsgDatalen|MsgId|MsgData
        case data := <-c.MsgChan:
            // 有數據要寫給客戶端
            if _, err := c.Conn.Write(data); err != nil {
                fmt.Println("Send data error", err, " Conn Writer exit")
                return
            }
        case <-c.ExitChannel: //可讀代表Reader已經退出 此時Writer也要退出
            return
        }
    }
}

然后我們在Connection的Start中同時開啟Reader和Writer這兩個協(xié)程

func (c *Connection) Start() {
    fmt.Println("Conn Start() ... Conn ID = ", c.ConnId)
    // 啟動從當前鏈接的讀數據的業(yè)務
    // Todo 啟動從當前鏈接寫數據的業(yè)務
    go c.StartReader()
    go c.StartWriter()
}

此時兩者同時運行 當Reader讀到消息后 將調用路由對應的處理方法 而我們的處理方法中調用了SendMsg

func (this *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle...")
    fmt.Println("Recv from client: msgId= ", request.GetMsgId(),
        ", data= ", string(request.GetData()))
    // 回寫數據
    err := request.GetConnection().SendMsg(200, []byte("ping..ping..ping.."))
    if err != nil {
        fmt.Println(err)
        return
    }
}

SendMsg是將服務端要發(fā)送的內容序列化好后并傳給客戶端

// 提供一個SendMsg方法 將我們要發(fā)送給客戶端的數據 先進行封包再發(fā)送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
    if c.IsClosed == true {
        return errors.New("Connection closed when send msg")
    }
    // 將data進行封包 MsgDataLen|MsgId|MsgData
    dp := NewDataPack()
    // 這個msg是已經序列化的二進制msg
    binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
    if err != nil {
        fmt.Println("Pack error msg id = ", msgId)
        return errors.New("Pack error msg")
    }
    // 把數據發(fā)送給管道
    c.MsgChan <- binaryMsg
    return nil
}

看函數中的倒數第二行 將封包了的數據輸送進管道 而當前的MsgChan我們初始化時是設置為無緩沖的 所以一有數據進入管道 Writer端通過select監(jiān)聽到有數據就會立馬接收數據并進行處理

基本的流程就是客戶端發(fā)送封裝好的消息->Reader進行接收->路由的處理方法進行處理->SendMsg數據發(fā)送給管道->Writer監(jiān)聽到數據并進行處理

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容