RxJava3原理解析

文章首發(fā)于我建立的一個(gè)筆記倉庫

1. 背景

RxJava是一個(gè)基于事件流、實(shí)現(xiàn)異步操作的庫。

官方介紹: RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
(RxJava 是一個(gè)在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)

文中用到的RxJava源碼版本為3.0.13,文中的demo源碼 https://github.com/xfhy/AllInOne/tree/master/app/src/main/java/com/xfhy/allinone/opensource/rxjava

2. 基礎(chǔ)使用

簡單介紹一下如何與Retrofit結(jié)合使用。引入:

implementation "io.reactivex.rxjava3:rxjava:3.0.13"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"

//Retrofit
implementation "com.squareup.retrofit2:retrofit:2.9.0"
//可選
implementation "com.squareup.retrofit2:converter-gson:2.9.0"

構(gòu)建Retrofit實(shí)例

private val retrofit by lazy {
        Retrofit.Builder()
            .baseUrl("https://www.wanandroid.com")
            //使用Gson解析
            .addConverterFactory(GsonConverterFactory.create())
            //轉(zhuǎn)換器   RxJava3   每次執(zhí)行的時(shí)候在IO線程
            .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .build()
    }

定義Retrofit的API:

interface WanAndroidService {

    @GET("wxarticle/chapters/json")
     fun listReposByRxJava(): Single<WxList?>

}

class WxList {
    var errorMsg = ""
    var errorCode = -1
    var data = mutableListOf<Wx>()

    class Wx {
        var id: Int = 0
        var name: String = ""
    }
}

請(qǐng)求網(wǎng)絡(luò):

fun reqNet() {
    val request = retrofit.create(WanAndroidService::class.java)
    val call = request.listReposByRxJava()
    call.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<WxList?> {
        override fun onSubscribe(d: Disposable?) {
            tvContent.text = "開始請(qǐng)求網(wǎng)絡(luò)"
        }

        override fun onSuccess(t: WxList?) {
            t?.let {
                tvContent.text = it.data[0].name
            }
        }

        override fun onError(e: Throwable?) {
            tvContent.text = "網(wǎng)絡(luò)出錯(cuò)"
        }
    })
}

這樣,一個(gè)簡單的Retrofit與OKHttp的結(jié)合案例就完成了?,F(xiàn)在請(qǐng)求網(wǎng)絡(luò)的時(shí)候就可以使用RxJava那些鏈?zhǔn)讲僮髁恕?/p>

3. just : 最簡單的訂閱關(guān)系

先從最簡單的just開始,看一下RxJava的訂閱關(guān)系是怎么樣的。

val just: Single<Int> = Single.just(1)
just.subscribe(object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: Int) {
    }

    override fun onError(e: Throwable?) {
    }
})

Single.just(1)會(huì)構(gòu)建一個(gè)SingleJust實(shí)例出來,

//Single.java
public static <@NonNull T> Single<T> just(T item) {
    Objects.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}

其中RxJavaPlugins.onAssembly是一個(gè)鉤子,不用在意,這段代碼就是返回一個(gè)SingleJust對(duì)象。

點(diǎn)進(jìn)去看一下subscribe是怎么走的

//Single.java
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    ...
    subscribeActual(observer);
    ...
}

核心代碼就一句,調(diào)用subscribeActual方法,從名字看是進(jìn)行實(shí)際地訂閱。那么我們將目光聚焦到subscribeActual里面,它是一個(gè)抽象方法,就上面的demo而言其實(shí)際實(shí)現(xiàn)是剛才創(chuàng)建出來的SingleJust。

//Single.java
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);

//SingleJust.java
public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }

}

SingleJust里面的代碼非常簡潔,在實(shí)際訂閱(調(diào)用subscribeActual)時(shí),直接將傳進(jìn)來的觀察者(也就是上面?zhèn)魅氲腟ingleObserver)回調(diào)onSubscribe和onSuccess就完事了。此處沒有onError,因?yàn)椴粫?huì)失敗。

