Rxjava3使用教程:介紹和基本使用

一、簡介

RxJava是響應(yīng)式編程(Reactive Extensions)的java實現(xiàn),它基于觀察者模式的實現(xiàn)了異步編程接口。
Rxjava 3.x 的github官網(wǎng);
RxJava2將被支持到2021年2月28日,錯誤的會同時在2.x和3.x修復(fù),但新功能只會在3.x上添加;
Rxjava 3.0的一些改變:官方Wiki;
Rxjava 3.x 文檔可以在官方j(luò)avadoc中找到;
使用Rxjava3.x之前的準(zhǔn)備工作:

1.1 添加依賴:
    //RxJava的依賴包
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    //RxAndroid的依賴包  
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
1.2 將項目的編譯目標(biāo)設(shè)置更改為 java8:
android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

二、Rx概念

2.1 字段含義
  • Reactive 直譯為反應(yīng)性的,有活性的,根據(jù)上下文一般翻譯為反應(yīng)式、響應(yīng)式

  • Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念

  • Observable 可觀察對象,在Rx中定義為更強(qiáng)大的Iterable,在觀察者模式中是被觀察的對象,一旦數(shù)據(jù)產(chǎn)生或發(fā)生變化,會通過某種方式通知觀察者或訂閱者

  • Observer 觀察者對象,監(jiān)聽Observable發(fā)射的數(shù)據(jù)并做出響應(yīng),Subscriber是它的一個特殊實現(xiàn)

  • emit 直譯為發(fā)射,發(fā)布,發(fā)出,含義是Observable在數(shù)據(jù)產(chǎn)生或變化時發(fā)送通知給Observer,調(diào)用Observer對應(yīng)的方法,文章里一律譯為發(fā)射

  • items 直譯為項目,條目,在Rx里是指Observable發(fā)射的數(shù)據(jù)項,文章里一律譯為數(shù)據(jù),數(shù)據(jù)項。

2.2 上/下流

在RxJava中,數(shù)據(jù)以流的方式組織:Rxjava包括一個源數(shù)據(jù)流,源數(shù)據(jù)流后跟著若干個用于消費(fèi)數(shù)據(jù)流的步驟。

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

在代碼中,對于operator2來說,在它前面叫做上流,在它后面的叫做下流。

2.3 流對象

在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認(rèn)為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對象。

2.4 背壓(Backpressure)

當(dāng)上下游在不同的線程中,通過Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時,如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對于那些沒來得及處理的數(shù)據(jù)就會造成積壓,這些數(shù)據(jù)既不會丟失,也不會被垃圾回收機(jī)制回收,而是存放在一個異步緩存池中,如果緩存池中的數(shù)據(jù)一直得不到處理,越積越多,最后就會造成內(nèi)存溢出,這便是響應(yīng)式編程中的背壓(backpressure)問題。

為此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)。背壓是指在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略

在Rxjava1.0中,有的Observable支持背壓,有的不支持,為了解決這種問題,2.0把支持背壓和不支持背壓的Observable區(qū)分開來:支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。

  • 在訂閱的時候如果使用FlowableSubscriber,那么需要通過s.request(Long.MAX_VALUE)去主動請求上游的數(shù)據(jù)項。如果遇到背壓報錯的時候,F(xiàn)lowableSubscriber默認(rèn)已經(jīng)將錯誤try-catch,并通過onError()進(jìn)行回調(diào),程序并不會崩潰;
  • 在訂閱的時候如果使用Consumer,那么不需要主動去請求上游數(shù)據(jù),默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)。如果遇到背壓報錯、且對Throwable的Consumer沒有new出來,則程序直接崩潰;
  • 背壓策略的上游的默認(rèn)緩存池是128。
背壓策略:
  • error, 緩沖區(qū)大概在128

  • buffer, 緩沖區(qū)在1000左右

  • drop, 把存不下的事件丟棄

  • latest, 只保留最新的

  • missing, 缺省設(shè)置,不做任何操作

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}
2.5 線程調(diào)度器(Schedulers)

對于Android開發(fā)者而言,RxJava最簡單的是通過調(diào)度器來方便地切換線程。在不同平臺還有不同的調(diào)度器,例如我們Android的主線程:AndroidSchedulers.mainThread()。

屬性 類型
AndroidSchedulers.mainThread() 需要引用rxandroid, 切換到UI線程
Schedulers.computation() 用于計算任務(wù),如事件循環(huán)和回調(diào)處理,默認(rèn)線程數(shù)等于處理器數(shù)量
Schedulers.io() 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需求,它默認(rèn)是一個CacheThreadScheduler
Schedulers.newThread() 為每一個任務(wù)創(chuàng)建一個新線程
Schedulers.trampoline() 在當(dāng)前線程中立刻執(zhí)行,如當(dāng)前線程中有任務(wù)在執(zhí)行則將其暫停, 等插入進(jìn)來的任務(wù)執(zhí)行完成之后,在將未完成的任務(wù)繼續(xù)完成。
Scheduler.from(executor) 指定Executor作為調(diào)度器
2.6 事件調(diào)度器

