- 對框架進行讀寫分離 我們要實現(xiàn)的是客戶端發(fā)送消息給客戶端 客戶端的Reader對消息進行接收 然后Reader通過管道將消息發(fā)送給Writer而不是直接發(fā)回給客戶端 最后Writer將消息發(fā)回給客戶端
因為之前沒有管道服務端不知道該什么時候將消息發(fā)回 可能客戶端發(fā)送了兩次 服務端才將這兩次消息一并返回
因此我們需要實現(xiàn)讀寫分離

消息處理
- 因為我們之前的Reader是在Connection模塊中實現(xiàn)的 所以我們在Connection結構體中新增添1個Channel
type Connection struct {
···
// 無緩沖的管道 用于讀寫的Goroutine之間的消息通信
MsgChan chan []byte
}
這里是channel存的是字節(jié)切片而不是Message 因為Writer要發(fā)送給客戶端的消息是已經序列化好的二進制流
這里就是我們的Writer模塊
/*
主要是寫消息的Goroutine 專門發(fā)送給客戶端的模塊
*/
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutinr is running]")
defer fmt.Println(c.GetRemoteAddr().String(), " [conn Writer exit!]")
defer fmt.Println("Writer is exit!")
// 不斷阻塞等待Msgchan的消息
for {
select {
// data就是已經序列化好的Message 二進制流的結構MsgDatalen|MsgId|MsgData
case data := <-c.MsgChan:
// 有數據要寫給客戶端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error", err, " Conn Writer exit")
return
}
case <-c.ExitChannel: //可讀代表Reader已經退出 此時Writer也要退出
return
}
}
}
然后我們在Connection的Start中同時開啟Reader和Writer這兩個協(xié)程
func (c *Connection) Start() {
fmt.Println("Conn Start() ... Conn ID = ", c.ConnId)
// 啟動從當前鏈接的讀數據的業(yè)務
// Todo 啟動從當前鏈接寫數據的業(yè)務
go c.StartReader()
go c.StartWriter()
}
此時兩者同時運行 當Reader讀到消息后 將調用路由對應的處理方法 而我們的處理方法中調用了SendMsg
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle...")
fmt.Println("Recv from client: msgId= ", request.GetMsgId(),
", data= ", string(request.GetData()))
// 回寫數據
err := request.GetConnection().SendMsg(200, []byte("ping..ping..ping.."))
if err != nil {
fmt.Println(err)
return
}
}
SendMsg是將服務端要發(fā)送的內容序列化好后并傳給客戶端
// 提供一個SendMsg方法 將我們要發(fā)送給客戶端的數據 先進行封包再發(fā)送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.IsClosed == true {
return errors.New("Connection closed when send msg")
}
// 將data進行封包 MsgDataLen|MsgId|MsgData
dp := NewDataPack()
// 這個msg是已經序列化的二進制msg
binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg")
}
// 把數據發(fā)送給管道
c.MsgChan <- binaryMsg
return nil
}
看函數中的倒數第二行 將封包了的數據輸送進管道 而當前的MsgChan我們初始化時是設置為無緩沖的 所以一有數據進入管道 Writer端通過select監(jiān)聽到有數據就會立馬接收數據并進行處理
基本的流程就是客戶端發(fā)送封裝好的消息->Reader進行接收->路由的處理方法進行處理->SendMsg數據發(fā)送給管道->Writer監(jiān)聽到數據并進行處理