4. map 操作符

4.1 原理

我們知道,RxJava中map可以轉(zhuǎn)換數(shù)據(jù),看一下它是怎么做到的

val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }
})
singleString.subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: String) {
    }

    override fun onError(e: Throwable?) {
    }
})

點(diǎn)進(jìn)去map看一下:

//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

構(gòu)建了一個(gè)SingleMap,有了上面just的經(jīng)驗(yàn),訂閱的時(shí)候是走的SingleMap的subscribeActual方法。直接去看:

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
}

注意一下這個(gè)source,它是啥?在構(gòu)造方法里面?zhèn)魅氲模簿褪窃赟ingle.java的map方法那里傳入的this,這個(gè)this也就是Single.just(1)所構(gòu)建出來的SingleJust對(duì)象。這個(gè)SingleJust也就是此處map的上游,上游把事件給下游。

此處訂閱時(shí),就是調(diào)一下上游的subscribe與自己綁定起來,完成訂閱關(guān)系。現(xiàn)在生產(chǎn)者是上游,而此處的SingleMap就是下游的觀察者。

MapSingleObserver,也就是map的觀察者,來看一下它是怎么實(shí)現(xiàn)的

public final class SingleMap<T, R> extends Single<R> {
    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                //mapper是demo中傳入的object : Function<Int, String>
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

其實(shí)t是下游的觀察者,通過subscribeActual傳入。在上游調(diào)用map的onSubscribe同時(shí),map也向下傳遞這個(gè)事件,調(diào)用下游觀察者的onSubscribe。在上游調(diào)用map的onSuccess時(shí),map自己進(jìn)行轉(zhuǎn)換一下,再交給下游的onSuccess。同理,onError也是一樣的路線。

到這里就理清楚了。

4.2 框架結(jié)構(gòu)

RxJava的整體結(jié)構(gòu)是一條鏈,其中:

  1. 鏈的最上游:生產(chǎn)者Observable
  2. 鏈的最下游:觀察者Observer
  3. 鏈的中間:各個(gè)中介節(jié)點(diǎn),既是下游的Observable,又是上游的Observer

4.3 操作符Operator(map等)的本質(zhì)

  1. 基于原Observable創(chuàng)建一個(gè)新的Observable
  2. Observable內(nèi)部創(chuàng)建一個(gè)Observer
  3. 通過定制Observable的subscribeActual()方法和Observer的onXxx()方法,來實(shí)現(xiàn)自己的中介角色(例如數(shù)據(jù)轉(zhuǎn)換、線程切換等)

5. dispose工作原理

可以通過dispose()方法來讓上游或內(nèi)部調(diào)度器(或兩者都有)停止工作,達(dá)到「丟棄」的效果。

下面分別講一下這幾種情況:

  • Single.just 無后續(xù),無延遲
  • Observable.interval 有后續(xù),有延遲
  • Single.map 無后續(xù),無延遲,有上下游
  • Single.delay 無后續(xù),有延遲
  • Observable.map 有后續(xù),無延遲
  • Observable.delay 無后續(xù),有延遲

這幾種情況已經(jīng)足夠把所有dispose的情況都說明完整了。

5.1 Single.just 無后續(xù),無延遲

對(duì)于Single.just,情況比較簡單,在SingleJust的subscribeActual中,給觀察者一個(gè)全局共享的Disposable對(duì)象。下游不能對(duì)其進(jìn)行取消,因?yàn)殚g隔太短了,馬上就調(diào)用onSuccess了。

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}

5.2 Observable.interval 有后續(xù),有延遲

先來一段示例代碼:

val longObservable: Observable<Long> = Observable.interval(0, 1, TimeUnit.SECONDS)
longObservable.subscribe(object : Observer<Long> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onNext(t: Long?) {
    }

    override fun onError(e: Throwable?) {
    }

    override fun onComplete() {
    }
})

這里Observable.interval構(gòu)建的是ObservableInterval對(duì)象。有了前面的經(jīng)驗(yàn),直接進(jìn)去看ObservableInterval的subscribeActual方法。

//ObservableInterval.java
@Override
public void subscribeActual(Observer<? super Long> observer) {
    //1. 創(chuàng)建觀察者(該觀察者還實(shí)現(xiàn)了Disposable)
    IntervalObserver is = new IntervalObserver(observer);
    observer.onSubscribe(is);

    //線程調(diào)度器
    Scheduler sch = scheduler;

    ...
    //將is(它實(shí)現(xiàn)了Runnable)這個(gè)任務(wù)交給線程調(diào)度器去執(zhí)行,同時(shí)返回一個(gè)Disposable對(duì)象
    Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
    is.setResource(d);
    ...
}

首先是創(chuàng)建了一個(gè)觀察者,該觀察者很明顯是實(shí)現(xiàn)了Disposable接口,因?yàn)閷⒃撚^察者順著onSubscribe傳遞給了下游,方便下游取消。隨后,將該觀察者交給線程調(diào)度器去執(zhí)行,顯然它還實(shí)現(xiàn)了Runnable接口,緊接著將調(diào)度器返回的Disposable對(duì)象設(shè)置給該觀察者。

static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

    private static final long serialVersionUID = 346773832286157679L;

    final Observer<? super Long> downstream;

    long count;
    
    //傳入的Observer是下游的
    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void dispose() {
        //取消自己
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }

    @Override
    public void run() {
        //通知下游
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }

    public void setResource(Disposable d) {
        //設(shè)置Disposable給自己
        DisposableHelper.setOnce(this, d);
    }
}

IntervalObserver繼承自AtomicReference(AtomicReference類提供了一個(gè)可以原子讀寫的對(duì)象引用變量,避免出現(xiàn)線程安全問題),泛型是Disposable。同時(shí)它也實(shí)現(xiàn)了Disposable和Runnable。在構(gòu)造方法里面?zhèn)魅胂掠蔚挠^察者,方便待會(huì)兒把事件傳給下游。

當(dāng)事件一開始時(shí),將IntervalObserver傳遞給下游,因?yàn)樗鼘?shí)現(xiàn)了Disposable,可以被下游取消。然后將IntervalObserver傳遞給調(diào)度器,調(diào)度器會(huì)執(zhí)行里面的run方法,run方法里面是將數(shù)據(jù)傳遞給下游。在交給調(diào)度器的時(shí)候,返回了一個(gè)Disposable對(duì)象,意味著可以隨時(shí)取消調(diào)度器里面的該任務(wù)。然后將該Disposable對(duì)象設(shè)置給IntervalObserver的內(nèi)部,通過setResource方法,其實(shí)就是設(shè)置給IntervalObserver自己的,它本身就是一個(gè)AtomicReference<Disposable>。當(dāng)下游調(diào)用dispose時(shí),即調(diào)用IntervalObserver的dispose,然后IntervalObserver內(nèi)部隨即調(diào)用自己的dispose方法,完成了取消。

這里為什么設(shè)計(jì)的這么繞?直接將調(diào)度器返回的Disposable對(duì)象返回給下游不就可以了么,下游也可以對(duì)其進(jìn)行取消啊?這樣設(shè)計(jì)的好處是上游傳遞給下游的永遠(yuǎn)是IntervalObserver對(duì)象,下游直接拿著這個(gè)實(shí)現(xiàn)了Disposable的IntervalObserver對(duì)象可以直接調(diào)用它的dispose進(jìn)行取消。而不用管它內(nèi)部當(dāng)前是握著哪個(gè)Disposable對(duì)象,即使IntervalObserver內(nèi)部的Disposable被更換了也絲毫不影響下游對(duì)上游的取消操作。

5.3 Single.map 無后續(xù),無延遲,有上下游

先來個(gè)簡單例子

val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }
})
singleString.subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable?) {
    }

    override fun onSuccess(t: String) {
    }

    override fun onError(e: Throwable?) {
    }
})