RxJava事件發(fā)出去并不是置之不顧,要有合理的管理者來管理它們,在合適的時機(jī)要進(jìn)行釋放事件,這樣才不會導(dǎo)致內(nèi)存泄漏,這里的管理者我們稱為事件調(diào)度器(或事件管理者)CompositeDisposable。

2.7 基類

RxJava 3 中的基類相比RxJava 2 沒啥改變,主要有以下幾個基類:

  • io.reactivex.Flowable:發(fā)送0個N個的數(shù)據(jù),支持Reactive-Streams和背壓
  • io.reactivex.Observable:發(fā)送0個N個的數(shù)據(jù),不支持背壓,
  • io.reactivex.Single:只能發(fā)送單個數(shù)據(jù)或者一個錯誤
  • io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù),但只處理 onComplete 和 onError 事件。
  • io.reactivex.Maybe:能夠發(fā)射0或者1個數(shù)據(jù),要么成功,要么失敗。
2.8 Observables的"熱"和"冷"

Observable什么時候開始發(fā)射數(shù)據(jù)序列?這取決于Observable的實現(xiàn),一個"熱"的Observable可能一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個觀察者可以確保會收到整個數(shù)據(jù)序列。

在一些ReactiveX實現(xiàn)里,還存在一種被稱作Connectable的Observable,不管有沒有觀察者訂閱它,這種Observable都不會開始發(fā)射數(shù)據(jù),除非Connect方法被調(diào)用。

基本使用

需要知道的是,RxJava以觀察者模式為骨架,有兩種常見的觀察者模式:

  • Observable(被觀察者)/Observer(觀察者):不支持背壓
  • Flowable(被觀察者)/Subscriber(觀察者):支持背壓
3.1 Observable/Observer用法:
          Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });
 
        Observer mObserver=new Observer<Integer>() {
            //這是新加入的方法,在訂閱后發(fā)送數(shù)據(jù)之前,
            //回首先調(diào)用這個方法,而Disposable可用于取消訂閱
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onNext(Integer value) {
                Log.e("lucas", "onNext: "+value );
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        };
 
        mObservable.subscribe(mObserver);

這種觀察者模型不支持背壓:當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時,下游不會做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會報MissingBackpressureException,消耗內(nèi)存過大只會OOM。所以,當(dāng)我們使用Observable/Observer的時候,我們需要考慮的是,數(shù)據(jù)量是不是很大(官方給出以1000個事件為分界線作為參考)。

3.2 Flowable/Subscriber用法
        Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //當(dāng)訂閱后,會首先調(diào)用這個方法,其實就相當(dāng)于onStart(),
            //傳入的Subscription s參數(shù)可以用于請求數(shù)據(jù)或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }
 
            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });

Flowable是支持背壓的,也就是說,一般而言,上游的被觀察者會響應(yīng)下游觀察者的數(shù)據(jù)請求,下游調(diào)用request(n)來告訴上游發(fā)送多少個數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平。

當(dāng)然,F(xiàn)lowable也可以通過creat()來創(chuàng)建:

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //需要指定背壓策略
        , BackpressureStrategy.BUFFER);

Flowable雖然可以通過create()來創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的。

根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時候,不等onSubscribe()中后面的代碼執(zhí)行,就會立刻執(zhí)行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時,應(yīng)當(dāng)盡量在subscription.request(n)這個方法調(diào)用之前做好初始化的工作;

當(dāng)然,這也不是絕對的,我在測試的時候發(fā)現(xiàn),通過create()自定義Flowable的時候,即使調(diào)用了subscription.request(n)方法,也會等onSubscribe()方法中后面的代碼都執(zhí)行完之后,才開始調(diào)用onNext。

TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作,否則就有空指針的風(fēng)險。

3.3 其他的觀察者

最常用的其實就是上面說的兩種訂閱觀察者,但是一些情況下,我們也會用到一些其他的一類觀察者比如

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver
3.3.1 Single/SingleObserver用法
        //被觀察者
        Single<String> single = Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> e) throws Exception {
                e.onSuccess("test");
                e.onSuccess("test2");//錯誤寫法,重復(fù)調(diào)用也不會處理,因為只會調(diào)用一次
            }
        });
 
        //訂閱觀察者SingleObserver
        single.subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onSuccess(String s) {
                //相當(dāng)于onNext和onComplete
                Log.d("lucas",  s  );
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
        });
