給初學者的RxJava2.0教程(十)

Outline

[TOC]

前言

在很久以前的一篇文章中,提到過如何利用Retrofit中的GsonConverter來處理API請求錯誤的方法,地址在這兒,今天給大家介紹另外一種優(yōu)雅的方法,利用RxJava內部的RxJavaPlugins來做這么一個騷操作。

正題

說到RxJavaPlugins可能有很多朋友還很陌生,畢竟我們日常開放也不會怎么接觸這個東西,但是從它的名字上來看就應該覺得它不一般,畢竟人家名字里帶了一個Plugin,廢話少說,我們先來看一下這個類到底是什么東西。

先找到這個類的位置,在io.reactivex.plugins這個包中,這個包就這一個類,再來看看類的定義:

package io.reactivex.plugins;
...
/**
 * Utility class to inject handlers to certain standard RxJava operations.
 */
public final class RxJavaPlugins {
    ....
}

首先映入眼簾的就是這句類注釋了,來翻譯一下:用于將一些騷操作注入到某些標準RxJava操作的工具類。

聽上去好像很牛逼??!我們來看一下它里面到底寫了些什么騷操作:

//代碼太長了,隨便粘貼幾句
public final class RxJavaPlugins {
    static volatile Consumer<? super Throwable> errorHandler;
    static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
    static volatile Function<? super Callable<Scheduler>, ? extends Scheduler> onInitComputationHandler;
    ...
    static volatile Function<? super Scheduler, ? extends Scheduler> onComputationHandler;
    static volatile Function<? super Scheduler, ? extends Scheduler> onSingleHandler;
    static volatile Function<? super Scheduler, ? extends Scheduler> onIoHandler;
    ...
    static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;
    static volatile BiFunction<? super Maybe, ? super MaybeObserver, ? extends MaybeObserver> onMaybeSubscribe;
    static volatile BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe;
    ...
    public static Consumer<? super Throwable> getErrorHandler() {}
    public static void setErrorHandler(@Nullable Consumer<? super Throwable> handler) {}
    ...
}

看到這里,我相信大家應該都是和我一樣的想法:這他嗎是啥啊。。。為什么每個字母我都認識,寫到一起我就不知道什么意思了。。。

懵逼

先不慌。。我們先粗略看一下這個類的結構,emmmm…先是定義了一大堆static的變量,但是沒有public出來,所以應該會有對應的getter和setter方法,好像就是這樣,沒毛病,好了,到此為止,這個類可以關了,也看不出啥東西來。。

既然這條路碰壁了,那我們換一條路來試試。

先來看一段正常得不能再正常的RxJava代碼:

        Maybe.just(1)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "Real onSuccess");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "Real onError");
                    }
                });

運行的結果就是:

zlc.season.javademo D/MainActivity: Real onSuccess

看過之前的教程的都知道這個subscribe()方法是個很重要的方法啦,那我們就來看看這個方法到底干了啥!

之前說過,subscribe方法有多個重載的方法,通過源碼得知,這些重載的方法最后都會調用到其中的一個subscribe方法中:

public final void subscribe(MaybeObserver<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");

        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "observer returned by the RxJavaPlugins hook is null");

        try {
            subscribeActual(observer);
        } catch (NullPointerException ex) {...
        } catch (Throwable ex) {...}
    }

通過這個源碼我們一下子就找到了一行關鍵的代碼:

observer = RxJavaPlugins.onSubscribe(this, observer);

先簡單解釋一下,這里的this就是當前的Maybe對象,也就是我們的上游,這里的observer就是我們的下游。

這意味著什么呢,意味著RxJavaPlugins對我們的subscribe方法做了一個騷操作呀!

這樣我們一下子就找到了RxJavaPlugins和調用鏈之間的聯(lián)系,接下來就需要順藤摸瓜,更加深入的了解一下,來看一下RxJavaPlugins.onSubscribe()的源碼吧:

//為了便于理解,把源碼中的范型去掉了
public final class RxJavaPlugins {
    ...
    static volatile BiFunction onMaybeSubscribe;
    ...
    public static void setOnMaybeSubscribe(BiFunction onMaybeSubscribe) {
        RxJavaPlugins.onMaybeSubscribe = onMaybeSubscribe;
    }
    ...
    //source就是我們的上游,observer就是我們的下游
    public static  MaybeObserver onSubscribe(Maybe source, MaybeObserver observer) {
        BiFunction f = onMaybeSubscribe;   
        if (f != null) {     //如果onMaybeSubscribe不為空
            return apply(f, source, observer); //調用apply方法創(chuàng)建一個新的下游
        }
        return observer;    
    }
    ...
    static MaybeObserver apply(BiFunction f, Maybe source, MaybeObserver observer) {
        return f.apply(source, observer);  
    }
}