singleInt.map點(diǎn)進(jìn)去

//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

通過上面的例子我們知道,上游是創(chuàng)建了一個(gè)SingleJust對(duì)象。在調(diào)用map時(shí),將自己(也就是SingleJust)傳給下游SingleMap里面去了。

//SingleMap.java
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;
    
    //source是上游,通過構(gòu)造方法傳入進(jìn)來
    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        //t是下游
        //訂閱
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

一開場就直接調(diào)用上游source訂閱MapSingleObserver這個(gè)觀察者。在MapSingleObserver的邏輯也比較簡單,就是實(shí)現(xiàn)了onSubscribe、onSuccess、onError這些方法。然后在上游調(diào)用onSubscribe時(shí)調(diào)用下游的onSubscribe;在上游調(diào)用onSuccess時(shí)自己做了一下mapper.apply(value)轉(zhuǎn)換操作,將數(shù)據(jù)轉(zhuǎn)換成下游所需要的,然后再調(diào)用下游的onSuccess傳遞給下游;onError同onSubscribe原理是一樣的。

5.4 Single.delay 無后續(xù),有延遲

來段示例代碼:

val singleInt: Single<Int> = Single.just(1)
val singleDelay: Single<Int> = singleInt.delay(1, TimeUnit.SECONDS)
val observer = object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable?) {
        log("onSubscribe")
    }

    override fun onSuccess(t: Int?) {
        log("onSuccess")
    }

    override fun onError(e: Throwable?) {
        log("onError")
    }
}
singleDelay.subscribe(observer)

直搗黃龍,Single.delay背后的對(duì)象是SingleDelay?,F(xiàn)在有經(jīng)驗(yàn)了,直接看它的subscribeActual

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
    //可以確定的是這是一個(gè)Disposable
    final SequentialDisposable sd = new SequentialDisposable();
    //將這個(gè)Disposable通過onSubscribe傳遞給下游
    observer.onSubscribe(sd);
    //讓上游訂閱Delay這個(gè)觀察者
    source.subscribe(new Delay(sd, observer));
}

看下SequentialDisposable是什么玩意兒

public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
    public SequentialDisposable() {
        // nothing to do
    }
    public SequentialDisposable(Disposable initial) {
        lazySet(initial);
    }
    public boolean update(Disposable next) {
        return DisposableHelper.set(this, next);
    }
    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

似曾相識(shí),上面的IntervalObserver也是這種思想。只不過這里多了2個(gè)update和replace方法,可以隨時(shí)更換AtomicReference里面的Disposable對(duì)象。這就體現(xiàn)出了這種設(shè)計(jì)的好處,不管里面的Disposable怎么更換,傳遞給下游的是這個(gè)SequentialDisposable,下游只需要調(diào)SequentialDisposable的dispose就將其里面的Disposable給取消掉了,而不用管里面的Disposable究竟是誰。

下面咱們來看SingleDelay里面的內(nèi)部類Delay(觀察者)

final class Delay implements SingleObserver<T> {
    //傳遞給下游的Disposable
    private final SequentialDisposable sd;
    //下游的觀察者
    final SingleObserver<? super T> downstream;

    Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
        this.sd = sd;
        this.downstream = observer;
    }

    @Override
    public void onSubscribe(Disposable d) {
        //開始訂閱的時(shí)候,sd內(nèi)部的Disposable是上游給過來的
        sd.replace(d);
    }

    @Override
    public void onSuccess(final T value) {
        //上游把數(shù)據(jù)給過來之后,就不用管上游了,直接把sd里面Disposable 設(shè)置成線程調(diào)度器給回來那個(gè)
        //因?yàn)榇藭r(shí)下游調(diào)用dispose的話,直接取消調(diào)度器里面的任務(wù)就行了
        //巧妙地將sd里面的Disposable掉包了
        sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
    }

    @Override
    public void onError(final Throwable e) {
        sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
    }

    final class OnSuccess implements Runnable {
        private final T value;

        OnSuccess(T value) {
            this.value = value;
        }

        @Override
        public void run() {
            //調(diào)度器執(zhí)行到該任務(wù)時(shí),將數(shù)據(jù)傳遞給下游
            downstream.onSuccess(value);
        }
    }

    final class OnError implements Runnable {
        private final Throwable e;

        OnError(Throwable e) {
            this.e = e;
        }

        @Override
        public void run() {
            downstream.onError(e);
        }
    }
}

這段代碼比較精彩,首先在上游訂閱Delay的時(shí)候,觸發(fā)onSubscribe,Delay內(nèi)部隨即將該Disposable存入SequentialDisposable對(duì)象(需要注意的是下游拿到的Disposable始終是這個(gè)SequentialDisposable)中。此時(shí)如果下游調(diào)用dispose,也就是調(diào)用SequentialDisposable的dispose,也就是上游的dispose,dispose流程在這個(gè)節(jié)點(diǎn)上就完成了,向上傳遞。

上游有數(shù)據(jù)了,通過onSuccess傳遞給觀察者Delay的時(shí)候,SequentialDisposable就可以不用管上游的那個(gè)Disposable了,此時(shí)要關(guān)心的是傳遞給線程調(diào)度器里面的任務(wù)的取消事件了。所以直接將調(diào)度器返回的Disposable替換到SequentialDisposable內(nèi)部,此時(shí)下游進(jìn)行取消時(shí),就直接把任務(wù)給取消掉了。

當(dāng)調(diào)度器執(zhí)行到任務(wù)OnSuccess時(shí),就把數(shù)據(jù)傳遞給下游,這個(gè)節(jié)點(diǎn)的任務(wù)就完成了。

5.5 Observable.map 有后續(xù),無延遲

Observable.map所對(duì)應(yīng)的是ObservableMap,直接上代碼:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //t是下游的觀察者
        //source是上游
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

在subscribeActual中并沒有直接調(diào)用onSubscribe,而MapObserver中又沒有這個(gè)方法,那onSubscribe肯定是在其父類中完成的。在看onSubscribe之前咱干脆先把onNext理一下,這里通過mapper.apply轉(zhuǎn)一下之后馬上就交給下游的onNext去了。

//BasicFuseableObserver.java
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
    public BasicFuseableObserver(Observer<? super R> downstream) {
        this.downstream = downstream;
    }
    @Override
    public final void onSubscribe(Disposable d) {
        //驗(yàn)證上游   d是上游的Disposable   upstream是當(dāng)前類的字段,還沒有被賦值
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;
            }
            //onSubscribe之前想做點(diǎn)什么事情的話,在beforeDownstream里面做
            if (beforeDownstream()) {
                //調(diào)用下游的onSubscribe
                downstream.onSubscribe(this);
                //onSubscribe之后想做點(diǎn)什么事情的話,在afterDownstream里面做
                afterDownstream();
            }

        }
    }
    protected boolean beforeDownstream() {
        return true;
    }
    protected void afterDownstream() {
    }
    @Override
    public void dispose() {
        upstream.dispose();
    }
}

//DisposableHelper.java
public static boolean validate(Disposable current, Disposable next) {
    if (next == null) {
        RxJavaPlugins.onError(new NullPointerException("next is null"));
        return false;
    }
    if (current != null) {
        next.dispose();
        reportDisposableSet();
        return false;
    }
    return true;
}

還是先調(diào)用下游的onSubscribe,不過,并沒有將上游的Disposable直接傳給下游,而是將中間節(jié)點(diǎn)BasicFuseableObserver自己傳給了下游,同時(shí)將上游的Disposable存儲(chǔ)起來,方便待會(huì)兒dispose。

5.6 Observable.delay 無后續(xù),有延遲

Observable.delay 對(duì)應(yīng)的是ObservableDelay

