SpringBoot Websocket 實(shí)戰(zhàn)

什么是Websocket

Websocket 是一種在單個(gè)TCP連接上進(jìn)行全雙工通信的協(xié)議。WebSocket連接成功后,服務(wù)端與客戶端可以雙向通信。在需要消息推送的場(chǎng)景,Websocket 相對(duì)于輪詢能更好的節(jié)省服務(wù)器資源和帶寬,并且能夠更實(shí)時(shí)地進(jìn)行通訊。

HTTP與Websocket
  • 與 HTTP 協(xié)議有著良好的兼容性。默認(rèn)端口也是80和443,并且握手階段采用 HTTP 協(xié)議,因此握手時(shí)不容易屏蔽,能通過(guò)各種 HTTP 代理服務(wù)器。

  • 依賴于TCP協(xié)議

  • 數(shù)據(jù)格式比較輕量,性能開(kāi)銷小,通信高效。

  • 可以發(fā)送文本,也可以發(fā)送二進(jìn)制數(shù)據(jù)。

  • 沒(méi)有同源限制,客戶端可以與任意服務(wù)器通信。

  • 協(xié)議標(biāo)識(shí)符是ws(如果加密,則為wss),服務(wù)器網(wǎng)址就是 URL。

SpringBoot 中使用 Websocket

在簡(jiǎn)單了解Websocket 之后,我們來(lái)動(dòng)手實(shí)踐一下。SpringBoot 中有多種方式可以實(shí)現(xiàn)Websocket Server,這里我選擇使用Tomcat 中 javax.websocket.server 的api來(lái)實(shí)現(xiàn),結(jié)尾會(huì)給出demo地址

  1. 引入Maven依賴
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
  1. 創(chuàng)建一個(gè)Bean用于處理Websocket 請(qǐng)求,通過(guò)ServerEndpoint 聲明當(dāng)前Bean 接受的Websocket URL

這里為什么聲明的是 @Controller,后文會(huì)解釋

import org.springframework.stereotype.Controller;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/message_websocket")
@Controller
public class MsgWebsocketController {

    @OnOpen
    public void onOpen(Session session) {
        // 先鑒權(quán),如果鑒權(quán)通過(guò)則存儲(chǔ)WebsocketSession,否則關(guān)閉連接,這里省略了鑒權(quán)的代碼 
        WebSocketSupport.storageSession(session);
        System.out.println("session open. ID:" + session.getId());
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("session close. ID:" + session.getId());
    }

    /**
     * 收到客戶端消息后調(diào)用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("get client msg. ID:" + session.getId() + ". msg:" + message);
    }

    /**
     * 發(fā)生錯(cuò)誤時(shí)調(diào)用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

}

  1. 聲明 ServerEndpointExporter
@Configuration
public class WebsocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

至此,Websocket Server 已經(jīng)搭建完成,客戶端已經(jīng)可以和服務(wù)端通信了

服務(wù)端 向客戶端推送消息 通過(guò) session.getBasicRemote().sendText(message); 即可

源碼淺析

我們來(lái)看下上述的短短幾行代碼是如何為我們構(gòu)建 Websocket Server

ServerEndpointExporter
image.png

重點(diǎn)關(guān)注下紅框中的內(nèi)容

  1. ServerEndpointExporter 實(shí)現(xiàn)了 SmartInitializingSingleton,會(huì)在bean 實(shí)例化結(jié)束后調(diào)用 afterSingletonsInstantiated

  2. 從Spring上下文中獲取所有標(biāo)記@ServerEndpoint的Bean的name

其實(shí) 我們聲明的 MsgWebsocketController 中并不是只能標(biāo)記@Controller,只是為了將其注冊(cè)到Spring容器中,方便ServerEndpoint的注冊(cè)而已,標(biāo)記 @Controller 更符合Spring的開(kāi)發(fā)規(guī)范

3~4. 通過(guò)ServerContainer 將所有標(biāo)記@ServerEndpoint的Bean 注冊(cè)

ServerContainer 默認(rèn)的實(shí)現(xiàn)類為 WsServerContainer,會(huì)對(duì)我們的ServerEndpoint做一個(gè)映射,URL => 對(duì)應(yīng)的class,然后針對(duì)不同的事件調(diào)用指定的方法(例如建立連接時(shí)調(diào)用標(biāo)記@Onopen的方法),這有點(diǎn)Spring DispatcherServlet 那味,感興趣的同學(xué)可以自己看下

在了解了 Spring 為我們做了什么后,我們來(lái)完善一下我們的Demo

建立一個(gè)SessionManager

當(dāng)我們想向客戶端推送消息的時(shí)候,首先我們需要找到客戶端與服務(wù)端建立的連接,也就是WebscoketSession

WsServerContainer 中雖然已經(jīng)存儲(chǔ)了 WebscoketSession,但是并沒(méi)有辦法直接通過(guò)SessionId,或者我們的業(yè)務(wù)Id 直接定位到指定的Session,所以我們需要實(shí)現(xiàn)一個(gè)自己的SessionManager

final ConcurrentHashMap<Object, Session> sessionPool = new ConcurrentHashMap<>();

使用 ConcurrentHashMap 管理即可

分布式推送解決

image.png

如圖,用戶1與服務(wù)器A建立Webscoket,用戶2與服務(wù)器B建立Webscoket,那么用戶1如果想向用戶2推送一條消息,該如何實(shí)現(xiàn)?

WebscoketSession 實(shí)際上是網(wǎng)絡(luò)連接,并不像我們傳統(tǒng)應(yīng)用的Session可以序列化到Redis,只能每個(gè)服務(wù)器管理自己的WebscoketSession,所以此時(shí)服務(wù)器A通知服務(wù)器B,你要給用戶2推送一條消息。

一個(gè)比較簡(jiǎn)單有效的實(shí)現(xiàn)方法,利用消息隊(duì)列,如下圖

image.png

這個(gè)方案優(yōu)點(diǎn)是實(shí)現(xiàn)簡(jiǎn)單,缺點(diǎn)是每臺(tái)服務(wù)器都需要判斷一遍當(dāng)前是否存在指定的WebscoketSession ,方案細(xì)化的話則需要維護(hù)用戶Session與每臺(tái)服務(wù)器的關(guān)系,這樣直接將消息推送給指定服務(wù)器即可

其他問(wèn)題

測(cè)試時(shí)發(fā)現(xiàn),當(dāng)客戶端斷網(wǎng)后,服務(wù)端檢測(cè)不到客戶端失去連接的情況,依然可以調(diào)用Session的推送方法,服務(wù)端會(huì)一直持有這個(gè)無(wú)效的Session

目前想到的解決方案:設(shè)置WebsocketSession的最大空閑時(shí)間(session.setMaxIdleTimeout(milliseconds);),當(dāng)超過(guò)這個(gè)時(shí)間時(shí),服務(wù)端會(huì)關(guān)閉Session。前端定期發(fā)送一條心跳包,用于維持Session,當(dāng)出現(xiàn)上述情況時(shí),服務(wù)端也不會(huì)一直持有Session了

完整demo地址

關(guān)于demo的細(xì)節(jié)參考項(xiàng)目地址中Readme

Github ?? https://github.com/TavenYin/taven-springboot-learning/tree/master/sp-websocket

Gitee ?? https://gitee.com/yintianwen7/taven-springboot-learning/tree/master/sp-websocket

參考

http://www.ruanyifeng.com/blog/2017/05/websocket.html

部分代碼參考了一位兄弟的博客,但是由于時(shí)間有點(diǎn)長(zhǎng),找不到了,在此說(shuō)一聲抱歉

如果覺(jué)得有收獲,可以關(guān)注我的公眾號(hào)【殷天文】,第一時(shí)間接收到我的更新

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容