go語言實現(xiàn)websocket數(shù)據(jù)推送

主要分享下go語言實現(xiàn)的websocket推送。
有這樣一個應(yīng)用場景,服務(wù)端產(chǎn)生了一條很重要的數(shù)據(jù),需要實時推送到所有客戶端。
數(shù)據(jù)源采用redis 訂閱方式,有數(shù)據(jù)產(chǎn)生就pub,然后服務(wù)sub讀取數(shù)據(jù),最后用 websocket模式推送數(shù)據(jù)到所有客戶端。
這是基本設(shè)計思想。

首先 我們用第三方的庫 websocket和一個生成uuid的庫還有一個redis

go get -u github.com/gorilla/websocket
go get -u github.com/satori/go.uuid
go get -u github.com/go-redis/redis

開始服務(wù)端編程

server.go

 package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "io"

    "github.com/gorilla/websocket"
    "github.com/satori/go.uuid"
    "github.com/go-redis/redis"
)

type ClientManager struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}

type Client struct {
    id     string
    socket *websocket.Conn
    send   chan []byte
}

type Message struct {
    Sender    string `json:"sender,omitempty"`
    Recipient string `json:"recipient,omitempty"`
    Content   string `json:"content,omitempty"`
}

var manager = ClientManager{
    broadcast:  make(chan []byte),
    register:   make(chan *Client),
    unregister: make(chan *Client),
    clients:    make(map[*Client]bool),
}

func (manager *ClientManager) start() {
    for {
        select {
        case conn := <-manager.register: //新客戶端加入
            manager.clients[conn] = true
            jsonMessage, _ := json.Marshal(&Message{Content: " a new socket has connected."})
            manager.send(jsonMessage, conn) //調(diào)用發(fā)送
        case conn := <-manager.unregister:
            if _, ok := manager.clients[conn]; ok {
                close(conn.send)
                delete(manager.clients, conn)
                jsonMessage, _ := json.Marshal(&Message{Content: "a socket has disconnected."})
                manager.send(jsonMessage, conn)
            }
        case message := <-manager.broadcast: //讀到廣播管道數(shù)據(jù)后的處理
        fmt.Println(string(message))
            for conn := range manager.clients {
                fmt.Println("每個客戶端",conn.id)

                select {
                    case conn.send <- message: //調(diào)用發(fā)送給全體客戶端
                    default:
                        fmt.Println("要關(guān)閉連接啊")
                        close(conn.send)
                        delete(manager.clients, conn)
                }
            }
        }
    }
}

func (manager *ClientManager) send(message []byte, ignore *Client) {
    for conn := range manager.clients {
        if conn != ignore {
            conn.send <- message //發(fā)送的數(shù)據(jù)寫入所有的 websocket 連接 管道
        }
    }
}
//客戶端寫入后 激活這里讀取
//想改成 讀取redis(已廢棄)
func (c *Client) read() {
    //pubsub := c.cache.Subscribe("mychannel1")
    defer func() {
        manager.unregister <- c
        c.socket.Close()
        fmt.Println("讀關(guān)閉")
    }()

    for {
        
        _, message, err := c.socket.ReadMessage()
        //msg,err := c.getRedis()
        fmt.Println("是在不停的讀嗎?")
        if err != nil {
            manager.unregister <- c
            c.socket.Close()
            //c.cache.Close()
            fmt.Println("讀不到數(shù)據(jù)就關(guān)閉?")
            break
        }
        jsonMessage, _ := json.Marshal(&Message{Sender: c.id, Content: string(message)})
        manager.broadcast <- jsonMessage  //激活start 程序 入廣播管道
        fmt.Println("發(fā)送數(shù)據(jù)到廣播")
    }
}
//寫入管道后激活這個進(jìn)程
func (c *Client) write() {
    defer func() {
        manager.unregister <- c
        c.socket.Close()
        fmt.Println("寫關(guān)閉了")
    }()

    for {
        select {
        case message, ok := <-c.send: //這個管道有了數(shù)據(jù) 寫這個消息出去
            if !ok {
                c.socket.WriteMessage(websocket.CloseMessage, []byte{})
                fmt.Println("發(fā)送關(guān)閉提示")
                return
            }

            err := c.socket.WriteMessage(websocket.TextMessage, message)
            if err != nil {
                manager.unregister <- c
                c.socket.Close()
                fmt.Println("寫不成功數(shù)據(jù)就關(guān)閉了")
                break
            }
            fmt.Println("寫數(shù)據(jù)")
        }
    }
}