這個代碼簡直不能再清晰了,大概就是如果我調用了setOnMaybeSubscribe()設置了一個BiFunction類型的變量onMaybeSubscribe,那么當我調用subscribe()方法的時候就會調用這個變量的apply()方法來做一個騷操作返回一個新的下游,否則就原封不動的把原來的下游返回。

這就給了我們無限的想象力啊,我們可以通過這個apply()方法直接把原本的下游返回,這樣就什么也不做,也可以包裝一下原來的下游,在真正的下游的方法執(zhí)行前后插入一些自己的操作,哇哦,好像很厲害的樣子。。。

那既然要包裝,首先肯定得有一個包裝類:

class WrapDownStreamObserver<T> implements MaybeObserver<T> {

        private MaybeObserver<T> actual;

        public WrapDownStreamObserver(MaybeObserver<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Disposable d) {
            actual.onSubscribe(d);
        }

        @Override
        public void onSuccess(T t) {
            Log.d(TAG, "Hooked onSuccess");
            actual.onSuccess(t);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "Hooked onError");
            actual.onError(e);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "Hooked onComplete");
            actual.onComplete();
        }
    }

這就是一個簡單的包裝類了,它和下游都是同樣的類型,并且內部持有真正的下游,我們在真正的下游方法調用前都插入了一條日志。

有了包裝類,那么我們就可以調用RxJavaPlugins的setOnMaybeSubscribe()方法來做騷操作了:

RxJavaPlugins.setOnMaybeSubscribe(new BiFunction<Maybe, MaybeObserver, MaybeObserver>() {
            @Override
            public MaybeObserver apply(Maybe maybe, MaybeObserver maybeObserver) throws Exception {
                return new WrapDownStreamObserver(maybeObserver); //這個maybeObserver就是我們真正的下游
            }
        });

接下來就是拭目以待的運行結果啦:

zlc.season.javademo D/MainActivity: Hooked onSuccess
zlc.season.javademo D/MainActivity: Real onSuccess

哈哈,果不其然,不愧是騷操作?。」辉谡嬲南掠螆?zhí)行前先去執(zhí)行了包裝類里的代碼,似乎已經看見了勝利的曙光??!

不過剛才的是在同一個線程的代碼,我們再來一個帶有線程切換的代碼驗證一下:

        Maybe.just(1)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "Real onSuccess");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "Real onError");
                    }
                });

當我們滿懷信心的時候,生活總是會給你潑一盆冷水:

zlc.season.javademo D/MainActivity: Hooked onSuccess
zlc.season.javademo D/MainActivity: Hooked onSuccess
zlc.season.javademo D/MainActivity: Hooked onSuccess
zlc.season.javademo D/MainActivity: Real onSuccess

發(fā)生了什么?是不是代碼貼錯了???為什么會打印三次Hooked onSuccess。。。我明明只包裝了一個下游呀。。。

這個問題要詳細的解釋清楚估計得花一段時間了,這里就直接給出答案了,因為我們使用RxJavaPluginssetOnMaybeSubscribe()方法實際上是給所有的Maybe類型的subscribe()方法都做了一個騷操作,而在我們的RxJava調用鏈中,除了我們的上游下游,其實還有中游,這些中游位于RxJava的內部,我們每做一次鏈式調用,都會生成一個新的中游,因此我們的騷操作不僅僅只對下游生效,對這些中游也會生效,所以出現(xiàn)上面的打印結果。從代碼也可以看出來,我們分別調用了一次subscribeOn和一次observeOn,因此對應的產生了兩個中游,再加上我們自己的下游,所以一共打印三次Hooked onSuccess也說得通。

但是盡管打印了這么多,我們還是可以從中看到,我們的騷操作依然是有效的,在真正的下游方法執(zhí)行前,依然執(zhí)行了包裝類中的代碼,所以我們的這個方案是完全可行的,只需要避免一下重復處理就可以了。

看到這里,廣大吃瓜群眾估計還是處于一臉懵逼的狀態(tài)。。。這TM跟我處理API錯誤有啥關系?

鏟你一耳屎.jpg

emmmm...目前來說好像確實沒什么太大的關系。。。但是,下面這段代碼看完你也許就明白了。

我們繼續(xù)來看一段Retrofit請求的代碼:

public interface Api {
    @GET
    Maybe<BaseResponse> getSomeThing(@Url String url); //注意這里使用的是Maybe
}

private void requestSomeThing(String url) {
        api.getSomeThing(url)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<BaseResponse>() {
                    @Override
                    public void accept(BaseResponse baseResponse) throws Exception {
                        if(baseResponse.getCode()==100){
                            //Token 過期,跳轉登錄頁面。。。
                            ....
                        }else if(...){
                            ...
                        }
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "Something wrong", throwable);
                        if (throwable instanceof ConnectionException) {
                            Log.d(TAG, "沒有網絡連接");
                        } else if (throwable instanceof SocketTimeoutException) {
                            Log.d(TAG, "連接超時");
                        } else {
                            //...
                        }
                    }
                });
    }

