Rxjava 2.x

官方網(wǎng)站
github

在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發(fā)射的數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式可以極大地簡化并發(fā)操作,因為它創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,在未來某個時刻響應(yīng)Observable的通知,不需要阻塞等待Observable發(fā)射數(shù)據(jù)。

image.png
  1. 這是Observable的時間軸,從左到右。
  2. 這些是Obserable發(fā)出的事件
  3. 此垂直線表示Observable已成功發(fā)送處事件,事件發(fā)送完畢
  4. these dotted lines and this box indicate that a transformation is being applied to the Observable The text inside the box shows the nature of the transformation
    這些虛線和此框表示正在對Observable數(shù)據(jù)轉(zhuǎn)換。
    框內(nèi)的文本顯示轉(zhuǎn)換的性質(zhì)
  5. this Observable is the result of the transformation
    對Observable轉(zhuǎn)換的結(jié)果
  6. is for some reason the Observable terinates abnormally,with an error the vertical line is replaced by an x
    由于某種原因,Observable異常中斷,垂直線x代替錯誤。

創(chuàng)建Observables

  • Create — create an Observable from scratch by calling observer methods programmatically
image.png

創(chuàng)建一個默認(rèn)的Observable

  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
image.png

直到有觀察者訂閱時才創(chuàng)建Observable,并且為每個觀察者創(chuàng)建一個新的Observable

Defer操作符會一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個Observable。它對每個觀察者都這樣做,因此盡管每個訂閱者都以為自己訂閱的是同一個Observable,事實上每個訂閱者獲取的是它們自己的單獨的數(shù)據(jù)序列。

  • Empty/Never/Throw — create Observables that have very precise and limited behavior

Empty 創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable

never 創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable

Throw 創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable

  • From — convert some other object or data structure into an Observable
    image.png

一對多的關(guān)系

將一個Iterable, 一個Future, 或者一個數(shù)組轉(zhuǎn)換成一個Observable
產(chǎn)生的Observable會發(fā)射Iterable或數(shù)組的每一項數(shù)據(jù)。

  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
image.png

創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable

  • Just — convert an object or a set of objects into an Observable that emits that or those objects
image.png

創(chuàng)建一個發(fā)射指定值的Observable

Just類似于From,但是From會將數(shù)組或Iterable的數(shù)據(jù)取出然后逐個發(fā)射,而Just只是簡單的原樣發(fā)射,將數(shù)組或Iterable當(dāng)做單個數(shù)據(jù)。

注意:如果你傳遞null給Just,它會返回一個發(fā)射null值的Observable。不要誤認(rèn)為它會返回一個空Observable(完全不發(fā)射任何數(shù)據(jù)的Observable),如果需要空Observable你應(yīng)該使用Empty操作符。

  • Range — create an Observable that emits a range of sequential integers

創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable

Range操作符發(fā)射一個范圍內(nèi)的有序整數(shù)序列,你可以指定范圍的起始和長度。

RxJava將這個操作符實現(xiàn)為range函數(shù),它接受兩個參數(shù),一個是范圍的起始值,一個是范圍的數(shù)據(jù)的數(shù)目。如果你將第二個參數(shù)設(shè)為0,將導(dǎo)致Observable不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù),會拋異常)。

  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly

創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable

RxJava將這個操作符實現(xiàn)為repeat方法。它不是創(chuàng)建一個Observable,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個序列或者是無限的,或者通過repeat(n)指定重復(fù)次數(shù)。

  • Start — create an Observable that emits the return value of a function
image.png

創(chuàng)建一個Observable,它發(fā)出類似函數(shù)的指令的返回值

  • Timer — create an Observable that emits a single item after a given delay

創(chuàng)建一個Observable,它在一個給定的延遲后發(fā)射一個特殊的值。

image.png

最基本使用

  • 創(chuàng)建 被觀察者
  • 創(chuàng)建 觀察者
  • 訂閱事件 連接 被觀察者 和觀察者
