基于okhttp和RxJava封裝的自動(dòng)重連的WebSocket

*本篇文章已授權(quán)微信公眾號(hào) guolin_blog (郭霖)獨(dú)家發(fā)布

一 . 概述

1. RxWebSocket是一個(gè)基于okhttp和RxJava封裝的WebSocket客戶端,此庫(kù)的核心特點(diǎn)是 除了手動(dòng)關(guān)閉WebSocket(就是RxJava取消訂閱),WebSocket在異常關(guān)閉的時(shí)候(onFailure,發(fā)生異常,如WebSocketException等等),會(huì)自動(dòng)重連,永不斷連.其次,對(duì)WebSocket做的緩存處理,同一個(gè)URL,共享一個(gè)WebSocket.

2. 由于是基于RxJava封裝,所以帶來(lái)了無(wú)限可能,可以和RxBinding,Rxlifecycle一起使用,方便對(duì)WebSocket的管理.

項(xiàng)目地址: 歡迎star


效果圖

demo效果

重連

重連

項(xiàng)目已經(jīng)上傳Jcenter,依賴(lài)方法:

//本項(xiàng)目
compile 'com.dhh:websocket:1.3.0'

//由于項(xiàng)目是基于okhttp,RxJava,RxAndroid編寫(xiě),所以還需加入如下依賴(lài).
//okhttp,RxJava,RxAndroid
compile 'com.squareup.okhttp3:okhttp:3.9.0'
compile 'io.reactivex:rxjava:1.3.1'
compile 'io.reactivex:rxandroid:1.2.1'

二 . 使用方法

0. 初始化,可以也忽略直接使用.

如果你想使用自己的okhttpClient:

        OkHttpClient yourClient = new OkHttpClient();
        RxWebSocketUtil.getInstance().setClient(yourClient);

是否打印日志:

RxWebSocketUtil.getInstance().setShowLog(BuildConfig.DEBUG);

1.獲取一個(gè)WebSocket,接收消息,多種方式:

RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                        .subscribe(new Action1<WebSocketInfo>() {
                            @Override
                            public void call(WebSocketInfo webSocketInfo) {
                                mWebSocket = webSocketInfo.getWebSocket();
                                Log.d("MainActivity", webSocketInfo.getString());
                                Log.d("MainActivity", "ByteString:" + webSocketInfo.getByteString());
                            }
                        });

mWebSocket.send("hello word");

        //get StringMsg
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                    }
                });
        // get ByteString
        RxWebSocketUtil.getInstance().getWebSocketByteString(url)
                .subscribe(new Action1<ByteString>() {
                    @Override
                    public void call(ByteString byteString) {

                    }
                });
        //get WebSocket
        RxWebSocketUtil.getInstance().getWebSocket(url)
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {

                    }
                });
       // 帶timeout的WebSocket,當(dāng)在指定時(shí)間內(nèi)沒(méi)有收到消息,就重連WebSocket.為了適配小米平板.
       //小米平板測(cè)試的時(shí)候,出現(xiàn)網(wǎng)絡(luò)斷連,不發(fā)送錯(cuò)誤,導(dǎo)致不能重連
        RxWebSocketUtil.getInstance().getWebSocketInfo(url,10, TimeUnit.SECONDS)
                .subscribe(new Action1<WebSocketInfo>() {
                    @Override
                    public void call(WebSocketInfo webSocketInfo) {
                        
                    }
                });       

2. 發(fā)送消息:

  //用WebSocket的引用直接發(fā)
  mWebSocket.send("hello word");

  //url 對(duì)應(yīng)的WebSocket已經(jīng)打開(kāi)可以這樣send,否則報(bào)錯(cuò)
  RxWebSocketUtil.getInstance().send(url, "hello");
  RxWebSocketUtil.getInstance().send(url, ByteString.EMPTY);

  //異步發(fā)送,若WebSocket已經(jīng)打開(kāi),直接發(fā)送,若沒(méi)有打開(kāi),打開(kāi)一個(gè)WebSocket發(fā)送完數(shù)據(jù),直接關(guān)閉.
  RxWebSocketUtil.getInstance().asyncSend(url, "hello");
  RxWebSocketUtil.getInstance().asyncSend(url, ByteString.EMPTY);

3.關(guān)閉WebSocket

