實現代碼如下:
import (
"errors"
"github.com/gorilla/websocket"
"sync"
)
// Connection wraps a gorilla/websocket connection and exposes it through
// buffered channels: readLoop feeds inChan, writeLoop drains outChan, and
// Close shuts everything down by closing closeChan.
type Connection struct {
	// wsConn is the underlying websocket connection.
	wsConn *websocket.Conn

	// inChan carries messages read from the socket until ReadMessage picks
	// them up; outChan carries messages queued by WriteMessage until
	// writeLoop sends them.
	inChan, outChan chan []byte

	// closeChan is closed (never sent on) by Close to signal shutdown.
	closeChan chan byte

	// mutex guards isClosed, which records that closeChan has already been
	// closed so that it is never closed twice.
	mutex    sync.Mutex
	isClosed bool
}
// InitConnection wraps wsConn in a Connection and starts its read and write
// loops, each in its own goroutine. The inbound and outbound queues each
// buffer up to 1000 messages.
//
// It returns a nil Connection and a non-nil error when wsConn is nil; no
// goroutine is started in that case, because the loops would otherwise
// dereference the nil connection and panic.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	if wsConn == nil {
		return nil, errors.New("websocket connection is nil")
	}

	conn = &Connection{
		wsConn:    wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}

	// Both loops run until the socket fails or Close is called.
	go conn.readLoop()
	go conn.writeLoop()

	return conn, nil
}
// ReadMessage blocks until a message is available on the inbound queue or
// the connection is closed. On close it returns nil data and a
// "connection is closed" error.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case <-conn.closeChan:
		return nil, errors.New("connection is closed")
	case msg := <-conn.inChan:
		return msg, nil
	}
}
// WriteMessage queues data on the outbound queue for writeLoop to send. It
// blocks while the queue is full and returns a "connection is closed" error
// once the connection has been closed.
//
// A nil return means the message was queued, not that it was sent: a message
// accepted just before Close may still be dropped.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	// Check for shutdown first. When both cases of a select are ready Go
	// picks one at random, so without this check a write on an already
	// closed connection could be queued and reported as successful.
	select {
	case <-conn.closeChan:
		return errors.New("connection is closed")
	default:
	}

	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return errors.New("connection is closed")
	}
}
// Close shuts the connection down: it closes the underlying websocket and
// then closes closeChan, which unblocks ReadMessage, WriteMessage, readLoop
// and writeLoop.
//
// It is safe to call from several goroutines and more than once; only the
// first call does any work, so the socket is closed exactly once.
func (conn *Connection) Close() {
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if conn.isClosed {
		return
	}
	conn.isClosed = true

	// Best-effort: Close has no error return, and the connection is being
	// torn down regardless of whether the socket close succeeds.
	_ = conn.wsConn.Close()
	close(conn.closeChan)
}
// readLoop reads messages from the websocket and hands them to inChan until
// a read fails or the connection is closed, then calls Close. The message
// type reported by the websocket is discarded.
func (conn *Connection) readLoop() {
pump:
	for {
		_, payload, readErr := conn.wsConn.ReadMessage()
		if readErr != nil {
			break pump
		}

		// Waits while inChan is full, but gives up as soon as the
		// connection is closed.
		select {
		case <-conn.closeChan:
			break pump
		case conn.inChan <- payload:
		}
	}

	conn.Close()
}
// writeLoop takes messages from outChan and writes each one to the websocket
// as a text message until a write fails or the connection is closed, then
// calls Close.
func (conn *Connection) writeLoop() {
pump:
	for {
		select {
		case <-conn.closeChan:
			break pump
		case payload := <-conn.outChan:
			sendErr := conn.wsConn.WriteMessage(websocket.TextMessage, payload)
			if sendErr != nil {
				break pump
			}
		}
	}

	conn.Close()
}
go get github.com/gorilla/websocket
首先定義一個結構體
// Connection wraps a gorilla/websocket connection and exposes it through
// buffered channels: readLoop feeds inChan, writeLoop drains outChan, and
// Close shuts everything down by closing closeChan.
type Connection struct {
	// wsConn is the underlying websocket connection.
	wsConn *websocket.Conn

	// inChan carries messages read from the socket until ReadMessage picks
	// them up; outChan carries messages queued by WriteMessage until
	// writeLoop sends them.
	inChan, outChan chan []byte

	// closeChan is closed (never sent on) by Close to signal shutdown.
	closeChan chan byte

	// mutex guards isClosed, which records that closeChan has already been
	// closed so that it is never closed twice.
	mutex    sync.Mutex
	isClosed bool
}
wsConn websocket的長鏈接的實體
inChan 讀數據的channel
outChan 寫數據的channel
closeChan 鏈接關閉的channel
mutex 互斥鎖
isClosed 鏈接關閉標識符
// InitConnection wraps wsConn in a Connection and starts its read and write
// loops, each in its own goroutine. The inbound and outbound queues each
// buffer up to 1000 messages.
//
// It returns a nil Connection and a non-nil error when wsConn is nil; no
// goroutine is started in that case, because the loops would otherwise
// dereference the nil connection and panic.
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
	if wsConn == nil {
		return nil, errors.New("websocket connection is nil")
	}

	conn = &Connection{
		wsConn:    wsConn,
		inChan:    make(chan []byte, 1000),
		outChan:   make(chan []byte, 1000),
		closeChan: make(chan byte, 1),
	}

	// Both loops run until the socket fails or Close is called.
	go conn.readLoop()
	go conn.writeLoop()

	return conn, nil
}
初始化鏈接
readLoop 和 writeLoop 循環從websocket中讀取數據和寫入數據
// ReadMessage blocks until a message is available on the inbound queue or
// the connection is closed. On close it returns nil data and a
// "connection is closed" error.
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case <-conn.closeChan:
		return nil, errors.New("connection is closed")
	case msg := <-conn.inChan:
		return msg, nil
	}
}
ReadMessage() 從inChan中讀取數據
// WriteMessage queues data on the outbound queue for writeLoop to send. It
// blocks while the queue is full and returns a "connection is closed" error
// once the connection has been closed.
//
// A nil return means the message was queued, not that it was sent: a message
// accepted just before Close may still be dropped.
func (conn *Connection) WriteMessage(data []byte) (err error) {
	// Check for shutdown first. When both cases of a select are ready Go
	// picks one at random, so without this check a write on an already
	// closed connection could be queued and reported as successful.
	select {
	case <-conn.closeChan:
		return errors.New("connection is closed")
	default:
	}

	select {
	case conn.outChan <- data:
		return nil
	case <-conn.closeChan:
		return errors.New("connection is closed")
	}
}
WriteMessage(data []byte) 將寫入的數據傳遞給outChan,writeLoop 監聽outChan並寫入數據
// Close shuts the connection down: it closes the underlying websocket and
// then closes closeChan, which unblocks ReadMessage, WriteMessage, readLoop
// and writeLoop.
//
// It is safe to call from several goroutines and more than once; only the
// first call does any work, so the socket is closed exactly once.
func (conn *Connection) Close() {
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if conn.isClosed {
		return
	}
	conn.isClosed = true

	// Best-effort: Close has no error return, and the connection is being
	// torn down regardless of whether the socket close succeeds.
	_ = conn.wsConn.Close()
	close(conn.closeChan)
}
mutex鎖住關閉操作,避免重複循環關閉鏈接
關閉鏈接時,傳遞closeChan ,同時關閉readLoop 和writeLoop
// readLoop reads messages from the websocket and hands them to inChan until
// a read fails or the connection is closed, then calls Close. The message
// type reported by the websocket is discarded.
func (conn *Connection) readLoop() {
pump:
	for {
		_, payload, readErr := conn.wsConn.ReadMessage()
		if readErr != nil {
			break pump
		}

		// Waits while inChan is full, but gives up as soon as the
		// connection is closed.
		select {
		case <-conn.closeChan:
			break pump
		case conn.inChan <- payload:
		}
	}

	conn.Close()
}
// writeLoop takes messages from outChan and writes each one to the websocket
// as a text message until a write fails or the connection is closed, then
// calls Close.
func (conn *Connection) writeLoop() {
pump:
	for {
		select {
		case <-conn.closeChan:
			break pump
		case payload := <-conn.outChan:
			sendErr := conn.wsConn.WriteMessage(websocket.TextMessage, payload)
			if sendErr != nil {
				break pump
			}
		}
	}

	conn.Close()
}
通過outChan和inChan 傳遞信息,保證線程的安全。