關(guān)于 RxJava 最友好的文章—— RxJava 2.0 全新來(lái)襲

前言

之前寫(xiě)RxJava相關(guān)文章的時(shí)候,就有人想讓我談?wù)凴xJava2.0的新特性,說(shuō)實(shí)話,一開(kāi)始我是拒絕的。因?yàn)樵谖铱磥?lái),RxJava2.0雖然是版本的重大升級(jí),但總歸還是RxJava,升級(jí)一個(gè)版本還能上天是咋的?了解一下它的更新文檔不就好了么?真的有必要單出一篇文章來(lái)談這個(gè)么?

但是詳細(xì)的了解了RxJava2.0以及部分源碼之后,我覺(jué)得還是有必要對(duì)RxJava2.0做一個(gè)說(shuō)明,幫助大家對(duì)于RxJava有更好的認(rèn)識(shí)。


鋪墊

假如你還不是很熟悉RxJava,或者對(duì)于背壓這個(gè)概念(2.0更新中會(huì)涉及到背壓的概念)很模糊,希望你也能讀一讀下面兩篇鋪墊的文章:

  • 關(guān)于RxJava最友好的文章
  • 關(guān)于RxJava最友好的文章----背壓

關(guān)于背壓的那篇文章本來(lái)是本文的一部分,因?yàn)槠^(guò)大,被剝離出去了,所以建議大家有時(shí)間也一并閱讀。


正文

RxJava2.0有很多的更新,一些改動(dòng)甚至沖擊了我之前的文章里的內(nèi)容,這也是我想寫(xiě)這篇文章的原因之一。不過(guò)想要寫(xiě)這篇文章其實(shí)也是有難度的,因?yàn)橄嚓P(guān)的資料去其實(shí)是很少的,還得自己硬著頭皮上....不過(guò)俗話說(shuō)得好,有困難要上,沒(méi)有困難創(chuàng)造困難也要上。

在這里,我會(huì)按照我們之前關(guān)于RxJava的文章的講述順序:觀察者模式,操作符,線程調(diào)度,這三個(gè)方面依次看有哪些更新。


添加依賴(lài)

這個(gè)估計(jì)得放在最前面。

Android端使用RxJava需要依賴(lài)新的包名:

    //RxJava的依賴(lài)包(我使用的最新版本)
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依賴(lài)包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

觀察者模式

首先聲明,RxJava以觀察者模式為骨架,在2.0中依然如此。

不過(guò)此次更新中,出現(xiàn)了兩種觀察者模式:

  • Observable(被觀察者)/Observer(觀察者)
  • Flowable(被觀察者)/Subscriber(觀察者)

RxJava2.X中,Observeable用于訂閱Observer,是不支持背壓的,而Flowable用于訂閱Subscriber,是支持背壓(Backpressure)的。

關(guān)于背壓這個(gè)概念以及它在1.0版本中的缺憾在上一篇文章中我已經(jīng)介紹到了,如果你不是很清楚,我在這里在做一個(gè)介紹:背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略,在1.0中,關(guān)于背壓最大的遺憾,就是集中在Observable這個(gè)類(lèi)中,導(dǎo)致有的Observable支持背壓,有的不支持。為了解決這種缺憾,新版本把支持背壓和不支持背壓的Observable區(qū)分開(kāi)來(lái)。

Observable/Observer

Observable正常用法:

  Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });
        
Observer mObserver=new Observer<Integer>() {
            //這是新加入的方法,在訂閱后發(fā)送數(shù)據(jù)之前,
            //回首先調(diào)用這個(gè)方法,而Disposable可用于取消訂閱
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        
mObservable.subscribe(mObserver);

這種觀察者模型是不支持背壓的。

啥叫不支持背壓呢?

當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過(guò)大只會(huì)OOM

我在測(cè)試的時(shí)候,快速發(fā)送了100000個(gè)整形數(shù)據(jù),下游延遲接收,結(jié)果被觀察者的數(shù)據(jù)全部發(fā)送出去了,內(nèi)存確實(shí)明顯增加了,遺憾的是沒(méi)有OOM。

所以,當(dāng)我們使用Observable/Observer的時(shí)候,我們需要考慮的是,數(shù)據(jù)量是不是很大(官方給出以1000個(gè)事件為分界線,僅供各位參考)

Flowable/Subscriber

        Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
            //傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });

輸出如下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->10
onComplete
onsubscribe end

Flowable是支持背壓的,也就是說(shuō),一般而言,上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求,下游調(diào)用request(n)來(lái)告訴上游發(fā)送多少個(gè)數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平。

當(dāng)然,F(xiàn)lowable也可以通過(guò)creat()來(lái)創(chuàng)建:

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //需要指定背壓策略
        , BackpressureStrategy.BUFFER);