項(xiàng)目是依托RxJava實(shí)現(xiàn)的,所以關(guān)閉WebSocket的方法也就是在適當(dāng)?shù)臅r(shí)候注銷(xiāo) Observable,項(xiàng)目里的demo里,寫(xiě)了一個(gè)簡(jiǎn)單的lifecycle,將Observable生命綁定到Activity的onDestroy,自動(dòng)注銷(xiāo).代碼細(xì)節(jié)請(qǐng)看demo,因?yàn)閮?nèi)部實(shí)現(xiàn)了同一個(gè)URL的WebSocket共享機(jī)制,所以當(dāng)外部所有持有這個(gè)URL的Observable都注銷(xiāo)后,這個(gè)WebSocket連接就會(huì)自動(dòng)關(guān)閉.請(qǐng)看原理解析部分.下面兩種常用注銷(xiāo)方法:

       //注意取消訂閱,有多種方式,比如 rxlifecycle
                        mSubscription = RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                                .subscribe(new Action1<WebSocketInfo>() {
                                    @Override
                                    public void call(WebSocketInfo webSocketInfo) {
                                        mWebSocket = webSocketInfo.getWebSocket();
                                        if (webSocketInfo.isOnOpen()) {
                                            Log.d("MainActivity", " on WebSocket open");
                                        } else {

                                            String string = webSocketInfo.getString();
                                            if (string != null) {
                                                Log.d("MainActivity", string);
                                                textview.setText(Html.fromHtml(string));

                                            }

                                            ByteString byteString = webSocketInfo.getByteString();
                                            if (byteString != null) {
                                                Log.d("MainActivity", "webSocketInfo.getByteString():" + byteString);

                                            }
                                        }
                                    }
                                });
    //注銷(xiāo)
    if (mSubscription != null) {
            mSubscription.unsubscribe();
        }

//lifecycle注銷(xiāo),詳情看demo
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .compose(this.<String>bindOnActivityEvent(ActivityEvent.onDestory))
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        
                    }
                });

三. 原理解析

1. 首先需要將okhttp的WebSocket包裝成Observable,由于需要將WebSocket,Stringmsg,ByteString等信息一同發(fā)送給觀察者所以先構(gòu)建一個(gè)WebSocketInfo類(lèi),將信息封裝:

public class WebSocketInfo {
    private WebSocket mWebSocket;
    private String mString;
    private ByteString mByteString;
    private boolean onOpen;
    //其他省略
}

onOpen字段主要用來(lái)判斷當(dāng)前的這個(gè)WebSocketInfo是否是當(dāng)WebSocket打開(kāi)時(shí)發(fā)送的消息(onOpen),這時(shí),Stringmsg和ByteString都是null.

2. 將WebSocketInfo包裝成Observable發(fā)出:

    private final class WebSocketOnSubscribe implements Observable.OnSubscribe<WebSocketInfo> {
        private String url;

        private WebSocket webSocket;

        private WebSocketInfo startInfo, stringInfo, byteStringInfo;

        public WebSocketOnSubscribe(String url) {
            this.url = url;
            startInfo = new WebSocketInfo(true);
            stringInfo = new WebSocketInfo();
            byteStringInfo = new WebSocketInfo();
        }

        @Override
        public void call(final Subscriber<? super WebSocketInfo> subscriber) {
            if (webSocket != null) {
                //降低重連頻率
                if (!"main".equals(Thread.currentThread().getName())) {
                    SystemClock.sleep(2000);
                }
            }
            initWebSocket(subscriber);
        }

        private void initWebSocket(final Subscriber<? super WebSocketInfo> subscriber) {
            webSocket = client.newWebSocket(getRequest(url), new WebSocketListener() {
                @Override
                public void onOpen(final WebSocket webSocket, Response response) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onOpen");
                    }
                    webSocketMap.put(url, webSocket);
                    AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
                        @Override
                        public void call() {
                            if (!subscriber.isUnsubscribed()) {
                                subscriber.onStart();
                                startInfo.setWebSocket(webSocket);
                                subscriber.onNext(startInfo);
                            }
                        }
                    });
                }

                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    if (!subscriber.isUnsubscribed()) {
                        stringInfo.setWebSocket(webSocket);
                        stringInfo.setString(text);
                        subscriber.onNext(stringInfo);
                    }
                }

                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                    if (!subscriber.isUnsubscribed()) {
                        byteStringInfo.setWebSocket(webSocket);
                        byteStringInfo.setByteString(bytes);
                        subscriber.onNext(byteStringInfo);
                    }
                }

                @Override
                public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                    if (showLog) {
                        Log.e("RxWebSocketUtil", t.toString() + webSocket.request().url().uri().getPath());
                    }
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                }

                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    webSocket.close(1000, null);
                }

                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onClosed:code= " + code);
                    }
                }
            });
            subscriber.add(new MainThreadSubscription() {
                @Override
                protected void onUnsubscribe() {
                    webSocket.close(3000, "手動(dòng)關(guān)閉");
                }
            });
        }

    }

