基于Rxjava2與OkHttp中WebSocket長連接封裝(斷連重試機(jī)制)

WebSocket 在開發(fā)中遇到的情況很少,導(dǎo)致在使用的時(shí)候可能遇到很多的問題,比如它的重連機(jī)制、發(fā)送數(shù)據(jù)的統(tǒng)一、結(jié)合Service使用;下面的文章將使用OkHttp中的WebSocket以及Rxjava2結(jié)合Service實(shí)現(xiàn)斷開重連,發(fā)送和接收服務(wù)器的數(shù)據(jù)。

WebSocket

我們使用WebSocket的庫有很多,類似 AndroidAsync,作者對它的介紹是一個(gè)更加底層的異步網(wǎng)絡(luò)庫,使用這個(gè)庫也非常簡單,傳一個(gè)地址和協(xié)議再加一個(gè)回調(diào)接口,就能簡單的使用:

AsyncHttpClient.getDefaultInstance()
    .websocket(get, "my-protocol", new WebSocketConnectCallback() {
    @Override
    public void onCompleted(Exception ex, WebSocket webSocket) {
        if (ex != null) {
            ex.printStackTrace();
            return;
        }
        webSocket.send("a string");
        webSocket.send(new byte[10]);
        webSocket.setStringCallback(new StringCallback() {
            public void onStringAvailable(String s) {
                System.out.println("I got a string: " + s);
            }
        });
        webSocket.setDataCallback(new DataCallback() {
            public void onDataAvailable(DataEmitter emitter, 
                              ByteBufferList byteBufferList) {
                System.out.println("I got some bytes!");
                // note that this data has been read
                byteBufferList.recycle();
            }
        });
    }
});

但是,我們看到這個(gè)庫卻是有非常多的類,通常的,我們的網(wǎng)絡(luò)請求庫都只有一個(gè),現(xiàn)在大部分Android開發(fā)都會(huì)使用類似OkHttp或者Volley來請求網(wǎng)絡(luò),如果再加一個(gè)庫的話,無疑增加了應(yīng)用的代碼量,這里我們使用OkHttp中內(nèi)置的WebSocket,基于它的一個(gè)封裝,我們也能寫一個(gè)輕量級的WebSocket應(yīng)用; 而OkHttp中的WebSocket使用也是封裝方便:

// 創(chuàng)建一個(gè)Request
Request request = new Request.Builder()
        .url(socketUrl)
        .build();
OkHttpClient client = new OkHttpClient();
// 使用OkHttpClient 來創(chuàng)建一個(gè)WebSocket
client.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
    }
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
    }
    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
    }
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        super.onClosing(webSocket, code, reason);
    }
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
    }
    @Override
    public void onFailure(WebSocket webSocket, Throwable t,
                @javax.annotation.Nullable Response response) {
        super.onFailure(webSocket, t, response);
    }
});
client.dispatcher().executorService().shutdown();

可以看到,先創(chuàng)建一個(gè)Request,我們在調(diào)用接口的時(shí)候,也是創(chuàng)建一個(gè)Request對象,然后使用OkHttpClient來創(chuàng)建一個(gè)WebSocket,回調(diào)到WebSocketListener監(jiān)聽接口之后,再做邏輯業(yè)務(wù)處理。(接口方法意思都很簡單,使用的時(shí)候再說明)

Rxjava2 使用

前面的博客都簡單的介紹了Rxjava2的使用,最近在項(xiàng)目中使用的也比較頻繁,后面再寫一些項(xiàng)目中Rxjava2的實(shí)戰(zhàn):

Rxjava2 學(xué)習(xí)創(chuàng)建型操作符
Rxjava2 學(xué)習(xí)變換操作符
Rxjava2 學(xué)習(xí)過濾操作符

項(xiàng)目開發(fā):

既然是使用到邏輯業(yè)務(wù)上的操作,和界面無關(guān),自然會(huì)想到Service; 將其直接封裝成Service,在使用的時(shí)候直接bindService或者startService會(huì)方便很多,不知不覺中將邏輯業(yè)務(wù)和頁面分隔開; 如果我們直接在頁面中使用也未嘗不可,但是這樣就不便復(fù)用了。

所以開始我們先創(chuàng)建一個(gè)Service:

public WebSocketService extends Service {
    public static final String LOG_TAG = "WebSocketTest";
   
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        Log.v(LOG_TAG, "----- onBind -----");
        return new ServiceBinder();
    }
    public class ServiceBinder extends Binder {
        public WebSocketService getService() {
            return WebSocketService.this;
        }
    }
    @Override
    public void onCreate() {
        super.onCreate();
        Log.i(LOG_TAG, "----- onCreate -----");
    }
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        //Use this to force restart service
        return START_STICKY;
    }
    @Override
    public void onDestroy() {
        super.onDestroy();
        Log.i(LOG_TAG, "----- onDestroy -----");
    }
}

Service常用的步驟,使用其中的方法,這里我們使用bindService回調(diào)一個(gè)ServiceConnect接口,因?yàn)槲覀冃枰褂玫竭@個(gè)WebSocketService實(shí)例;接下來,我們就需要在onCreate方法中做一些初始化的操作:

/**
 * 初始化
 *
 * @param startReason
 * @param isFirstConnect
 */
private void initSocketWrapper(String startReason, boolean isFirstConnect) {
    // 拿到Reason,打印log
    Observable.just(startReason)
            .filter(new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    // 判斷當(dāng)前是否正在連接
                    if (isAttemptConnecting) {
                        Log.v(LOG_TAG, startReason + " : Should reconnect but"+
                                       "already in process, skip.");
                        return Boolean.FALSE;
                    }
                    return Boolean.TRUE;
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    if ((mWebSocket == null)     // 如果已經(jīng)為空
                        && (!isFirstConnect)         // 不是第一次連接
                        && (!isAttemptConnecting)) {    // 當(dāng)前沒有在嘗試連接
                        showUiWebSocketStatus("與服務(wù)器失去連接?。?!");
                    }
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(s -> initSocket());
}    

上面寫清楚了注釋,這個(gè)方法主要是初始化,拿到當(dāng)前的原因和是否是第一次連接,還有一個(gè)全局變量 isAttemptConnecting 來判斷當(dāng)前WebSocket是否在連接中,然后在這之間先判斷 如果websocket為空并且不是第一次連接,而且還沒有嘗試連接,則toast提示用戶斷開連接?。?!

接下來就是使用OkHttp中WebSocket創(chuàng)建連接了:

/**
 * 初始化WebSocket
 */
private void initSocket() {
    // ... 省略一些狀態(tài)切換代碼
    ...    

    // 開始初始化
    Observable.create(new ObservableOnSubscribe<WebSocket>() {
        @Override
        public void subscribe(ObservableEmitter<WebSocket> emitter) 
                                            throws Exception {
            //TODO 這里可以進(jìn)行登錄業(yè)務(wù)判斷
            Request request = new Request.Builder()
                    .url(socketUrl)
                    .build();
            OkHttpClient client = new OkHttpClient();
            client.newWebSocket(request, new WebSocketListener() {
                @Override
                public void onOpen(WebSocket webSocket, Response response) {
                    super.onOpen(webSocket, response);
                    isAttemptConnecting = false;
                    connectionAttemptCount = 0;
                    // 連接成功之后
                    mWebSocket = webSocket;
                    dispatchStringMessage("連接成功?。。?);
                    emitter.onNext(mWebSocket);
                    emitter.onComplete();
                }
                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    super.onMessage(webSocket, text);
                    dispatchStringMessage(text);
                }
                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                    super.onMessage(webSocket, bytes);
                }
                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    super.onClosing(webSocket, code, reason);
                }
                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    super.onClosed(webSocket, code, reason);
                    Log.i(LOG_TAG, "ClosedCallback: WebSocket closed.");
                    // 等待自檢重啟,或者自然關(guān)閉
                    if ((!preparedShutdown) && (shouldAutoReconnect)) {
                        initSocketWrapper("onClose");
                    }
                }
                @Override
                public void onFailure(WebSocket webSocket, Throwable t, 
                                  @Nullable Response response) {
                    super.onFailure(webSocket, t, response);
                    dispatchStringMessage("連接失敗?。?!");
                    emitter.onError(t != null ? t 
                        : new ConnectException("Cannot connect we service!!!"));
                }
            });
            client.dispatcher().executorService().shutdown();
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<WebSocket>() {
        @Override
        public void accept(WebSocket webSocket) throws Exception {
            if (pongService == null) {
                startPongDaemonService();
            }
        }
    }, 
    new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        throwable.printStackTrace();
        // 判斷是否需要執(zhí)行診斷服務(wù)
        if (connectionAttemptCount >= ATTEMPT_TOLERANCE) {
            // 強(qiáng)制開始診斷服務(wù)
            startService(new Intent(WebSocketService.this, 
                                    NetworkDiagnosisService.class));
            // 重置標(biāo)記
            connectionAttemptCount = 0;
        }
    }
}

使用create操作符創(chuàng)建一個(gè)被觀察者對象發(fā)射器,在其中使用OkHttp的創(chuàng)建WebSocket方式創(chuàng)建WebSocket,然后根據(jù)連接的結(jié)果進(jìn)行emitter發(fā)射 onNext()、onError()、onComplete() 方法,連接成功之后,開始發(fā)送 自檢服務(wù) Pong; 連接失敗之后,開始檢測網(wǎng)絡(luò)是否有連接:NetWorkDiagnosisService 網(wǎng)絡(luò)診斷服務(wù)。

發(fā)送Pong守護(hù)進(jìn)程,先創(chuàng)建一個(gè)單線程線程池,然后發(fā)送消息:

/**
 * 給服務(wù)器發(fā)送Pong自檢
 */
private void startPongDaemonService() {
    pongService = Executors.newSingleThreadScheduledExecutor();
    pongService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            if (mWebSocket != null) {
                sendRequest(WsObjectPool.newPongRequest());
            }
        }
    }, 10, 10, TimeUnit.SECONDS);
    Log.i(LOG_TAG, "Pong service has been scheduled at " + 10 + " seconds delay.");
}

延遲十秒執(zhí)行,使用一個(gè)WebSocketObjectPool對象池,為了方便取出對象數(shù)據(jù)。

整個(gè)WebSocket初始化就在上面,流程還是比較簡單的,接下來就是從服務(wù)器拿到數(shù)據(jù)之后分發(fā)數(shù)據(jù)了, 因?yàn)槲疫@里測試只用到String數(shù)據(jù),如果需要用到Json的數(shù)據(jù),則分發(fā)json數(shù)據(jù)即可:

    /**
     * 方法字符串
     *
     * @param message
     */
    private void dispatchStringMessage(String message) {
        Observable.just(message)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        WsListener<String> listener =
                         (WsListener<String>)activeListener.get(
                 SocketConstants.ResponseType.RESPONSE_STRING_MESSAGE);
                        Log.d(LOG_TAG, "Msg entity: " + s + ".");
                        if (listener != null) {
                            listener.handleData(s);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

拿到message String數(shù)據(jù)之后,或者到外部監(jiān)聽器,因?yàn)槭鞘褂玫絊tring字符串類型,所以這里直接獲取到一個(gè)監(jiān)聽器,然后分發(fā)給方法。

如果是方法json的話,就去GitHub下載代碼查看詳細(xì)代碼:代碼

自檢服務(wù)

連接成功之后,我們需要自檢,因?yàn)榭赡茉谶B接過程中出現(xiàn)斷連的情況,網(wǎng)絡(luò)不穩(wěn)定情況,所以需要使用循環(huán)的自檢:

/**
 * 啟動(dòng)自檢服務(wù),按照周期執(zhí)行
 */
private void startSelfCheckService() {
    // 自檢服務(wù)器打開
    mSelfCheckDispose = Observable
            .interval(10, 10, TimeUnit.SECONDS)
            .filter(new Predicate<Long>() {
                @Override
                public boolean test(Long aLong) throws Exception {
                    if (!shouldAutoReconnect) {
                        Log.i(LOG_TAG, "Auto reconnect has been disabled,"
                                       "maybe kicked?");
                    }
                    return shouldAutoReconnect;
                }
            })
            .map(new Function<Long, Boolean>() {
                @Override
                public Boolean apply(Long aLong) throws Exception {
                    return checkSocketAvailable();
                }
            })
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.i(LOG_TAG, "Self check task has been scheduled per " 
                                    + 10 + " seconds.");
                    shouldAutoReconnect = true;
                    Log.i(LOG_TAG, "Auto reconnect feature has been enabled.");
                }
            })
            .subscribe(new Consumer<Boolean>() {
                           @Override
                           public void accept(Boolean webSocketAlive)
                                             throws Exception {
                               if (webSocketAlive) {
                                   Log.v(LOG_TAG, "WebSocket self check: is alive.");
                                   return;
                               }
                               // 自檢服務(wù)器打開
                               initSocketWrapper("SelfCheckService");
                           }
                       },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(LOG_TAG, "Error while executing self check!" 
                                         + throwable);
                        }
                    });
}

這里我沒使用lambda表達(dá)式,為了讓方法清楚,所以代碼比較長,但是結(jié)構(gòu)比較清晰;

首先使用 interval 創(chuàng)建一個(gè)延時(shí)的周期被觀察者,然后根據(jù)當(dāng)前設(shè)置是否需要自動(dòng)連接來過濾是否進(jìn)行下面的操作,接下來判斷當(dāng)前的連接是否存在和連接,接下來判斷是否存活,如果斷連則調(diào)用初始化的方法,在上面解釋了什么情況下會(huì)調(diào)用自檢服務(wù)。

總結(jié)

整篇文章寫了OkHttp的WebSocket使用,斷連重試機(jī)制,Service使用等,主要的一些細(xì)節(jié)在文章中可能沒體現(xiàn)出來,如果有需要?jiǎng)t下載源碼自己修改運(yùn)行。 最后貼上Github地址,喜歡的話給個(gè)start! ??

項(xiàng)目的github地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,511評論 19 139
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong閱讀 22,931評論 1 92
  • 1、Activity生命周期? onCreate() -> onStart() -> onResume() -> ...
    01_小小魚_01閱讀 2,867評論 0 44
  • 不知天上宮闕,只知地上扁鵲。 當(dāng)李白看完扁鵲幼年的日記時(shí),他瞬間明白,是什么讓一個(gè)人有這么大變化。 扁鵲的師傅徐福...
    越人哥哥閱讀 606評論 1 2
  • 漸漸變冷了,太陽給的光里溫暖被打了折扣。聽樹葉沙沙作響,感覺它們脆的隨時(shí)都會(huì)粉身碎骨。 午飯后去了超市,看到有點(diǎn)青...
    糖木小徑閱讀 295評論 0 0

友情鏈接更多精彩內(nèi)容