//運(yùn)行結(jié)果
2020-04-03 23:02:37.337 15462-15462/com.ysalliance.getfan.myapplication D/lucas: test

Single類似于Observable,不同的是,它總是只發(fā)射一個值,或者一個錯誤通知,而不是發(fā)射一系列的值(當(dāng)然就不存在背壓問題),所以當(dāng)你使用一個單一連續(xù)事件流,這樣你可以使用Single。Single觀察者只包含兩個事件,一個是正常處理成功的onSuccess,另一個是處理失敗的onError。因此,不同于Observable需要三個方法onNext, onError, onCompleted,訂閱Single只需要兩個方法:

  • onSuccess - Single發(fā)射單個的值到這個方法

  • onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法

Single只會調(diào)用這兩個方法中的一個,而且只會調(diào)用一次,調(diào)用了任何一個方法之后,訂閱關(guān)系終止。

Single的操作符:

Single也可以組合使用多種操作,一些操作符讓你可以混合使用Observable和Single:


Single操作符

詳細(xì)可參考:Single操作符

3.3.2 Completable/CompletableObserver

如果你的觀察者連onNext事件都不關(guān)心,可以使用Completable,它只有onComplete和onError兩個事件:

 
        Completable.create(new CompletableOnSubscribe() {//被觀察者
 
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                e.onComplete();//單一onComplete或者onError
            }
 
        }).subscribe(new CompletableObserver() {//觀察者
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onComplete() {
                Log.e("lucas", "onComplete: ");
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
        });
//打印結(jié)果
2020-04-03 23:12:08.099 16264-16264/com.ysalliance.getfan.myapplication E/lucas: onComplete: 
3.3.3 Maybe/MaybeObserver

如果你有一個需求是可能發(fā)送一個數(shù)據(jù)或者不會發(fā)送任何數(shù)據(jù),這時候你就需要Maybe,它類似于Single和Completable的混合體。

Maybe可能會調(diào)用以下其中一種情況(也就是所謂的Maybe):

onSuccess或者onError
onComplete或者onError
可以看到onSuccess和onComplete是互斥的存在,例子代碼如下:

        //被觀察者
        Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(MaybeEmitter<String> e) throws Exception {
                e.onSuccess("test");//發(fā)送一個數(shù)據(jù)的情況,或者onError,不需要再調(diào)用onComplete(調(diào)用了也不會觸發(fā)onComplete回調(diào)方法)
                //e.onComplete();//不需要發(fā)送數(shù)據(jù)的情況,或者onError
            }
        });
 
        //訂閱觀察者
        maybe.subscribe(new MaybeObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onSuccess(String s) {
                //發(fā)送一個數(shù)據(jù)時,相當(dāng)于onNext和onComplete,但不會觸發(fā)另一個方法onComplete
                Log.i("lucas", s);
            }
 
            @Override
            public void onComplete() {
                //無數(shù)據(jù)發(fā)送時候的onComplete事件
                Log.i("lucas", "onComplete");
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
        });
//打印結(jié)果
2020-04-03 23:14:40.266 16558-16558/com.ysalliance.getfan.myapplication I/lucas: test

要轉(zhuǎn)換成其他類型的被觀察者,也是可以使用toFlowable()、toObservable()等方法去轉(zhuǎn)換。

//判斷是否登陸
Maybe.just(isLogin())
    //可能涉及到IO操作,放在子線程
    .subscribeOn(Schedulers.newThread())
    //取回結(jié)果傳到主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

上面就是Maybe/MaybeObserver的普通用法,你可以看到,實際上,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù),而是發(fā)送單個數(shù)據(jù),也就是說,當(dāng)你只想要某個事件的結(jié)果(true or false)的時候,你可以用這種觀察者模式

3.4 事件調(diào)度器釋放事件
public class Main {
 
    private static CompositeDisposable mRxEvent = new CompositeDisposable();
 
    public static void main(String[] args) {
        Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("俊俊俊很帥");
                e.onNext("你值得擁有");
                e.onNext("取消關(guān)注");
                e.onNext("但還是要保持微笑");
                e.onComplete();
            }
        }).subscribe(
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        //對應(yīng)onNext()
                        System.out.println("accept=" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        //對應(yīng)onError()
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        //對應(yīng)onComplete()
                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        //對應(yīng)onSubscribe()
                    }
                });
        
        mRxEvent.add(subscribe);
        mRxEvent.clear();
    }
}
 

CompositeDisposable提供的方法中,都是對事件的管理

  • dispose():釋放所有事件
  • clear():釋放所有事件,實現(xiàn)同dispose()
  • add():增加某個事件
  • addAll():增加所有事件
  • remove():移除某個事件并釋放
  • delete():移除某個事件
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容