實(shí)現(xiàn)一個(gè)WebSocketOnSubscribe 將WebSocket的回調(diào)轉(zhuǎn)化成subscriber調(diào)用.發(fā)送給Observable下游.在onOpen時(shí)調(diào)用 subscriber.onStart(),并且發(fā)送一個(gè)onOpen的WebSocketInfo.在subscriber注銷(xiāo)的時(shí)候關(guān)閉WebSocket.在call方法最上面有個(gè)SystemClock.sleep(2000),這個(gè)主要是為了降低在斷連的時(shí)候的重連頻率,將在下面講到.

包裝成Observable:

Observable.create(new WebSocketOnSubscribe(url))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

3. 實(shí)現(xiàn)自動(dòng)重連:

Observable.create(new WebSocketOnSubscribe(url))
                    //自動(dòng)重連
                    .timeout(timeout, timeUnit)
                    .retry()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

RxJava retry操作符,很完美的實(shí)現(xiàn)了這個(gè)功能,當(dāng)上游發(fā)出Throwable的時(shí)候,retry將錯(cuò)誤吃掉,并重新調(diào)用 onSubscribe的call方法,也就是WebSocketOnSubscribe的call,就會(huì)重新初始化一個(gè)WebSocket連接,達(dá)到重連的目的,如果一直沒(méi)有網(wǎng)絡(luò),這個(gè)retry的調(diào)用頻率非常高,所以在call方法里面,當(dāng)是重連的時(shí)候,就SystemClock.sleep(2000),休眠2秒,這樣重連的頻率就是2秒重連一次. 當(dāng)然在retry上面還有一個(gè)timeout操作符.當(dāng)subscriber.onNext()在指定時(shí)間間隔里沒(méi)有調(diào)用,就發(fā)出一個(gè)timeoutException,讓retry重連WebSocket.這個(gè)主要是為了適配部分國(guó)產(chǎn)機(jī)型,當(dāng)WebSocket發(fā)生連接異常時(shí),不會(huì)及時(shí)發(fā)出錯(cuò)誤,如小米平板.在每次重連都會(huì)把原來(lái)的WebSocket關(guān)閉.

4. 實(shí)現(xiàn)同一個(gè)URL的WebSocket共享

Observable.create(new WebSocketOnSubscribe(url))
                    //自動(dòng)重連
                    .timeout(timeout, timeUnit)
                    .retry()
                    //共享
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil", "注銷(xiāo)");
                            }
                        }
                    })
                    .doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());
                            }
                        }
                    })
                    .share()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

實(shí)現(xiàn)共享功能,主要是為了防止一個(gè)URL的WebSocket,建立多個(gè)連接,這個(gè)主要是由RxJava的share操作符實(shí)現(xiàn),share操作符,使得一個(gè)Observable可以有多個(gè)subscriber,當(dāng)有多個(gè)subscriber時(shí),當(dāng)所有的subscriber都取消訂閱,這個(gè)Observable才會(huì)取消訂閱. getWebSocketInfo()方法完整代碼:

    public Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
        Observable<WebSocketInfo> observable = observableMap.get(url);
        if (observable == null) {
            observable = Observable.create(new WebSocketOnSubscribe(url))
                    //自動(dòng)重連
                    .timeout(timeout, timeUnit)
                    .retry()
                    //共享
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil", "注銷(xiāo)");
                            }
                        }
                    })
                    .doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());
                            }
                        }
                    })
                    .share()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            observableMap.put(url, observable);
        } else {
            observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);
        }
        return observable;
    }

doOnUnsubscribe作用:在Observable注銷(xiāo),即 WebSocket關(guān)閉時(shí),移除map中的緩存的Observable和WebSocket.

doOnNext作用: 判斷接收到的WebSocketInfo是否是WebSocket在onOpen的時(shí)候發(fā)的,然后將其緩存起來(lái).作用就是:如果有一個(gè)相同的URL訂閱Observable,就從緩存中取,這個(gè)時(shí)候我們應(yīng)該把一個(gè)WebSocket的onOpen事件也發(fā)給這個(gè)訂閱者:

//使用merge操作符,將onOpen事件發(fā)給訂閱者
observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);