//創(chuàng)建 被觀察者
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                Log.d(TAG, "subscribe: 1");

                emitter.onNext("2");
                Log.d(TAG, "subscribe: 2");

                emitter.onNext("3");
                Log.d(TAG, "subscribe: 3");

                emitter.onNext("4");
                Log.d(TAG, "subscribe: 4");

                emitter.onComplete();
            }
        }).subscribe(//訂閱
                new Observer<String>() {//觀察者

                    private Disposable disposable;


                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: " + d.toString());
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: " + s);

                        Log.d(TAG, "onNext: disposed 前" + disposable.isDisposed());
                        if (s.equals("2")) {
                            disposable.dispose();
                        }
                        Log.d(TAG, "onNext: disposed 后" + disposable.isDisposed());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

在 Observer觀察者 中,多了一個回調(diào)方法:onSubscribe,傳遞參數(shù)為Disposable,Disposable 相當(dāng)于 RxJava 1.x 中的 Subscription, 用于解除訂閱。當(dāng)s等于2時,解除訂閱,觀察者不會在接收到被觀察者發(fā)出的消息,也包裹 onComplete事件。

在請求的過程中Activity已經(jīng)退出了, 這個時候如果回到主線程去更新UI, 那么APP肯定就崩潰了, 怎么辦呢, 上一節(jié)我們說到了Disposable , 說它是個開關(guān), 調(diào)用它的dispose()方法時就會切斷水管, 使得下游收不到事件, 既然收不到事件, 那么也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable 保存起來, 當(dāng)Activity退出時, 切斷它即可.

那如果有多個Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個容器CompositeDisposable, 每當(dāng)我們得到一個Disposable時就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時候, 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.

上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext.
當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
上游可以不發(fā)送onComplete或onError.
最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然

下面是log信息。

    onNext: 1
    onNext: disposed 前false
    onNext: disposed 后false
    subscribe: 1
    onNext: 2
    onNext: disposed 前false
    onNext: disposed 后true
    subscribe: 2
    subscribe: 3
    subscribe: 4

form just 輸出一樣

Observable<Integer> form = Observable.fromArray(items);

Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> range = Observable.range(1, 10);

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onNext: 6
    onNext: 7
    onNext: 8
    onNext: 9
    onNext: 10
    onComplete: 

subscribe 觀察者

    public final Disposable subscribe() {}
    // 表示觀察者不對被觀察者發(fā)送的事件作出任何響應(yīng)(但被觀察者還是可以繼續(xù)發(fā)送事件)

    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // 表示觀察者只對被觀察者發(fā)送的Next事件作出響應(yīng)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // 表示觀察者只對被觀察者發(fā)送的Next事件 & Error事件作出響應(yīng)

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // 表示觀察者只對被觀察者發(fā)送的Next事件、Error事件 & Complete事件作出響應(yīng)

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // 表示觀察者只對被觀察者發(fā)送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應(yīng)

    public final void subscribe(Observer<? super T> observer) {}
    // 表示觀察者對被觀察者發(fā)送的任何事件都作出響應(yīng)

調(diào)度器 Scheduler

  • Schedulers.computation(?)
    用于計算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量

  • Schedulers.from(executor)
    使用指定的Executor作為調(diào)度器

  • Schedulers.immediate(?)
    在當(dāng)前線程立即開始執(zhí)行任務(wù)

  • Schedulers.io(?)
    用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計算任務(wù),請使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器

  • Schedulers.newThread(?)
    為每個任務(wù)創(chuàng)建一個新線程

  • Schedulers.trampoline(?)
    當(dāng)其它排隊的任務(wù)完成后,在當(dāng)前線程排隊開始執(zhí)行

簡單的來說, subscribeOn() 指定的是上游發(fā)送事件的線程, observeOn() 指定的是下游接收事件的線程.
多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn() , 下游的線程就會切換一次.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容