Flowable雖然可以通過(guò)create()來(lái)創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的(這個(gè)在1.0的時(shí)候就很難保證,可以說(shuō)RxJava2.0收緊了create()的權(quán)限)。

根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時(shí)候,不等onSubscribe()中后面的代碼執(zhí)行,就會(huì)立刻執(zhí)行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的類(lèi)時(shí),應(yīng)當(dāng)盡量在subscription.request(n)這個(gè)方法調(diào)用之前做好初始化的工作;

當(dāng)然,這也不是絕對(duì)的,我在測(cè)試的時(shí)候發(fā)現(xiàn),通過(guò)create()自定義Flowable的時(shí)候,即使調(diào)用了subscription.request(n)方法,也會(huì)等onSubscribe()方法中后面的代碼都執(zhí)行完之后,才開(kāi)始調(diào)用onNext。

TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作,否則就有空指針的風(fēng)險(xiǎn)。

其他觀察者模式

當(dāng)然,除了上面這兩種觀察者,還有一類(lèi)觀察者

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其實(shí)這三者都差不多,Maybe/MaybeObserver可以說(shuō)是前兩者的復(fù)合體,因此以Maybe/MaybeObserver為例簡(jiǎn)單介紹一下這種觀察者模式的用法

//判斷是否登陸
Maybe.just(isLogin())
    //可能涉及到IO操作,放在子線程
    .subscribeOn(Schedulers.newThread())
    //取回結(jié)果傳到主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {
                
            }

            @Override
            public void onComplete() {

            }
        });

上面就是Maybe/MaybeObserver的普通用法,你可以看到,實(shí)際上,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù),而是發(fā)送單個(gè)數(shù)據(jù),也就是說(shuō),當(dāng)你只想要某個(gè)事件的結(jié)果(true or false)的時(shí)候,你可以用這種觀察者模式


這是上面那些被觀察者的上層接口:

//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

其實(shí)我們可以看到,每一種觀察者都繼承自各自的接口,這也就把他們能完全的區(qū)分開(kāi),各自獨(dú)立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操作符不會(huì)相互影響。

例如flatMap操作符實(shí)現(xiàn):

//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

假如你想為Flowable寫(xiě)一個(gè)自定義的操作符,那么只要保證Function< Publisher >中的類(lèi)型實(shí)現(xiàn)了Publisher接口即可。這么說(shuō)可能很抽象,大家不理解其實(shí)也沒(méi)關(guān)系,因?yàn)椴⒉煌扑]大家自定義操作符,RxJava中的操縱符的組合已經(jīng)可以滿(mǎn)足大家的需求了。

當(dāng)然,你也會(huì)注意到上面那些接口中的subscribe()方法的返回類(lèi)型為void了,在1.X中,這個(gè)方法一般會(huì)返回一個(gè)Subscription對(duì)象,用于取消訂閱?,F(xiàn)在,這個(gè)功能的對(duì)象已經(jīng)被放到觀察者Observer或者subscriber的內(nèi)部實(shí)現(xiàn)方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

上面的實(shí)例中,onSubscribe(Subscription s)傳入的參數(shù)s就肩負(fù)著取消訂閱的功能,當(dāng)然,他也可以用于請(qǐng)求上游的數(shù)據(jù)。

在Observable/observer中,傳入的參數(shù)是另一個(gè)對(duì)象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

在Observer接口中,onSubscribe(Disposable d)方法傳入的Disposable也是用于取消訂閱,基本功能是差不多的,只不過(guò)命名不一致,大家知道就好。

其實(shí)這種設(shè)計(jì)可以說(shuō)還是符合邏輯的,因?yàn)槿∠嗛嗊@個(gè)動(dòng)作就只有觀察者(Observer等)才能做的,現(xiàn)在把它并入到觀察者內(nèi)部,也算順理成章吧。

最后再提一點(diǎn)更新,就是被觀察者不再接收null作為數(shù)據(jù)源了。


操作符相關(guān)