這樣的話,同一個(gè)URL的WebSocket,不管在什么地方什么時(shí)間訂閱,都能收到一個(gè)onOpen事件,外部表現(xiàn)的就像一個(gè)新的WebSocket.

getWebSocketInfo方法的幾種變體:

    /**
     * default timeout: 30 days
     * <p>
     * 若忽略小米平板,請(qǐng)調(diào)用這個(gè)方法
     * </p>
     */
    public Observable<WebSocketInfo> getWebSocketInfo(String url) {
        return getWebSocketInfo(url, 30, TimeUnit.DAYS);
    }

    public Observable<String> getWebSocketString(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, String>() {
                    @Override
                    public String call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getString();
                    }
                })
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s != null;
                    }
                });
    }

    public Observable<ByteString> getWebSocketByteString(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, ByteString>() {
                    @Override
                    public ByteString call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getByteString();
                    }
                })
                .filter(new Func1<ByteString, Boolean>() {
                    @Override
                    public Boolean call(ByteString byteString) {
                        return byteString != null;
                    }
                });
    }

    public Observable<WebSocket> getWebSocket(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, WebSocket>() {
                    @Override
                    public WebSocket call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getWebSocket();
                    }
                });
    }

5 . send信息到服務(wù)端

上面已經(jīng)講到WebSocketInfo包含了WebSocket,所以在訂閱后,就可以拿到這個(gè)WebSocket引用就可以WebSocket.send發(fā)送消息到服務(wù)端.當(dāng)然我們的RxWebSocketUtil已經(jīng)將開(kāi)啟的WebSocket已經(jīng)緩存.所以我們也可以這樣發(fā)消息:

    /**
     * 如果url的WebSocket已經(jīng)打開(kāi),可以直接調(diào)用這個(gè)發(fā)送消息.
     *
     * @param url
     * @param msg
     */
    public void send(String url, String msg) {
        WebSocket webSocket = webSocketMap.get(url);
        if (webSocket != null) {
            webSocket.send(msg);
        } else {
            throw new IllegalStateException("The WebSokcet not open");
        }
    }

    /**
     * 如果url的WebSocket已經(jīng)打開(kāi),可以直接調(diào)用這個(gè)發(fā)送消息.
     *
     * @param url
     * @param byteString
     */
    public void send(String url, ByteString byteString) {
        WebSocket webSocket = webSocketMap.get(url);
        if (webSocket != null) {
            webSocket.send(byteString);
        } else {
            throw new IllegalStateException("The WebSokcet not open");
        }
    }

當(dāng)指定的URL的WebSocket沒(méi)有打開(kāi)會(huì)直接報(bào)錯(cuò).

異步發(fā)送消息到服務(wù)端

    /**
     * 不用關(guān)心url 的WebSocket是否打開(kāi),可以直接發(fā)送
     *
     * @param url
     * @param msg
     */
    public void asyncSend(String url, final String msg) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {
                        webSocket.send(msg);
                    }
                });

    }

    /**
     * 不用關(guān)心url 的WebSocket是否打開(kāi),可以直接發(fā)送
     *
     * @param url
     * @param byteString
     */
    public void asyncSend(String url, final ByteString byteString) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {
                        webSocket.send(byteString);
                    }
                });
    }

這兩種發(fā)送方式,你不用關(guān)心URL的WebSocket是否打開(kāi),可以直接發(fā)送.實(shí)現(xiàn)思路也很簡(jiǎn)單,getWebSocket(url)會(huì)獲取到Observable,或者是從緩存中取,或者是重新開(kāi)啟一個(gè)WebSocket,但你都不需要關(guān)心,經(jīng)過(guò)first操作符后,如果是從緩存取的Observable,就注銷(xiāo)的當(dāng)前的Observable,當(dāng)是新開(kāi)的WebSocket,注銷(xiāo)掉當(dāng)前的subscriber后,就沒(méi)有其他subscriber了,這個(gè)新開(kāi)的WebSocket就會(huì)關(guān)閉(share操作符作用).

最后,如有什么好的建議,可以聯(lián)系我.

項(xiàng)目地址: https://github.com/dhhAndroid/RxWebSocket

如果對(duì)你有幫助,謝謝 star !

尊重原創(chuàng),禁止轉(zhuǎn)載!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,680評(píng)論 19 139
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,148評(píng)論 9 73
  • 我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,777評(píng)論 7 62
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對(duì)于擴(kuò)展包,由于使用率較低,如有需求,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,219評(píng)論 8 93
  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善閱讀 3,652評(píng)論 0 0

友情鏈接更多精彩內(nèi)容