soul從入門到放棄5--淺析websocket數(shù)據(jù)同步(一)

零、前戲

最近在“玩”demo的過程中,就一直好奇。插件信息等元數(shù)據(jù),soul是怎么從soul-admin同步到soul-bootstrap中的。

本著大膽瞎猜,小心求證的心態(tài),就有今天的關(guān)于websocket這篇文章。

一、服務(wù)端--soul-admin

  • 首先從jar入手,soul-admin中的只有一個spring-boot-starter-websocket官方j(luò)ar包。

猜測:服務(wù)端實現(xiàn)是在寫在soul-admin項目內(nèi)部

  • 通過全文搜索“websocket”,找到了DataSyncConfiguration
image

涉及websocket的配置初始化代碼如下:

image

與yml中的配置項對應(yīng)

image
  • 進(jìn)入WebsocketCollector中一探究竟
image

@ServerEndpoint("/websocket")科普帖子:https://blog.csdn.net/weixin_39793752/article/details/80949128

@ServerEndpoint("/websocket")
public class WebsocketCollector {
    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
    private static final String SESSION_KEY = "sessionKey";
    /**
     * On open.
     *
     * @param session the session
     */
    @OnOpen
    public void onOpen(final Session session) {
        log.info("websocket on open successful....");
        SESSION_SET.add(session);
    }
    /**
     * On message.
     * 
     * @param message the message
     * @param session the session
     */
    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            try {
                ThreadLocalUtil.put(SESSION_KEY, session);
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }
    /**
     * On close.
     *
     * @param session the session
     */
    @OnClose
    public void onClose(final Session session) {
        SESSION_SET.remove(session);
        ThreadLocalUtil.clear();
    }
    /**
     * On error.
     *
     * @param session the session
     * @param error   the error
     */
    @OnError
    public void onError(final Session session, final Throwable error) {
        SESSION_SET.remove(session);
        ThreadLocalUtil.clear();
        log.error("websocket collection error: ", error);
    }
    /**
     * Send.
     *
     * @param message the message
     * @param type    the type
     */
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                    if (session != null) {
                        session.getBasicRemote().sendText(message);
                    }
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
                return;
            }
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }
}

@OnOpen:連接建立成功調(diào)用的方法

@OnClose:連接關(guān)閉調(diào)用方法

@OnMessage:收到客戶端消息后調(diào)用的方法

@OnError:發(fā)送錯誤時調(diào)用

  • 當(dāng)服務(wù)端收到一條MYSELF類型的消息,將同步全量元數(shù)據(jù)信息
  • admin 會推送一次全量數(shù)據(jù),后續(xù)如果配置數(shù)據(jù)發(fā)生變更則將增量數(shù)據(jù)通過 websocket 主動推送給 soul-web

詳細(xì)機(jī)制此處暫留坑,后文再續(xù)

image

二、客戶端--soul-bootstrap

從pom文件入手,找到了soul-spring-boot-starter-sync-data-websocket中的WebsocketSyncDataConfiguration類

image
  • 深入WebsocketSyncDataService
image
  • 按照配置soul-admin的個數(shù),創(chuàng)建線程池大小
  • 通過線程池的定時任務(wù),實現(xiàn)心跳機(jī)制----保持心跳
  • /使用調(diào)度線程池進(jìn)行斷線重連,30秒進(jìn)行一次。如果連接斷開,將client.connectBlocking(3000, TimeUnit.MILLISECONDS);重連,重新發(fā)送MYSELF類型信息,進(jìn)行全量更新。
image

線程池科普貼:https://blog.csdn.net/weixin_35756522/article/details/81707276

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

command:執(zhí)行線程

initialDelay:初始化延時

period:兩次開始執(zhí)行最小間隔時間

unit:計時單位

  • SoulWebsocketClient 是具體處理websocket類
image
  • 在與服務(wù)端建立連接時,會發(fā)送MYSELF類型的消息。****如上文所述,服務(wù)端接收****MYSELF類****型消息后將發(fā)送全量消息用于同步。

小結(jié):

  • soul-admin與soul-bootstrap基本通信鏈路,基本捋順。相關(guān)數(shù)據(jù)處理的細(xì)節(jié),將在后續(xù)文章中淺析。
  • 日拱一卒,每天進(jìn)步一點點
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容