func main() {
    fmt.Println("Starting application...")
    go manager.start()
    go manager.getRedisData()
    http.HandleFunc("/ws", wsPage)
    http.ListenAndServe(":12345", nil)
}

func wsPage(res http.ResponseWriter, req *http.Request) {
    //解析一個連接
    conn, error := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(res, req, nil)
    if error != nil {
        //http.NotFound(res, req)
        //http 請求一個輸出
        io.WriteString(res,"這是一個websocket,不是網(wǎng)站.")
        return
    }
    
    uid,_ := uuid.NewV4()
    sha1 := uid.String()
    
    //初始化一個客戶端對象
    client := &Client{id: sha1, socket: conn,send: make(chan []byte)}
    //把這個對象發(fā)送給 管道
    manager.register <- client

    //go client.read()
    go client.write()
}

func (manager *ClientManager) getRedisData(){
    redisClient := redis.NewClient(&redis.Options{
        Addr:     "20.10.1.31:6381",
        Password: "", // no password set
        DB:       0,  // use default DB
    })
    redisSubscript :=redisClient.Subscribe("mychannel1")
    for{
        msg,err := redisSubscript.ReceiveMessage()
        if err != nil{
            redisClient.Close()
        }
        //manager.redisData<- msg.String()
        fmt.Println("重新讀數(shù)據(jù)吧")
        jsonMessage, _ := json.Marshal(&Message{Sender: "hi", Content: msg.String()})
        manager.broadcast <- jsonMessage  //激活start 程序 入廣播管道

    }
}

客戶端

client.go

package main

import (
    "flag"
    "fmt"
    "net/url"
    "time"

    "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "localhost:12345", "http service address")

func main() {
    u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
    var dialer *websocket.Dialer

    conn, _, err := dialer.Dial(u.String(), nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    //go timeWriter(conn)

    for {
        _, message, err := conn.ReadMessage()
        if err != nil {
            fmt.Println("read:", err)
            return
        }

        fmt.Printf("received: %s\n", message)
    }
}

一個網(wǎng)頁測試

js核心代碼,起一個websocket連接然后去讀取數(shù)據(jù)

<script type="text/javascript">

    if (window["WebSocket"]) {
        conn = new WebSocket("ws://localhost:12345/ws");
        conn.onclose = function(evt) {
            appendLog($("<div><b>Connection Closed.</b></div>"))
        }
        conn.onmessage = function(evt) {
            appendLog($("<div/>").text(evt.data))
        }
    } else {
        appendLog($("<div><b>WebSockets Not Support.</b></div>"))
    }
    });

</script>

最后在redis端寫publish mychannel1 6

然后在客戶端和web客戶端都收到 received:

{"sender":"hi","content":"Message\u003cmychannel1: 6\u003e"}

基本思路

主線程里起兩個并行協(xié)程,一個處理各項數(shù)據(jù),一個循環(huán)讀取redis訂閱,當(dāng)用戶客戶端連接上socket,再起一個這個客戶端寫的協(xié)程,用戶處理當(dāng)客戶端退出后釋放資源。然后用管道傳遞數(shù)據(jù)循環(huán)發(fā)送消息到客戶端。

資料參考

https://blog.csdn.net/wangshubo1989/article/details/78250790

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容