public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super T> t) {
        Observer<T> observer;
        if (delayError) {
            observer = (Observer<T>)t;
        } else {
            observer = new SerializedObserver<>(t);
        }
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
    }
}

在subscribeActual沒有調(diào)用下游的onSubscribe,那說明是在DelayObserver中完成的

static final class DelayObserver<T> implements Observer<T>, Disposable {
    final Scheduler.Worker w;
    Disposable upstream;

    DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
        super();
        this.downstream = actual;
        this.w = w;
        ...
    }

    @Override
    public void onSubscribe(Disposable d) {
        //1. 先驗(yàn)證一下上游  然后將上游的Disposable賦值給upstream
        //2. 調(diào)用下游的onSubscribe,把自己傳給下游
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(final T t) {
        //OnNext任務(wù)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時(shí)調(diào)用下游的onNext方法
        w.schedule(new OnNext(t), delay, unit);
    }

    @Override
    public void onError(final Throwable t) {
        w.schedule(new OnError(t), delayError ? delay : 0, unit);
    }

    @Override
    public void onComplete() {
        w.schedule(new OnComplete(), delay, unit);
    }

    @Override
    public void dispose() {
        //同時(shí)取消上游的Disposable和自己執(zhí)行的調(diào)度器任務(wù)
        upstream.dispose();
        w.dispose();
    }

    final class OnNext implements Runnable {
        private final T t;

        OnNext(T t) {
            this.t = t;
        }

        @Override
        public void run() {
            downstream.onNext(t);
        }
    }
    ...
}

onXxx的所有操作都放到了DelayObserver里面來完成,在上游調(diào)用到這節(jié)的onSubscribe時(shí),先驗(yàn)證一下上游 然后將上游的Disposable賦值給upstream,調(diào)用下游的onSubscribe,把自己傳給下游。

當(dāng)下游調(diào)用dispose時(shí),在DelayObserver的dispose方法中將上游的Disposable給取消掉,然后把自己的調(diào)度器任務(wù)也給取消掉。

事件的傳遞:當(dāng)上游調(diào)用到這一節(jié)的onNext時(shí),OnNext任務(wù)(Runnable)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時(shí)調(diào)用下游的onNext方法。

6. 線程切換

線程切換是RxJava的另一個(gè)重要功能。

6.1 subscribeOn

subscribeOn在Single場景下對(duì)應(yīng)的是SingleSubscribeOn這個(gè)類

public final class SingleSubscribeOn<T> extends Single<T> {
    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);
        
        //切線程
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }
}

直接看subscribeActual方法,很明顯是將parent這個(gè)任務(wù)交給了線程調(diào)度器去執(zhí)行。那我們直接看SubscribeOnObserver的run方法即可

static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    @Override
    public void run() {
        source.subscribe(this);
    }
}

在scheduleDirect那里切線程,然后在另一個(gè)線程中去執(zhí)行source.subscribe(this),也就是在Scheduler指定的線程里啟動(dòng)subscribe(訂閱)。

  • 切換起源Observable的線程
  • 當(dāng)多次調(diào)用subscribeOn()的時(shí)候,只有最上面的會(huì)對(duì)起源Observable起作用

observeOn

observeOn在Single場景下的類是SingleObserveOn。它的subscribeActual方法如下:

@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
    source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}

上游訂閱了ObserveOnSingleObserver這個(gè)觀察者,核心就在這個(gè)觀察者里面。

static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    private static final long serialVersionUID = 3528003840217436037L;

    final SingleObserver<? super T> downstream;

    final Scheduler scheduler;

    T value;
    Throwable error;

    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.downstream = actual;
        this.scheduler = scheduler;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void onError(Throwable e) {
        this.error = e;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }
    ...
}

