源碼解讀篇 — RxJava2源碼解讀

本篇包含內(nèi)容包括
1 RxJava事件流向的基本流程;
2 自己手寫(xiě)一個(gè)RxJava的基本流程。

1 RxJava事件流向的基本流程

最開(kāi)始接觸RxJava時(shí),很多文章把observer,observable對(duì)象定義為觀察者和被觀察者。其實(shí)這樣容易把人給繞暈,直接把observable看成上游產(chǎn)生事件者,把observer看出下游接收處理事件者。

RxJava2最簡(jiǎn)單的調(diào)用方式如下:

Observable.create(new ObservableOnSubscribe<String>() { // ①
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

基本的邏輯是:Observable通過(guò)create方法創(chuàng)建一個(gè)<? extends Observable>的類,然后通過(guò)<? extends Observable>類調(diào)用subscribe方法,并傳入一個(gè)觀察者observer。

需要搞定的問(wèn)題:

問(wèn):create方法創(chuàng)建的到底是什么類型的實(shí)例?

帶著問(wèn)題我們先來(lái)看處的create方法,主要涉及的內(nèi)容如下:

// Observable 類
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));  ①
}

// RxJavaPlugins類
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;  ②
}

為避免分散精力,我們只看主流分支,搞清主流分支,所有的脈絡(luò)就基本清晰。

由上處可知,返回的Observable對(duì)象就是方法onAssembly的返回值,而當(dāng)onObservableAssembly為空時(shí),返回值其實(shí)就是處傳進(jìn)來(lái)的ObservableCreate對(duì)象。

所以到此,我們可以回答上面的問(wèn)題了:create創(chuàng)建的對(duì)象就是ObservableCreate。

那么正常邏輯下,我們現(xiàn)在肯定要找ObservableCreatesubscribe方法。

嗯嗯~可事情會(huì)如我們預(yù)期的那么順利嗎?我們?cè)?code>ObservableCreate類中是找不到subscribe方法。

問(wèn):subscribe在哪里?

我們來(lái)看看ObservableCreate類部分源碼:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
}

由上可知ObservableCreate繼承至Observable,但我們并沒(méi)有找到subscribe方法。于是第一個(gè)想法就是去父類找,也就是Observable類中。

// Observable類
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

我的乖乖~,subscribe竟然是個(gè)final方法,難怪在子類中看不到這個(gè)方法,不過(guò)我們看到了另一個(gè)方法subscribeActual,點(diǎn)進(jìn)去一看:

// Observable類
protected abstract void subscribeActual(Observer<? super T> observer);

是個(gè)抽象方法,這我們就放心了,有點(diǎn)類似與Android控件ViewMeasure方法中的onMeasure。

于是乎,我們就可以回答上面提出的問(wèn)題。

答:Observablesubscribe方法是抽象方法,所有子類繼承Observable后,實(shí)現(xiàn)其抽象方法subscribeActual,進(jìn)行實(shí)際的訂閱操作。

于是我們就直接去看ObservableCreate類中的subscribeActual方法:

// ObservableCreate類
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);   ①
    observer.onSubscribe(parent);  ②

    try {
        source.subscribe(parent);  ③
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

問(wèn):發(fā)送消息的基本原理?

①處創(chuàng)建發(fā)射器

②處的方法是不是看上去很眼熟,對(duì)的,沒(méi)錯(cuò),它就是我們觀察者Observer最先被調(diào)用的方法。

③處source就是我們最開(kāi)始調(diào)用的create方法中的參數(shù)ObservableOnSubscribe,也就是發(fā)射器中的參數(shù)。

還記得我們最開(kāi)始那個(gè)例子嗎?

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
})
......

③處傳入的參數(shù)就是此處ObservableEmitter對(duì)象的實(shí)例,所以每次當(dāng)
我們利用emitter發(fā)送消息時(shí),就觸發(fā)了CreateEmitter類的onNext方法。

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
        }
        if (!isDisposed()) {
            // 觸發(fā)observer對(duì)象的onNext方法
            observer.onNext(t);
        }
    }
  省略部分代碼.....
}

2 自己手寫(xiě)一個(gè)RxJava的基本流程

古人云:紙上得來(lái)終覺(jué)淺,絕知此事要躬行。我們自己手寫(xiě)一個(gè)這個(gè)的過(guò)程比任何教程都會(huì)要記得深刻,牢固。

寫(xiě)我們的Observable,我把命名都加上一個(gè)perry前綴。

abstract class PerryObservable<T> {
    // 訂閱開(kāi)始調(diào)用的方法
    fun subscribe(observer: PerryObserver<in T>) {
        subscribeActual(observer)
    }
    
    internal abstract fun subscribeActual(observer: PerryObserver<in T>)

    companion object {
        // create構(gòu)造方法
        fun <T> create(source: PerryObservableOnSubscribe<T>): PerryObservable<T> {
            return PerryObservableCreate(source)
        }
    }
}

接下來(lái)是PerryObservableCreate

class PerryObservableCreate<T>(private val source: PerryObservableOnSubscribe<T>) : PerryObservable<T>() {

    override fun subscribeActual(observer: PerryObserver<in T>) {
        val emitter = PerryCreateEmitter(observer)
        observer.onSubscribe(emitter)

        source.subscribe(emitter)
    }
    
    // 這是我們的發(fā)射器
    class PerryCreateEmitter<T> internal constructor(private val observer: PerryObserver<in T>) 
      : PerryDisposable, PerryEmitter<T> {

        override fun dispose() {

        }

        override fun isDisposed(): Boolean {
            return false
        }

        override fun onNext(value: T) {
            observer.onNext(value)
        }

        override fun onError(error: Throwable) {
            observer.onError(error)
        }

        override fun onComplete() {
            observer.onComplete()
        }
    }
}

其他全部都是一些接口,就不一一貼出來(lái)啦。

interface PerryEmitter<T> {

    fun onNext(@NonNull value: T)
    
    fun onError(@NonNull error: Throwable)
    
    fun onComplete()
}

最終調(diào)用方法如下:

PerryObservable.Companion.create(new PerryObservableOnSubscribe<String>() {
    @Override
    public void subscribe(PerryObservableCreate.PerryCreateEmitter<String> emitter) {
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).subscribe(new PerryObserver<String>() {
    @Override
    public void onSubscribe(PerryDisposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("zp_test", s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印日志:

image.png

至此,我們基本搞清其發(fā)生消息的邏輯,由于篇幅有限,其他更加高級(jí)的功能,期待下次再見(jiàn)!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容