主要分享下go語言實現(xiàn)的websocket推送。
有這樣一個應(yīng)用場景,服務(wù)端產(chǎn)生了一條很重要的數(shù)據(jù),需要實時推送到所有客戶端。
數(shù)據(jù)源采用redis 訂閱方式,有數(shù)據(jù)產(chǎn)生就pub,然后服務(wù)sub讀取數(shù)據(jù),最后用 websocket模式推送數(shù)據(jù)到所有客戶端。
這是基本設(shè)計思想。
首先 我們用第三方的庫 websocket和一個生成uuid的庫還有一個redis
go get -u github.com/gorilla/websocket
go get -u github.com/satori/go.uuid
go get -u github.com/go-redis/redis
開始服務(wù)端編程
server.go
package main
import (
"encoding/json"
"fmt"
"net/http"
"io"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"github.com/go-redis/redis"
)
type ClientManager struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
type Client struct {
id string
socket *websocket.Conn
send chan []byte
}
type Message struct {
Sender string `json:"sender,omitempty"`
Recipient string `json:"recipient,omitempty"`
Content string `json:"content,omitempty"`
}
var manager = ClientManager{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}
func (manager *ClientManager) start() {
for {
select {
case conn := <-manager.register: //新客戶端加入
manager.clients[conn] = true
jsonMessage, _ := json.Marshal(&Message{Content: " a new socket has connected."})
manager.send(jsonMessage, conn) //調(diào)用發(fā)送
case conn := <-manager.unregister:
if _, ok := manager.clients[conn]; ok {
close(conn.send)
delete(manager.clients, conn)
jsonMessage, _ := json.Marshal(&Message{Content: "a socket has disconnected."})
manager.send(jsonMessage, conn)
}
case message := <-manager.broadcast: //讀到廣播管道數(shù)據(jù)后的處理
fmt.Println(string(message))
for conn := range manager.clients {
fmt.Println("每個客戶端",conn.id)
select {
case conn.send <- message: //調(diào)用發(fā)送給全體客戶端
default:
fmt.Println("要關(guān)閉連接啊")
close(conn.send)
delete(manager.clients, conn)
}
}
}
}
}
func (manager *ClientManager) send(message []byte, ignore *Client) {
for conn := range manager.clients {
if conn != ignore {
conn.send <- message //發(fā)送的數(shù)據(jù)寫入所有的 websocket 連接 管道
}
}
}
//客戶端寫入后 激活這里讀取
//想改成 讀取redis(已廢棄)
func (c *Client) read() {
//pubsub := c.cache.Subscribe("mychannel1")
defer func() {
manager.unregister <- c
c.socket.Close()
fmt.Println("讀關(guān)閉")
}()
for {
_, message, err := c.socket.ReadMessage()
//msg,err := c.getRedis()
fmt.Println("是在不停的讀嗎?")
if err != nil {
manager.unregister <- c
c.socket.Close()
//c.cache.Close()
fmt.Println("讀不到數(shù)據(jù)就關(guān)閉?")
break
}
jsonMessage, _ := json.Marshal(&Message{Sender: c.id, Content: string(message)})
manager.broadcast <- jsonMessage //激活start 程序 入廣播管道
fmt.Println("發(fā)送數(shù)據(jù)到廣播")
}
}
//寫入管道后激活這個進(jìn)程
func (c *Client) write() {
defer func() {
manager.unregister <- c
c.socket.Close()
fmt.Println("寫關(guān)閉了")
}()
for {
select {
case message, ok := <-c.send: //這個管道有了數(shù)據(jù) 寫這個消息出去
if !ok {
c.socket.WriteMessage(websocket.CloseMessage, []byte{})
fmt.Println("發(fā)送關(guān)閉提示")
return
}
err := c.socket.WriteMessage(websocket.TextMessage, message)
if err != nil {
manager.unregister <- c
c.socket.Close()
fmt.Println("寫不成功數(shù)據(jù)就關(guān)閉了")
break
}
fmt.Println("寫數(shù)據(jù)")
}
}
}
func main() {
fmt.Println("Starting application...")
go manager.start()
go manager.getRedisData()
http.HandleFunc("/ws", wsPage)
http.ListenAndServe(":12345", nil)
}
func wsPage(res http.ResponseWriter, req *http.Request) {
//解析一個連接
conn, error := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(res, req, nil)
if error != nil {
//http.NotFound(res, req)
//http 請求一個輸出
io.WriteString(res,"這是一個websocket,不是網(wǎng)站.")
return
}
uid,_ := uuid.NewV4()
sha1 := uid.String()
//初始化一個客戶端對象
client := &Client{id: sha1, socket: conn,send: make(chan []byte)}
//把這個對象發(fā)送給 管道
manager.register <- client
//go client.read()
go client.write()
}
func (manager *ClientManager) getRedisData(){
redisClient := redis.NewClient(&redis.Options{
Addr: "20.10.1.31:6381",
Password: "", // no password set
DB: 0, // use default DB
})
redisSubscript :=redisClient.Subscribe("mychannel1")
for{
msg,err := redisSubscript.ReceiveMessage()
if err != nil{
redisClient.Close()
}
//manager.redisData<- msg.String()
fmt.Println("重新讀數(shù)據(jù)吧")
jsonMessage, _ := json.Marshal(&Message{Sender: "hi", Content: msg.String()})
manager.broadcast <- jsonMessage //激活start 程序 入廣播管道
}
}
客戶端
client.go
package main
import (
"flag"
"fmt"
"net/url"
"time"
"github.com/gorilla/websocket"
)
var addr = flag.String("addr", "localhost:12345", "http service address")
func main() {
u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
var dialer *websocket.Dialer
conn, _, err := dialer.Dial(u.String(), nil)
if err != nil {
fmt.Println(err)
return
}
//go timeWriter(conn)
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("read:", err)
return
}
fmt.Printf("received: %s\n", message)
}
}
一個網(wǎng)頁測試
js核心代碼,起一個websocket連接然后去讀取數(shù)據(jù)
<script type="text/javascript">
if (window["WebSocket"]) {
conn = new WebSocket("ws://localhost:12345/ws");
conn.onclose = function(evt) {
appendLog($("<div><b>Connection Closed.</b></div>"))
}
conn.onmessage = function(evt) {
appendLog($("<div/>").text(evt.data))
}
} else {
appendLog($("<div><b>WebSockets Not Support.</b></div>"))
}
});
</script>
最后在redis端寫publish mychannel1 6
然后在客戶端和web客戶端都收到 received:
{"sender":"hi","content":"Message\u003cmychannel1: 6\u003e"}
基本思路
主線程里起兩個并行協(xié)程,一個處理各項數(shù)據(jù),一個循環(huán)讀取redis訂閱,當(dāng)用戶客戶端連接上socket,再起一個這個客戶端寫的協(xié)程,用戶處理當(dāng)客戶端退出后釋放資源。然后用管道傳遞數(shù)據(jù)循環(huán)發(fā)送消息到客戶端。
資料參考
https://blog.csdn.net/wangshubo1989/article/details/78250790