第一章
一、創建WebSocket服務端程序
1、創建目錄chat并初始化
mkdir chat
cd chat
go mod init chat
2、增加文件client.go,該文件處理WebSocket客戶端程序。其中serveWebsocket()函數處理客戶端連接。
package main
import (
"bytes"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// Timing and size limits shared by the read and write pumps.
const (
	// writeWait bounds how long a single write to the peer may take.
	writeWait = 10 * time.Second

	// pongWait is how long to wait for the next pong message from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings sent to the peer. It is nine
	// tenths of pongWait so that it always stays below pongWait.
	pingPeriod = pongWait / 10 * 9

	// maxMessageSize is the largest message size accepted from the peer.
	maxMessageSize = 512
)
// Byte sequences used when flattening incoming messages and when joining
// queued outgoing messages.
var (
	newline = []byte("\n")
	space   = []byte(" ")
)
// upgrader holds the options used by serveWebsocket to upgrade an HTTP
// request to a websocket connection; both buffer sizes are set to 1024.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
// hub is the Hub this client registers with, unregisters from and
// broadcasts through.
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
// readPump forwards every message read from the websocket connection to the
// hub's broadcast channel.
//
// It runs in its own goroutine, one per connection, and is the only place
// that reads from the connection, so there is never more than one reader.
// When reading fails the client unregisters from the hub and the connection
// is closed.
func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	// Each pong from the peer pushes the read deadline out by another pongWait.
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, payload, err := c.conn.ReadMessage()
		if err == nil {
			// Flatten the payload to a single line: writePump joins queued
			// messages with a newline, so embedded newlines are replaced.
			c.hub.broadcast <- bytes.TrimSpace(bytes.ReplaceAll(payload, newline, space))
			continue
		}
		if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			log.Printf("error: %v", err)
		}
		return
	}
}
// writePump copies messages from the client's send channel to the websocket
// connection and sends a ping every pingPeriod.
//
// One writePump goroutine is started per connection and all writes happen
// here, so there is never more than one writer on a connection. It returns,
// stopping the ticker and closing the connection, when the send channel is
// closed or any write fails.
func (c *Client) writePump() {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in-first-out: the ticker is stopped first,
	// then the connection is closed.
	defer c.conn.Close()
	defer pinger.Stop()

	for {
		select {
		case first, open := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !open {
				// The send channel was closed: say goodbye to the peer.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			frame, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			frame.Write(first)

			// Append the messages that were already queued to the same
			// websocket message, separated by newlines.
			for pending := len(c.send); pending > 0; pending-- {
				frame.Write(newline)
				frame.Write(<-c.send)
			}

			if frame.Close() != nil {
				return
			}

		case <-pinger.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}
		}
	}
}
// serveWebsocket upgrades the HTTP request to a websocket connection, wraps
// it in a Client registered with hub, and starts the client's pumps.
//
// If the upgrade fails the error is logged and nothing else happens.
func serveWebsocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	client := &Client{
		hub:  hub,
		conn: conn,
		send: make(chan []byte, 256),
	}
	hub.register <- client

	// All further work happens in new goroutines so that memory referenced
	// by the caller can be collected.
	go client.writePump()
	go client.readPump()
}
3、新建main.go文件,該文件為主啟動程序,負責建立HTTP服務,并把客戶端的WebSocket請求交給serveWebsocket處理。
package main
import (
"flag"
"log"
"net/http"
)
// addr is the HTTP listen address, set with the -addr flag (default ":3000").
var addr = flag.String("addr", ":3000", "http service address")
// serveHome serves public/index.html for GET requests to "/".
//
// Every request URL is logged. Any other path is answered with 404 and any
// other method with 405.
func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)

	switch {
	case r.URL.Path != "/":
		http.Error(w, "Not found", http.StatusNotFound)
	case r.Method != http.MethodGet:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	default:
		http.ServeFile(w, r, "public/index.html")
	}
}
// main parses the flags, starts the hub in its own goroutine, registers the
// home page, websocket and static asset handlers, and serves HTTP on *addr.
func main() {
	flag.Parse()

	hub := NewWebsocketServer()
	go hub.Run()

	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWebsocket(hub, w, r)
	})

	// Files under public/assets are exposed under the /assets/ URL prefix.
	assets := http.FileServer(http.Dir("public/assets"))
	http.Handle("/assets/", http.StripPrefix("/assets/", assets))

	if err := http.ListenAndServe(*addr, nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
二、創建WebSocket頁面端程序
1、在chat目錄下創建public目錄,在該目錄下創建index.html文件并引入三方包
<!-- public/index.html -->
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8" />
  <title>Chat</title>
  <!-- Load required Bootstrap and BootstrapVue CSS -->
  <link type="text/css" rel="stylesheet" href="https://unpkg.com/bootstrap/dist/css/bootstrap.min.css" />
  <link type="text/css" rel="stylesheet" href="https://unpkg.com/bootstrap-vue@latest/dist/bootstrap-vue.min.css" />
  <!-- Load polyfills to support older browsers.
       Served from the cdnjs mirror over HTTPS: the polyfill.io domain is no longer trustworthy. -->
  <script src="https://cdnjs.cloudflare.com/polyfill/v3/polyfill.min.js?features=es2015%2CIntersectionObserver" crossorigin="anonymous"></script>
  <!-- Load Vue followed by BootstrapVue -->
  <script src="https://cdn.jsdelivr.net/npm/vue@2.6.14/dist/vue.js"></script>
  <script src="https://unpkg.com/bootstrap-vue@latest/dist/bootstrap-vue.min.js"></script>
  <!-- Load the following for BootstrapVueIcons support -->
  <script src="https://unpkg.com/bootstrap-vue@latest/dist/bootstrap-vue-icons.min.js"></script>
</head>
<body>
  <!-- Root element mounted by the Vue instance in assets/app.js -->
  <div id="app">
    <div class="container-fluid h-100">
      <div class="row justify-content-center h-100">
        <div class="col-md-8 col-xl-6 chat">
          <div class="card">
            <div class="card-header msg_head">
              <div class="d-flex bd-highlight justify-content-center">
                Chat
              </div>
            </div>
            <!-- Message list: one row per received message -->
            <div class="card-body msg_card_body">
              <div
                v-for="(message, key) in messages"
                :key="key"
                class="d-flex justify-content-start mb-4"
              >
                <div class="msg_cotainer">
                  {{message.message}}
                  <span class="msg_time"></span>
                </div>
              </div>
            </div>
            <!-- Input area: Enter or the send button calls sendMessage -->
            <div class="card-footer">
              <div class="input-group">
                <textarea
                  v-model="newMessage"
                  name=""
                  class="form-control type_msg"
                  placeholder="Type your message..."
                  @keyup.enter.exact="sendMessage"
                ></textarea>
                <div class="input-group-append">
                  <span class="input-group-text send_btn" @click="sendMessage">&gt;</span>
                </div>
              </div>
            </div>
          </div>
        </div>
      </div>
    </div>
  </div>
  <!-- Loaded at the end of the body so that #app exists when the script runs -->
  <script src="assets/app.js"></script>
</body>
</html>
2、在public下創建assets目錄并在該目錄下創建app.js。
// public/assets/app.js
// Root Vue instance of the chat page: opens the websocket when mounted,
// appends every received message to `messages` and sends `newMessage`.
var app = new Vue({
  el: '#app',
  data: {
    ws: null,
    serverUrl: "ws://localhost:3000/ws",
    messages: [],
    newMessage: ""
  },
  mounted: function() {
    this.connectToWebsocket()
  },
  methods: {
    // Open the websocket and wire up the open/message handlers.
    connectToWebsocket() {
      this.ws = new WebSocket( this.serverUrl );
      this.ws.addEventListener('open', (event) => { this.onWebsocketOpen(event) });
      this.ws.addEventListener('message', (event) => { this.handleNewMessage(event) });
    },
    onWebsocketOpen() {
      console.log("connected to WS!");
    },
    // One websocket frame may carry several newline-separated JSON messages.
    handleNewMessage(event) {
      const lines = event.data.split(/\r?\n/);
      for (const line of lines) {
        if (line === "") {
          continue;
        }
        try {
          this.messages.push(JSON.parse(line));
        } catch (err) {
          // A malformed line must not abort the rest of the batch.
          console.error("discarding malformed message", err);
        }
      }
    },
    // Send the typed text as {message: ...} and clear the input.
    sendMessage() {
      if(this.newMessage !== "") {
        this.ws.send(JSON.stringify({message: this.newMessage}));
        this.newMessage = "";
      }
    }
  }
})
三、發送和接收消息
創建新文件chatServer.go,該文件包含一個Hub結構體類型,其中的clients字段記錄已注冊的客戶端,并使用兩個channel管道實現注冊和解除注冊請求。
package main
// Hub keeps the set of connected clients and the channels through which
// Run receives register, unregister and broadcast requests.
type Hub struct {
// clients is the set of currently registered clients.
clients map[*Client]bool
// register receives clients to add to the set.
register chan *Client
// unregister receives clients to remove from the set.
unregister chan *Client
// broadcast receives messages to deliver to every registered client.
broadcast chan []byte
}
// NewWebsocketServer returns a Hub with an empty client set and unbuffered
// register, unregister and broadcast channels.
func NewWebsocketServer() *Hub {
	hub := new(Hub)
	hub.clients = make(map[*Client]bool)
	hub.register = make(chan *Client)
	hub.unregister = make(chan *Client)
	hub.broadcast = make(chan []byte)
	return hub
}
// Run is the hub's event loop. It never returns: it waits for a register,
// unregister or broadcast request and handles one request at a time.
func (hub *Hub) Run() {
	for {
		select {
		case joining := <-hub.register:
			hub.registerClient(joining)
		case leaving := <-hub.unregister:
			hub.unregisterClient(leaving)
		case payload := <-hub.broadcast:
			hub.broadcastToClients(payload)
		}
	}
}
// registerClient adds client to the hub's client set.
func (hub *Hub) registerClient(client *Client) {
hub.clients[client] = true
}
// unregisterClient removes client from the hub's client set. Removing a
// client that is not registered is a no-op.
func (hub *Hub) unregisterClient(client *Client) {
	// delete does nothing for a missing key, so no existence check is needed.
	delete(hub.clients, client)
}
// broadcastToClients queues message on the send channel of every registered
// client.
//
// The send is non-blocking: when a client's queue is full, the message is
// dropped for that client. This function runs on the hub's Run goroutine, so
// a blocking send would stall every register, unregister and broadcast
// request for as long as a single client failed to drain its queue. Worse,
// a client whose writePump has already exited never drains the queue, while
// its readPump waits to send on hub.unregister, which would deadlock the hub.
func (hub *Hub) broadcastToClients(message []byte) {
	for client := range hub.clients {
		select {
		case client.send <- message:
		default:
			// Queue full: skip this client rather than block the hub.
		}
	}
}
Run()函數持續偵聽管道,該函數是處理請求的專用函數,現在提供注冊客戶端連接、解除注冊以及向所有客戶端廣播消息的功能。
運行程序
go run .
目錄結構如下:

運行結果如下:

第二章
引入CommunicationChannel通信頻道
創建communication_channel.go文件,建立CommunicationChannel結構體,每一個頻道都能夠注冊客戶端、解除注冊和廣播消息。
在Hub結構體中增加CommunicationChannel
// chatServer.go
package main
// Hub is extended with the set of communication channels it manages; the
// fields shown earlier are elided here.
type Hub struct {
...
// communicationChannels is the set of channels created on this hub.
communicationChannels map[*CommunicationChannel]bool
}
// NewWebsocketServer additionally initializes the (empty) set of
// communication channels; the other fields are elided here.
func NewWebsocketServer() *Hub {
return &Hub{
...
communicationChannels: make(map[*CommunicationChannel]bool),
}
}
通過maps和channels,可以獲取到客戶端在線狀況。
在chatServer.go中增加方法,用于查找已存在的頻道和創建新頻道:
// chatServer.go
// findCommunicationChannelByName returns the communication channel whose
// GetName() equals name, or nil when the hub has no such channel.
//
// NOTE(review): handleNewMessage and handleJoinCommunicationChannelMessage
// call this from each client's read goroutine, not from Hub.Run, so the map
// can be read here while createCommunicationChannel writes it — confirm
// whether access needs to be synchronized.
func (hub *Hub) findCommunicationChannelByName(name string) *CommunicationChannel {
	for candidate := range hub.communicationChannels {
		if candidate.GetName() == name {
			return candidate
		}
	}
	return nil
}
// createCommunicationChannel creates a communication channel called name,
// starts its RunCommunicationChannel loop in a new goroutine, records it in
// the hub and returns it.
//
// NOTE(review): this writes hub.communicationChannels from the calling
// client's goroutine (see handleJoinCommunicationChannelMessage) — confirm
// whether the map needs to be guarded against concurrent access.
func (hub *Hub) createCommunicationChannel(name string) *CommunicationChannel {
comchan := NewCommunicationChannel(name)
go comchan.RunCommunicationChannel()
hub.communicationChannels[comchan] = true
return comchan
}
增加消息處理
為了處理不同類型的消息(例如加入頻道或發送消息),引入Message類型,包括:
Action: 動作類型(發送消息、加入或離開頻道)
Message: 消息內容。
Target: 消息目標。
Sender: 消息發送人。
創建message.go:
// message.go
package main
import (
"encoding/json"
"log"
)
// Values of Message.Action understood by the server.
const (
	// SendMessageAction sends a chat message to the channel named in Target.
	SendMessageAction = "send-message"
	// JoinCommunicationChannelAction asks to join a channel.
	JoinCommunicationChannelAction = "join-communication-channel"
	// LeaveCommunicationChannelAction asks to leave a channel.
	LeaveCommunicationChannelAction = "leave-communication-channel"
)
// Message is the JSON envelope exchanged between the browser and the server.
type Message struct {
// Action selects how the message is handled (one of the *Action constants).
Action string `json:"action"`
// Message is the payload: the chat text, or the channel name for a join
// request (see handleJoinCommunicationChannelMessage).
Message string `json:"message"`
// Target is the name of the channel a chat message is addressed to.
Target string `json:"target"`
// Sender is set by the server to the client that sent the message.
Sender *Client `json:"sender"`
}
// encode returns the JSON encoding of message. If marshalling fails the
// error is logged and the returned slice is nil.
func (message *Message) encode() []byte {
	// Named "encoded" so the local does not shadow the json package.
	encoded, err := json.Marshal(message)
	if err != nil {
		log.Println(err)
	}
	return encoded
}
與頻道進行交互
開始實現客戶端在頻道中的加入、離開和廣播。首先在Client中增加一個頻道Map,對加入和離開進行追蹤;并在client.go中增加disconnect()方法,在斷開連接時把客戶端從各頻道中解除注冊。
// Client gains an identity and the set of channels it has joined; the fields
// shown earlier are elided here.
type Client struct {
...
// ID is the UUID assigned to the client by newClient.
ID uuid.UUID `json:"id"`
// Name is the name passed to newClient.
Name string `json:"name"`
// communicationChannels is the set of channels this client has joined.
communicationChannels map[*CommunicationChannel]bool
}
// newClient builds a Client for conn that belongs to hub and is called name.
// It gets a fresh UUID, a send queue of 256 messages and an empty set of
// joined channels.
func newClient(conn *websocket.Conn, hub *Hub, name string) *Client {
	client := new(Client)
	client.ID = uuid.New()
	client.Name = name
	client.conn = conn
	client.hub = hub
	client.send = make(chan []byte, 256)
	client.communicationChannels = make(map[*CommunicationChannel]bool)
	return client
}
// disconnect tears the client down: it unregisters from the hub and from
// every communication channel it joined, then closes the send channel and
// the websocket connection.
func (client *Client) disconnect() {
// Unregister everywhere before closing send, so the channel is only closed
// after each owner has been handed the unregister request.
client.hub.unregister <- client
for communicationChannel := range client.communicationChannels {
communicationChannel.unregister <- client
}
// Closing send makes writePump's receive report !ok, so it writes a close
// message and returns.
close(client.send)
client.conn.Close()
}
信息處理
現在客戶端已能夠加入頻道。接下來根據不同的action處理不同類型的消息。
首先,在client中增加一個新方法,解析JSON消息并按action分發處理:
// handleNewMessage decodes one JSON message received from the peer and
// dispatches it on its Action: chat messages go to the channel named in
// Target, join and leave requests go to their handlers. Messages that fail
// to decode are logged and dropped; unknown actions and chat messages for
// unknown channels are ignored.
func (client *Client) handleNewMessage(jsonMessage []byte) {
	var incoming Message
	err := json.Unmarshal(jsonMessage, &incoming)
	if err != nil {
		log.Printf("Error on unmarshal JSON message %s", err)
		return
	}

	// The sender is always the receiving client, whatever the payload said.
	incoming.Sender = client

	switch incoming.Action {
	case SendMessageAction:
		// Chat messages are delivered to the channel named by Target.
		target := client.hub.findCommunicationChannelByName(incoming.Target)
		if target == nil {
			return
		}
		target.broadcast <- &incoming
	case JoinCommunicationChannelAction:
		client.handleJoinCommunicationChannelMessage(incoming)
	case LeaveCommunicationChannelAction:
		client.handleLeaveCommunicationChannelMessage(incoming)
	}
}
在上面的方法中,我們把消息直接發送到頻道,并使用Message取代[]byte來發送信息,因此需調整communication_channel.go
// RunCommunicationChannel is the channel's event loop. The case shown here
// encodes each *Message received on comchan.broadcast and passes the bytes
// to broadcastToClientsInCommunicationChannel; the other cases are elided.
func (comchan *CommunicationChannel) RunCommunicationChannel() {
for {
select {
...
case message := <-comchan.broadcast:
comchan.broadcastToClientsInCommunicationChannel(message.encode())
}
}
}
當頻道不存在時,由程序新創建一個:
// client.go
// handleJoinCommunicationChannelMessage joins the channel named in
// message.Message, creating it on the hub when it does not exist yet. The
// channel is recorded in the client's own set and the client is registered
// with the channel.
func (client *Client) handleJoinCommunicationChannelMessage(message Message) {
	name := message.Message

	comchan := client.hub.findCommunicationChannelByName(name)
	if comchan == nil {
		comchan = client.hub.createCommunicationChannel(name)
	}

	client.communicationChannels[comchan] = true
	comchan.register <- client
}
修改readPump()函數,在收到新消息時使用handleNewMessage方法進行處理:
// readPump reads messages from the websocket connection until a read fails
// and hands each one to handleNewMessage. On exit the client is disconnected.
func (client *Client) readPump() {
	defer client.disconnect()

	client.conn.SetReadLimit(maxMessageSize)
	client.conn.SetReadDeadline(time.Now().Add(pongWait))
	// Each pong from the peer pushes the read deadline out by another pongWait.
	client.conn.SetPongHandler(func(string) error {
		client.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// Read until the connection fails or is closed.
	for {
		_, payload, err := client.conn.ReadMessage()
		if err == nil {
			client.handleNewMessage(payload)
			continue
		}
		if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			log.Printf("unexpected close error: %v", err)
		}
		return
	}
}
命名登錄客戶
// client.go
// Client carries the name the user connected with; the other fields are
// elided here.
type Client struct {
...
// Name is the name passed to newClient.
Name string `json:"name"`
}
// newClient builds a Client called name; the remaining fields are elided.
// The second parameter is the *Hub, matching the call
// newClient(conn, hub, name[0]) made in serveWebsocket.
func newClient(conn *websocket.Conn, hub *Hub, name string) *Client {
	return &Client{
		Name: name,
		...
	}
}
// GetName returns the client's Name.
func (client *Client) GetName() string {
return client.Name
}
修改serveWebsocket函數,從URL的查詢參數中讀取用戶名:
// client.go
// serveWebsocket requires a non-empty "name" query parameter and uses it as
// the new client's name. A request without it is logged and answered with
// 400 Bad Request, so the caller does not receive an empty success response.
// The upgrade and registration steps are elided.
func serveWebsocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("name")
	if name == "" {
		log.Println("Url Param 'name' is missing")
		http.Error(w, "missing 'name' query parameter", http.StatusBadRequest)
		return
	}
	...
	client := newClient(conn, hub, name)
	...
}
歡迎消息
當用戶加入頻道時,其他用戶能夠看到歡迎消息。
首先,在communication_channel.go中增加一個新方法:
// communication_channel.go
// welcomeMessage is the format of the notice broadcast when a client joins;
// %s is replaced with the client's name.
const welcomeMessage = "%s joined the channel"

// notifyClientJoined broadcasts a send-message notice, addressed to this
// channel, announcing that client has joined.
func (comchan *CommunicationChannel) notifyClientJoined(client *Client) {
	notice := Message{
		Action:  SendMessageAction,
		Target:  comchan.name,
		Message: fmt.Sprintf(welcomeMessage, client.GetName()),
	}
	comchan.broadcastToClientsInCommunicationChannel(notice.encode())
}
當用戶注冊時調用下述方法:
// registerClientInCommunicationChannel announces the new client to the
// channel and then adds it to the channel's client set.
func (comchan *CommunicationChannel) registerClientInCommunicationChannel(client *Client) {
// The notice is broadcast before the client is added.
// NOTE(review): presumably the broadcast iterates comchan.clients, in which
// case the joining client does not receive its own welcome — confirm.
comchan.notifyClientJoined(client)
comchan.clients[client] = true
}
服務端代碼完成,開始修改前端代碼
頻道接口
// public/assets/app.js
// Root Vue instance; only the data section is shown, the rest is elided.
var app = new Vue({
el: '#app',
data: {
ws: null,
serverUrl: "ws://localhost:3000/ws",
// Name typed by the user for the channel to join next.
channelInput: null,
// Channels the user has joined, each with its own message list.
channels: [],
// The current user; name is sent as the "name" query parameter on connect.
user: {
name: ""
}
},
...
})
channelInput為要新建或加入的頻道名稱。
channels為所有已加入頻道的列表。
user為用戶數據。
方法修改如下:
methods: {
connect() {
this.connectToWebsocket();
},
connectToWebsocket() {
// Pass the name paramter when connecting.
this.ws = new WebSocket(this.serverUrl + "?name=" + this.user.name);
this.ws.addEventListener('open', (event) => { this.onWebsocketOpen(event) });
this.ws.addEventListener('message', (event) => { this.handleNewMessage(event) });
},
onWebsocketOpen() {
console.log("connected to WS!");
},
handleNewMessage(event) {
let data = event.data;
data = data.split(/\r?\n/);
for (let i = 0; i < data.length; i++) {
let msg = JSON.parse(data[i]);
// display the message in the correct channel.
const channel = this.findChannel(msg.target);
if (typeof channel !== "undefined") {
channel.messages.push(msg);
}
}
},
sendMessage(channel) {
// send message to correct channel.
if (channel.newMessage !== "") {
this.ws.send(JSON.stringify({
action: 'send-message',
message: channel.newMessage,
target: channel.name
}));
channel.newMessage = "";
}
},
findChannel(channelName) {
for (let i = 0; i < this.channels.length; i++) {
if (this.channels[i].name === channelName) {
return this.channels[i];
}
}
},
joinChannel() {
this.ws.send(JSON.stringify({ action: 'join-channel', message: this.channelInput }));
this.messages = [];
this.channels.push({ "name": this.channelInput, "messages": [] });
this.channelInput = "";
},
leaveChannel(channel) {
this.ws.send(JSON.stringify({ action: 'leave-channel', message: channel.name }));
for (let i = 0; i < this.channels.length; i++) {
if (this.channels[i].name === channel.name) {
this.channels.splice(i, 1);
break;
}
}
}
}
新增了三個方法,分別用于查找頻道、加入頻道和離開頻道。
HTML修改如下: