SpringBoot整合WebSocket實(shí)現(xiàn)數(shù)據(jù)實(shí)時發(fā)送

WebSocket簡介

目的

HTML5 WebSocket設(shè)計出來的目的就是取代輪詢和長連接,使客戶端瀏覽器具備像C/S框架下桌面系統(tǒng)的即時通訊能力,實(shí)現(xiàn)了瀏覽器和服務(wù)器全雙工通信,建立在TCP之上,雖然WebSocket和HTTP一樣通過TCP來傳輸數(shù)據(jù),但WebSocket可以主動的向?qū)Ψ桨l(fā)送或接收數(shù)據(jù),就像Socket一樣;并且WebSocket需要類似TCP的客戶端和服務(wù)端通過握手連接,連接成功后才能互相通信。

優(yōu)點(diǎn)

雙向通信、事件驅(qū)動、異步、使用ws或wss協(xié)議的客戶端能夠真正實(shí)現(xiàn)意義上的推送功能。

缺點(diǎn)

少部分瀏覽器不支持。

示例

社交聊天(微信、QQ)、彈幕、多玩家玩游戲、協(xié)同編輯、股票基金實(shí)時報價、體育實(shí)況更新、視頻會議/聊天、基于位置的應(yīng)用、在線教育、智能家居等高實(shí)時性的場景。

WebSocket請求響應(yīng)客戶端服務(wù)器交互圖
請求響應(yīng)客戶端服務(wù)器交互圖.png

WebSocket方式減少了很多TCP打開和關(guān)閉連接的操作,WebSocket的資源利用率高。

java WebSocket實(shí)現(xiàn)

Oracle 發(fā)布的 java 的 WebSocket 的規(guī)范是 JSR356規(guī)范 ,Tomcat從7.0.27開始支持WebSocket,從7.0.47開始支持JSR-356。

websocket簡單實(shí)現(xiàn)分為以下幾個步驟:

  1. 添加websocket庫
  2. 編寫后臺代碼
  3. 編寫前端代碼。
添加websocket整合springboot依賴
   <!--websocket-->
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>
后臺代碼實(shí)現(xiàn)

配置類


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 首先注入一個ServerEndpointExporterBean,該Bean會自動注冊使用@ServerEndpoint注解申明的websocket endpoint
 * 鏈接:https://www.imooc.com/article/70702?block_id=tuijian_wz
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

websocket接口實(shí)現(xiàn)


@ServerEndpoint(value = "/websocket") //接受websocket請求路徑
@Component
public class PoundWebSocket {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 保存所有在線socket連接
     */
    private static Map<String, PoundWebSocket> webSocketMap = new LinkedHashMap<>();

    /**
     * 記錄當(dāng)前在線數(shù)目
     */
    private static int count = 0;

    /**
     * 當(dāng)前連接(每個websocket連入都會創(chuàng)建一個MyWebSocket實(shí)例
     */
    private Session session;

    /**
     * 創(chuàng)建監(jiān)聽串口
     */
    private static SerialPort serialPort = null;

    /**
     * 創(chuàng)建監(jiān)聽器
     */
    private static SerialPortEventListener serialPortEventListener = null;

    /**
     * 監(jiān)聽串口
     */
    private static String PORT_NAME;

    /**
     * 監(jiān)聽串口波特率
     */
    private static int BAUD_RATE;

    /**
     * 數(shù)據(jù)位
     */
    private static int DATA_BITS;

    /**
     * 停止位
     */
    private static int STOP_BITS;

    /**
     * 奇偶位
     */
    private static int PARITY;

    /**
     * 地磅型號
     */
    private static String MODEL;

    private static IPoundInfoService poundInfoService;

    private static ApplicationContext applicationContext;

    public static void setApplicationContext(ApplicationContext applicationContext) {
        PoundWebSocket.applicationContext = applicationContext;
    }

    private static StringBuffer stringBuffer = new StringBuffer();

