RxJava 系列 (三)RxJava lift原理

前言

理解lift原理有什么意義?

  • 可以理解Rxjava最核心的原理,看懂了lift你就看懂了Rxjava

lift是Rxjava操作符的基礎(chǔ)原理,操作符是Rxjava功能如此豐富和好用的核心,理解了lift也就理解了Rxjava最核心的原理

  • 可以理解線程切換的原理,有助于靈活運用線程切換和調(diào)試線程相關(guān)的問題

線程切換也是用的操作符,所以原理也是lift

RxJava基本用法

  1. 創(chuàng)建 Observer
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

Observer 需要實現(xiàn)三個方法,相當(dāng)于定義了三種類型的事件

  • Subscriber 與 Observer的關(guān)系

Subscriber相當(dāng)于增加了Subscription(訂閱關(guān)系管理)功能的Observer
Subscription 包含兩個方法:
unsubscribe();(取消訂閱)
isUnsubscribed();(查詢訂閱關(guān)系)

Subscriber 還增加了一個onStart()方法:它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作

實質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用,所以為了統(tǒng)一,我們就統(tǒng)一以Subscriber來作為觀察者,就不再提Observer了。

  1. 創(chuàng)建 Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

創(chuàng)建Observable對象時,會傳入一個 OnSubscribe 對象,OnSubscribe對象會被存儲在生成的 Observable 對象中。

OnSubscribe 對象的作用,就是拿到 subscriber對象,向subscriber 對象發(fā)送事件。
這里拿到觀察者Subscriber對象,并調(diào)用Subscriber實現(xiàn)的三個方法,就是在向觀察者發(fā)送事件

  1. Subscribe (訂閱)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了3件事:

  1. 調(diào)用 Subscriber.onStart() 。這個方法在前面已經(jīng)介紹過,是一個可選的準備方法。
  2. 調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發(fā)送的邏輯開始運行。從這也可以看出,在 RxJava 中, Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候,即當(dāng) subscribe() 方法執(zhí)行的時候。
  3. 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便進行訂閱關(guān)系管理,比如 unsubscribe().

操作符-對事件序列進行變換

所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 例子。

舉個例子:map()

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數(shù)類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
            showBitmap(bitmap);
        }
    });

map(): 對事件對象的直接變換。

變換的原理:lift()

這些變換雖然功能各有不同,但實質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部,它們是基于同一個基礎(chǔ)的變換方法: lift(Operator)。首先看一下 lift() 的內(nèi)部實現(xiàn)(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

先不用關(guān)心細節(jié),我們先知道 lift是接受一個operator參數(shù),返回一個新的Observable對象。
在分析 lift() 的內(nèi)部實現(xiàn)之前,我們先看一下加上操作符的一次調(diào)用的完整過程

一次包含操作符的調(diào)用的完整過程

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數(shù)類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
            showBitmap(bitmap);
        }
    });

做了三步:
1.生成observable對象
2.在observable對象上調(diào)用了map方法
3.在map方法的返回值上調(diào)用了.subscribe方法

問題:1.map方法內(nèi)部做了什么? 2.map方法的返回值是個什么?

回答兩個問題

因為map()內(nèi)部不是直接調(diào)用的lift方法(跟lift的原理一樣,只是沒有直接使用lift方法),所以我們以take()操作符的源碼為例,來看方法調(diào)用。


回答兩個問題:

  1. take方法內(nèi)部很簡單,就是調(diào)用了lift方法
    2.返回值就是lift方法的返回值,是個新new的observable對象。

將示例稍作修改

        Observable observable1 = Observable.just("images/logo.png"); // 輸入類型 String
        
        Observable observable2 = observable1.map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String filePath) { // 參數(shù)類型 String
                        return getBitmapFromPath(filePath); // 返回類型 Bitmap
                    }
                });
        
        observable2.subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
                        showBitmap(bitmap);
                    }
                });
observable2.subscribe 分析

回顧前面subscribe()的內(nèi)部實現(xiàn),我們發(fā)現(xiàn)observable2里的OnSubscribe對象的call方法會被調(diào)用。而observable2就是lift方法返回的Observable對象, observable2里的onSubscribe對象就是lift的核心重點。

observable2里的OnSubscribe對象 —lift的核心重點

observable2對象里面通過observable1對象的onSubscribe.call(newSubscriber)達到通知observable1目的。
因為observable1對象是我們初始的Observable對象,它的onSubscribe.call會發(fā)送事件到newSubscriber。
newSubscriber是operator.call(subscriber)返回的,newSubscriber做了兩件事:
1.進行事件的變換操作。newSubscriber能拿到初始的事件,可以進行轉(zhuǎn)換操作,這也是操作符發(fā)生效力的地方,不同的操作符的作用就是對事件進行不同的轉(zhuǎn)換。
2.轉(zhuǎn)發(fā)給subscriber,newSubscriber有subscriber的引用,可以將轉(zhuǎn)換后的事件轉(zhuǎn)發(fā)給subscriber,也就是最終的訂閱者。
lift方法返回的observable2對象在調(diào)用鏈的中間起到了一個中轉(zhuǎn)的作用,這就是lift原理的核心。

operator.call內(nèi)部通過傳入一個subscriber返回一個newSubscriber,newSubscriber能達到事件轉(zhuǎn)換和轉(zhuǎn)發(fā)的目的。
是如何做到的?
我們來舉個例子

Operator的一個例子

多個操作符的情況

多個操作符相當(dāng)于中間經(jīng)過了多層中轉(zhuǎn),原理都一樣

關(guān)于事件發(fā)送的觸發(fā)

結(jié)合上面多個操作符的圖,強調(diào)一下:

事件發(fā)送的觸發(fā)是從調(diào)用subscribe()方法后開始的,前面哪怕調(diào)用了N多的操作符方法只要還沒有調(diào)用subscribe()方法其實并沒有觸發(fā)事件的發(fā)送。

事件發(fā)送之前有一個從下往上通知的過程,當(dāng)subscribe()方法被調(diào)用之后,先是通過observable2對象里調(diào)用observable1對象的onSubscribe.call通知observable1對象,如果observable1對象不是最開始發(fā)送事件的Observable對象(多個操作符的情況),那么同樣的還會繼續(xù)往上通知,直到通知到初始的Observable對象,才會開始事件的發(fā)送。

所以RxJava是 先從下往上通知,然后再從上往下發(fā)送事件

參考文獻

給 Android 開發(fā)者的 RxJava 詳解 --扔物線
http://gank.io/post/560e15be2dca930e00da1083

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容