這一塊其實(shí)可以說(shuō)沒(méi)什么改動(dòng),大部分之前你用過(guò)的操作符都沒(méi)變,即使有所變動(dòng),也只是包名或類(lèi)名的改動(dòng)。大家可能經(jīng)常用到的就是Action和Function。

Action相關(guān)

之前我在文章里介紹過(guò)關(guān)于Action這類(lèi)接口,在1.0中,這類(lèi)接口是從Action0,Action1...往后排(數(shù)字代表可接受的參數(shù)),現(xiàn)在做出了改動(dòng)

Rx1.0-----------Rx2.0

Action1--------Action
Action1--------Consumer
Action2--------BiConsumer
后面的Action都去掉了,只保留了ActionN

Function相關(guān)

同上,也是命名方式的改變

上面那兩個(gè)類(lèi),和RxJava1.0相比,他們都增加了throws Exception,也就是說(shuō),在這些方法做某些操作就不需要try-catch。

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

Files.readLines(name)這類(lèi)io方法本來(lái)是需要try-catch的,現(xiàn)在直接拋出異常,就可以放心的使用lambda ,保證代碼的簡(jiǎn)潔優(yōu)美。

doOnCancel/doOnDispose/unsubscribeOn

以doOnCancel為例,大概就是當(dāng)取消訂閱時(shí),會(huì)調(diào)用這個(gè)方法,例如:

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);

take新操符會(huì)取消后面那些還未被發(fā)送的事件,因而會(huì)觸發(fā)doOnCancel

其他的一些操作符基本沒(méi)變,或者只是改變了名字,在這里就不一一介紹了,需要提一下的是,很多操作符都有兩套,一套用于Observable,一套用于Flowable。


線程調(diào)度

可以說(shuō)這一塊兒基本也沒(méi)有改動(dòng),如果一定要說(shuō)的話。

  • 那就是去掉了Schedulers.immediate()這個(gè)線程環(huán)境
  • 移除的還有Schedulers.test()(我好像從來(lái)沒(méi)用過(guò)這個(gè)方法)
  • io.reactivex.Scheduler這個(gè)抽象類(lèi)支持直接調(diào)度自定義線程任務(wù)(這個(gè)我也沒(méi)怎么用)

補(bǔ)充

如果你想把自己的RxJava1.0的遷移到2.0的版本,可以使用這個(gè)庫(kù)RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之間相互轉(zhuǎn)換,也就是說(shuō),不僅可以把1.0的代碼遷移到2.0,你還可以把2.0的代碼遷移到1.0,哈哈。

補(bǔ)充2

在RxJava1.0中,有的人會(huì)使用CompositeSubscription來(lái)收集Subscription,來(lái)統(tǒng)一取消訂閱,現(xiàn)在在RxJava2.0中,由于subscribe()方法現(xiàn)在返回void,那怎么辦呢?

其實(shí)在RxJava2.0中,Flowable提供了subscribeWith這個(gè)方法可以返回當(dāng)前訂閱的觀察者,并且通過(guò)ResourceSubscriber DisposableSubscriber等觀察者來(lái)提供 Disposable的接口。

所以,如果想要達(dá)成RxJava1.0的效果,現(xiàn)在應(yīng)該是這樣做:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

這個(gè)subscriber 應(yīng)該是 ResourceSubscriber 或者 DisposableSubscriber 的實(shí)例。


結(jié)尾

其實(shí)從整篇文章的分析來(lái)看,改動(dòng)最大的還是觀察者模式的實(shí)現(xiàn),被拆分和細(xì)化了,主要分成了Observable和Flowable兩大類(lèi),當(dāng)然還有與之相關(guān)聯(lián)的其他變動(dòng),總體來(lái)看這一版本可以說(shuō)是對(duì)于觀察者和被觀察者的重構(gòu)。

RxJava2.0的范例代碼我沒(méi)精力去寫(xiě)了,也正巧有位外國(guó)朋友已經(jīng)寫(xiě)了RxJava2.0的demo,下面是他的項(xiàng)目地址:

在github上搜索:RxJava2-Android-Samples

當(dāng)然,學(xué)習(xí)2.0 的過(guò)程中有什么問(wèn)題也可以在這里留言討論。


附錄

下面我截圖展示一下2.0相對(duì)于1.0的一些改動(dòng)的細(xì)節(jié),僅做參考。





其實(shí)這些都是官方給出的列表,截圖在這里只是方便大家觀摩。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容