    /**
     * 處理連接建立
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        if (poundInfoService == null) {
            poundInfoService = applicationContext.getBean(IPoundInfoService.class);
        }
        //獲取地磅信息
        PoundInfo poundInfo = poundInfoService.findOne();
        PORT_NAME = poundInfo.getSerialPort();
        BAUD_RATE = poundInfo.getBaudRate();
        MODEL = poundInfo.getModel();
        DATA_BITS = poundInfo.getDataBits() != null ? poundInfo.getDataBits() : SerialPort.DATABITS_8;
        STOP_BITS = poundInfo.getStopBits() != null ? poundInfo.getStopBits() : SerialPort.STOPBITS_1;
        PARITY = poundInfo.getParity() != null ? poundInfo.getParity() : SerialPort.PARITY_NONE;

        this.session = session;
        webSocketMap.put(session.getId(), this);
        addCount();
//        logger.info("新的連接加入:{}", session.getId());
        try {
            //確保串口已被關(guān)閉,未關(guān)閉會導(dǎo)致重新監(jiān)聽串口失敗
            if (serialPort != null) {
                SerialPortUtil.closePort(serialPort);
                serialPort = null;
            }
            //創(chuàng)建串口 COM5位串口名稱 9600波特率
            if (serialPort == null && StringUtils.isNotEmpty(PORT_NAME) && StringUtils.isNotEmpty(MODEL)) {
                serialPort = SerialPortUtil.openPort(PORT_NAME, BAUD_RATE, DATA_BITS, PARITY, STOP_BITS);
//                logger.info("創(chuàng)建串口:{}", serialPort);
                //設(shè)置串口監(jiān)聽
                SerialPortUtil.addListener(serialPort, new SerialPortEventListener() {

                    @Override
                    public void serialEvent(SerialPortEvent serialPortEvent) {
                        if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
                            try {
                                //讀取串口數(shù)據(jù)
                                byte[] bytes = SerialPortUtil.readFromPort(serialPort);

                                //根據(jù)型號解析字符串
                                switch (MODEL) {
                                    case PoundConstant.MODEL_XK_3190:
                                        parsingString1(bytes);
                                        break;
                                    case PoundConstant.MODEL_XK_3190_10:
                                        parsingString2(bytes);
                                        break;
                                    case PoundConstant.MODEL_D_2008:
                                        parsingString1(bytes);
                                        break;
                                    case PoundConstant.MODEL_DK_3230_D_6:
                                        parsingString3(bytes);
                                        break;
                                    case PoundConstant.MODEL_D_2009_F:
                                        parsingString4(bytes);
                                        break;
                                    default:
                                        String value = String.valueOf(Integer.valueOf(new String(bytes, "GB2312")) - RandomUtil.randomInt(1000, 10000));
                                        sendMessageToAll(value);
                                }

//                                System.out.println("收到的數(shù)據(jù):" + new String(bytes, "GB2312") + "----" + new Date());

                            } catch (ReadDataFromSerialPortFailure readDataFromSerialPortFailure) {
                                logger.error(readDataFromSerialPortFailure.toString());
                            } catch (SerialPortInputStreamCloseFailure serialPortInputStreamCloseFailure) {
                                logger.error(serialPortInputStreamCloseFailure.toString());
                            } catch (UnsupportedEncodingException e) {
                                logger.error(e.toString());
                            } catch (IOException e) {
                                logger.error(e.toString());
                            }
                        }
                    }
                });
            }
        } catch (SerialPortParameterFailure serialPortParameterFailure) {
            logger.error(serialPortParameterFailure.toString());
        } catch (NotASerialPort notASerialPort) {
            logger.error(notASerialPort.toString());
        } catch (NoSuchPort noSuchPort) {
            logger.error(noSuchPort.toString());
        } catch (PortInUse portInUse) {
            logger.error(portInUse.toString());
        } catch (TooManyListeners tooManyListeners) {
            logger.error(tooManyListeners.toString());
        }
    }

    /**
     * 解析字符串 方法1
     *
     * @param bytes 獲取的字節(jié)碼
     */
    private void parsingString1(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //將ASCII碼轉(zhuǎn)成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

        //解析字符串
        String[] strs = sb.toString().trim().split("\\+");
        int weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Integer.parseInt(strs[j].trim().substring(0, 6));
                //發(fā)送數(shù)據(jù)
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法2
     *
     * @param bytes 獲取的字節(jié)碼
     */
    private void parsingString2(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //將ASCII碼轉(zhuǎn)成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }
        //解析字符串
        String[] strs = sb.toString().trim().split("\\+");
        double weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Double.parseDouble(strs[j].trim().substring(0, 6)) / 10;
                //發(fā)送數(shù)據(jù)
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法3
     *
     * @param bytes 獲取的字節(jié)碼
     */
    private void parsingString3(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //將ASCII碼轉(zhuǎn)成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

//        logger.info("sb:" + sb.toString());
        sb.reverse();

        //解析字符串
        String[] strs = sb.toString().trim().split("\\=");
        double weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Double.parseDouble(strs[j].trim());
                //發(fā)送數(shù)據(jù)
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法3
     *
     * @param bytes 獲取的字節(jié)碼
     */
    private void parsingString4(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //將ASCII碼轉(zhuǎn)成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

//        logger.info("sb:" + sb.reverse());
        //字符串反轉(zhuǎn)
        sb.reverse();

        //解析字符串
        String[] strs = sb.toString().trim().split("\\=");
        int weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Integer.parseInt(strs[j].trim().substring(0, 6));
                //發(fā)送數(shù)據(jù)
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 接受消息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("收到客戶端{(lán)}消息:{}", session.getId(), message);
        try {
            this.sendMessage(message);
        } catch (Exception e) {
            logger.error(e.toString());
        }
    }

    /**
     * 處理錯誤
     *
     * @param error
     * @param session
     */
    @OnError
    public void onError(Throwable error, Session session) {
        logger.info("發(fā)生錯誤{},{}", session.getId(), error.getMessage());
    }

    /**
     * 處理連接關(guān)閉
     */
    @OnClose
    public void onClose() {
        webSocketMap.remove(this.session.getId());
        reduceCount();
        logger.info("連接關(guān)閉:{}", this.session.getId());

        //連接關(guān)閉后關(guān)閉串口,下一次打開連接重新監(jiān)聽串口
        if (serialPort != null) {
            SerialPortUtil.closePort(serialPort);
            serialPort = null;
        }
    }

    /**
     * 群發(fā)消息
     *
     * @param message
     */
    public void sendMessageToAll(String message) {
        for (int i = 0; i < webSocketMap.size(); i++) {
            try {
//                logger.info("session:id=" + session.getId());
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
//        logger.info("session:id=" + session.getId());
        this.session.getBasicRemote().sendText(message);
    }

    //廣播消息
    public static void broadcast() {
        PoundWebSocket.webSocketMap.forEach((k, v) -> {
            try {
                v.sendMessage("這是一條測試廣播");
            } catch (Exception e) {
            }
        });
    }

    //獲取在線連接數(shù)目
    public static int getCount() {
        return count;
    }

    //操作count,使用synchronized確保線程安全
    public static synchronized void addCount() {
        PoundWebSocket.count++;
    }

    public static synchronized void reduceCount() {
        PoundWebSocket.count--;
    }

}

解決websocket不能注入bean的問題

import com.yotrio.pound.sockets.PoundWebSocket;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;

@ServletComponentScan
// mapper 接口類掃描包配置
@MapperScan("com.yotrio.pound.dao")
@SpringBootApplication
public class PoundClientApplication {

    public static void main(String[] args) {
        // 程序啟動入口
        // 啟動嵌入式的 Tomcat 并初始化 Spring 環(huán)境及其各 Spring 組件
//        SpringApplication.run(PoundClientApplication.class, args);

        ConfigurableApplicationContext applicationContext = SpringApplication.run(PoundClientApplication.class, args);
        //解決websocket不能注入bean的問題
        PoundWebSocket.setApplicationContext(applicationContext);
    }

}
前臺js實(shí)現(xiàn)
    //websocket實(shí)現(xiàn)
    var websocket;
    var socketUrl = "ws://127.0.0.1:8000/websocket";
    var count = 0;
    if ('WebSocket' in window) {
        // console.log("此瀏覽器支持websocket");
        websocket = new WebSocket(socketUrl);
    } else if ('MozWebSocket' in window) {
        alert("此瀏覽器只支持MozWebSocket");
    } else {
        alert("此瀏覽器只支持SockJS");
    }
    websocket.onopen = function (evnt) {
        // $("#tou").html("鏈接服務(wù)器成功!");
        console.log("鏈接服務(wù)器成功");
    };
    websocket.onmessage = function (evnt) {
        // console.log("event", evnt.data);
        $("#currentWeight").val(evnt.data);
    };
    websocket.onerror = function (evnt) {
        console.log("消息異常:" + evnt.data);
    };
    websocket.onclose = function (evnt) {
        console.log("與服務(wù)器斷開了鏈接");
    }

友情鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-WebSo...
    敢夢敢當(dāng)閱讀 9,032評論 0 50
  • WebSocket 機(jī)制 WebSocket 是 HTML5 一種新的協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工通信,能更...
    勇敢的_心_閱讀 2,379評論 0 4
  • (一) 昨天晚上跟先生講道: 我想去剪頭發(fā)。先生回我道: 你已經(jīng)想剪頭發(fā)好久了,可是一直沒見你去剪,只是想想有什么...
    idea偉閱讀 764評論 2 4
  • 低矮磚房,風(fēng)吹麥浪。麥子們互相摩擦著,像是戀愛了。田埂上行走的女人,背靠大樹,望向麥田,努力尋找著生命中的那一粒白...
    清歡亦歡閱讀 1,796評論 0 7
  • ??作為一個菜鳥,剛拿到Jeb,通過哪個文件啟動Jeb都不知道,隨便點(diǎn)了些文件,也是各種報錯或者沒反應(yīng),最后各種搜...
    A04閱讀 10,037評論 2 2

友情鏈接更多精彩內(nèi)容