這是一段普通的請求代碼,包含了請求成功了要判斷code是否正確,判斷token是否過期,請求失敗了要針對不同的異常情況來做不同的處理,等一系列操作。

通過前面的鋪墊,我相信大家心里都有點B number了,我們只需要把判斷code是否正確,token是否過期,以及異常的情況放到包裝類里,這樣不就做到統(tǒng)一處理了嗎?

先別急,可能細心一點的朋友就發(fā)現(xiàn)了,我們這里Api 接口定義的時候使用的是Maybe,而我們知道,在RxJava2中除了Maybe,還有Single、Completable、Observable、Flowable,我們定義接口也可以寫成:

public interface Api {
    @GET  //Maybe
    Maybe<BaseResponse> getSomeThing(@Url String url); 
   
    @GET   //Observable
    Observable<BaseResponse> getSomeThing(@Url String url); 
    
    @GET   //Flowable
    Flowable<BaseResponse> getSomeThing(@Url String url); 
    ...
}

那是不是意味著我們要對每一個都要用RxJavaPlugin來做騷操作?。?/p>

答案是不需要,我們只需要對Observable做騷操作就行了!是的,就是Observable,為什么只需要對Observable做騷操作呢?這個答案可以從RetrofitRxJava2CallAdapter中找到答案:

final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> {
    ......
        
    @Override
    public Object adapt(Call<R> call) {
        //這就是我們真正的上游
        Observable<Response<R>> responseObservable = isAsync
                ? new CallEnqueueObservable<>(call)
                : new CallExecuteObservable<>(call);

        Observable<?> observable;
        if (isResult) {
            observable = new ResultObservable<>(responseObservable);
        } else if (isBody) {
            observable = new BodyObservable<>(responseObservable);
        } else {
            observable = responseObservable;
        }

        if (scheduler != null) {
            observable = observable.subscribeOn(scheduler);
        }

        if (isFlowable) {
            return observable.toFlowable(BackpressureStrategy.LATEST);
        }
        if (isSingle) {
            return observable.singleOrError();
        }
        if (isMaybe) {
            return observable.singleElement();
        }
        if (isCompletable) {
            return observable.ignoreElements();
        }
        return observable;
    }
}

從這個代碼中可以看到,我們請求真正的上游其實是一個Observable,我們在Api接口中定義的不管是Maybe,還是Flowable,其實都是在Observable做了一次鏈式調用而已,所以我們只需要對Observable做一個騷操作,就可以了。

所以我們先來創(chuàng)建一個Observer的包裝類:

class ObservableSubscribeHooker<T> implements Observer<T> {
        private Observer<T> actual;

        public ObservableSubscribeHooker(Observer<T> actual) {
            this.actual = actual;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            actual.onSubscribe(d);
        }

        @Override
        public void onNext(T t) {
            hookOnNext(t);
            actual.onNext(t);
        }

        private void hookOnNext(T t) {
            if (t instanceof BaseResponse) {
                BaseResponse baseResponse = (BaseResponse) t;
                if (baseResponse.getCode() == 100) {
                    //登錄過期,跳轉到登錄頁
                    ...
                    throw new Exceptions.TokenExpired(); //注意這里的trick
                }
            }
        }

        @Override
        public void onError(Throwable e) {

            if (e instanceof ConnectException) {
                Log.e(TAG, "Connect failed: ", e);
                //處理ConnectException
                ...
                actual.onError(new Exceptions.Offline()); //注意這里的trick
                return;
            }

            if (e instanceof SocketTimeoutException) {
                Log.e(TAG, "Time out ", e);
                //處理SocketTimeoutException
                ...
                actual.onError(new Exceptions.TimeOut()); //注意這里的trick
                return;
            }

            //其余的異常處理...

            actual.onError(e);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

注意這里面的幾個小Trick,通過自定義的異常,避免了重復處理的問題,并且下游仍然可以針對自己的特殊情況進行自己的特殊處理。

接下來就是設置到RxJavaPlugins中了:

public class CustomApplication extends Application {
  
    @Override
    public void onCreate() {
        super.onCreate();
       
        RxJavaPlugins.setOnObservableSubscribe(new BiFunction<Observable, Observer, Observer>() {
            @Override
            public Observer apply(Observable observable, Observer observer) throws Exception {
                return new ObservableSubscribeHooker(observer);
            }
        });

    }
}

好啦,今天的教程就寫到這里吧~

最終的demo已經上傳到GitHub,地址在 鏈接在這里

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容