我們重點(diǎn)關(guān)注一下onSuccess和onError方法,核心就是將當(dāng)前這個(gè)Runnable任務(wù)交給scheduler進(jìn)行執(zhí)行,而這里的scheduler是由使用者傳入的,比如說是AndroidSchedulers.mainThread()。那么在run方法執(zhí)行時(shí),就會(huì)在主線程中,那么在主線程中執(zhí)行下游的onError和onSuccess。 這里通過Scheduler指定的線程來調(diào)用下級(jí)Observer的對(duì)應(yīng)回調(diào)方法。

  • 切換observeOn下面的Observer的回調(diào)所在的線程
  • 當(dāng)多次調(diào)用observerOn()的時(shí)候,每個(gè)都好進(jìn)行一次線程切換,影響范圍是它下面的每個(gè)Observer(除非又遇到新的obServeOn())

6.2 Scheduler的原理

上面我們多次提到Scheduler,但是一直不知道它具體是什么。其實(shí)它就是用來控制控制線程的,用于將指定的邏輯在指定的線程中執(zhí)行。這里就不帶著大家讀源碼了,篇幅過于長了,這塊源碼也比較簡單,感興趣的讀者可以去翻閱一下。下面是幾個(gè)核心點(diǎn)。

其中Schedulers.newThread()里面是創(chuàng)建了一個(gè)線程池Executors.newScheduledThreadPool(1, factory)來執(zhí)行任務(wù),但是這個(gè)線程池里面的線程不會(huì)得到重用,每次都是新建的線程池。當(dāng) scheduleDirect() 被調(diào)用的時(shí)候,會(huì)創(chuàng)建一個(gè) Worker,Worker 的內(nèi)部 會(huì)有一個(gè) Executor,由 Executor 來完成實(shí)際的線程切換;scheduleDirect() 還會(huì)創(chuàng)建出一個(gè) Disposable 對(duì)象,交給外層的 Observer,讓它能執(zhí)行 dispose() 操作,取消訂閱鏈;

Schedulers.io()和Schedulers.newThread()差別不大,但是io()這兒線程可能會(huì)被重用,所以一般io()用得多一些。

AndroidSchedulers.mainThread()就更簡單了,直接使用Handler進(jìn)行線程切換,將任務(wù)放到主線程去做,不管再怎么花里胡哨的庫,最后要切到主線程還得靠Handler。

7. 小結(jié)

Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡潔 & 使用簡單的特點(diǎn),深受各大 Android開發(fā)者的歡迎。平時(shí)在項(xiàng)目中也使用得比較多,所以本文對(duì)RxJava3中的訂閱流程、取消流程、線程切換進(jìn)行了核心源碼分析,希望能幫助到各位。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 基本使用 添加依賴 定義Api請(qǐng)求接口倉庫 發(fā)起網(wǎng)絡(luò)請(qǐng)求 框架結(jié)構(gòu) RxJava的整體結(jié)構(gòu)是一條鏈 鏈的最上游:生...
    Hsicen閱讀 1,615評(píng)論 0 1
  • RxJava 相信各位已經(jīng)使用了很久,大部分人在剛學(xué)習(xí) RxJava 感嘆切換線程的方便,調(diào)用邏輯清晰的同時(shí),并不...
    連續(xù)三屆村草閱讀 690評(píng)論 0 2
  • Rxjava 框架結(jié)構(gòu) RxJava 的整體結(jié)構(gòu)是一條鏈,其中: 鏈的上游:生產(chǎn)者 Observable 鏈的最下...
    夏宇A(yù)I實(shí)戰(zhàn)筆記閱讀 1,062評(píng)論 0 2
  • 創(chuàng)建操作符 操作符使用 基本創(chuàng)建create() 完整創(chuàng)建1個(gè)被觀察者對(duì)象(Observable) 快速創(chuàng)建,發(fā)送...
    帝王鯊kingcp閱讀 1,593評(píng)論 0 1
  • 前言 自此文章起,逐層邁入RxJava2源碼世界,探索Rx思想。此前,需要對(duì)Rx有簡單了,起碼曾使用過。對(duì)于必要的...
    MxsQ閱讀 1,433評(píng)論 0 1

友情鏈接更多精彩內(nèi)容