RxJava2原理解析

基本使用

添加依賴

//retrofit 依賴
implementation 'com.squareup.retrofit2:retrofit:2.6.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.0'
implementation 'com.squareup.retrofit2:converter-gson:2.6.0'

//RxJava依賴
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

定義Api請求接口倉庫

interface ApiStore {
    @GET("/users/{user}/repos")
    fun listRepos(@Path("user") user: String): Single<Any>
}

發(fā)起網(wǎng)絡(luò)請求

val mRetrofit = Retrofit.Builder()
    .baseUrl("https://api.github.com/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    //默認(rèn)所有訂閱都在IO線程中執(zhí)行
    //.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) 
    .build()

val apiStore = mRetrofit.create(ApiStore::class.java)
apiStore.listRepos("hsicen")
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<Any> {
        override fun onSuccess(t: Any) {
            Log.d("hsc", t.toString())
            tv_info.text = t.toString()
        }
        
        override fun onSubscribe(d: Disposable) {
            //獲取可取消對象,方便后續(xù)取消請求
        }

        override fun onError(e: Throwable) {
            Log.d("hsc", "請求失敗")
            tv_info.text = "請求失敗"
        }
    })

框架結(jié)構(gòu)

RxJava的整體結(jié)構(gòu)是一條鏈

  1. 鏈的最上游:生產(chǎn)者(被觀察者) Observable/Single/Flowable/Maybe
  2. 鏈的最下游:消費(fèi)者(觀察者) Observer/SingleObserver/Subscriber/MaybeObserver
  3. 鏈的中間:各個(gè)中介節(jié)點(diǎn),即是下游的 Observable,又是上游的 Observer,連接(訂閱) Subscribe

原理分析

先來看看簡單的使用 Single.just(xxx)

Single.just(1)
    .subscribe(object : SingleObserver<Int> {
        override fun onSuccess(t: Int) {
            tv_info.text = "$t"
        }

        override fun onSubscribe(d: Disposable) {
            tv_info.text = "開始"
        }

        override fun onError(e: Throwable) {
            tv_info.text = "出錯(cuò)"
        }
    })

這里我們利用Single.just(),在上游發(fā)送了一個(gè)簡單的1,下游訂閱這個(gè)事件,在onSuccess()中接收到事件后然后打印出來;由于Single沒有后續(xù)事件,所以只有開始訂閱onSubscribe,成功onSuccess,失敗onError三個(gè)方法;可以看到上面的代碼并沒有進(jìn)行線程的切換,所有事件的發(fā)生都是在當(dāng)前線程中進(jìn)行的,也就是UI線程

現(xiàn)在我們點(diǎn)進(jìn)subscribe中,看看上游和下游是怎樣連接起來的

public final void subscribe(SingleObserver<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    observer = RxJavaPlugins.onSubscribe(this, observer);
    ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}

可以看到在這個(gè)方法中起主要作用的就一句代碼 subscribeActual(observer), 所以我們只需要找到這個(gè)方法,看它里面做了什么操作,就知道是怎樣連接起來的了,下面我們就點(diǎn)進(jìn)這個(gè)方法

protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);

這是一個(gè)抽象方法,沒有具體的實(shí)現(xiàn),但是我們可以發(fā)現(xiàn),這是Single這個(gè)類的抽象方法,所以們只需要找到這個(gè)類的實(shí)現(xiàn)類,也就可以找到這個(gè)抽象方法的具體實(shí)現(xiàn);但是先別忙,我們不是還有一句代碼沒有看么,那我們先看看Single.just()做了什么操作

public static <T> Single<T> just(final T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}

這個(gè)方法沒有做什么操作,最主要的就是最后一句代碼,返回了一個(gè)SingleJust對象,這個(gè)SingleJust應(yīng)該就是Just的實(shí)現(xiàn)類,現(xiàn)在我們點(diǎn)進(jìn)這個(gè)類

public final class SingleJust<T> extends Single<T> {
    final T value;
    
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

可以看到在這個(gè)類的subscribeActual方法中,直接調(diào)用了下游的onSubscribe()和onSuccess()方法,onError方法都不需要調(diào)用,而且在訂閱的時(shí)候調(diào)用的是Disposables.disposed(),應(yīng)該是要返回一個(gè)可取消訂閱的對象,那么點(diǎn)進(jìn)這個(gè)方法,看看返回的是什么對象

public static Disposable disposed() {
    return EmptyDisposable.INSTANCE;
}


public enum EmptyDisposable implements QueueDisposable<Object> {
    INSTANCE,NEVER;

    @Override
    public void dispose() {
        // no-op
    }

    @Override
    public boolean isDisposed() {
        return this == INSTANCE;
    }
}

通過源碼可以看到返回的是一個(gè)EmptyDisposable,這個(gè)Disposable在創(chuàng)建時(shí)就默認(rèn)已經(jīng)取消了,所以Single.just(),一個(gè)沒有后續(xù)操作的事件,流程大致如下:

23rxjava1.jpg

SingleJust just = Single.just(1); 創(chuàng)建被觀察者
just.subscribe(observer); 訂閱(連接觀察者和被觀察者)
just.subscribeActual(observer); 核心操作

接下來看一個(gè)復(fù)雜一點(diǎn)的,有后續(xù)操作的事件 Observable.interval()

Observable.interval(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Observer<Long> {
        override fun onComplete() {
            tv_info.text = "結(jié)束"
        }

        override fun onSubscribe(d: Disposable) {
            Log.d("hsc", " 線程: " + Thread.currentThread().name)
            tv_info.text = "開始"
        }

        override fun onNext(t: Long) {
            Log.d("hsc", " 線程: " + Thread.currentThread().name)
            tv_info.text = "$t"
        }

        override fun onError(e: Throwable) {
            tv_info.text = "出錯(cuò)"
        }

    })

上面代碼的功能是每個(gè)一秒發(fā)送一個(gè)事件,下游接收到這個(gè)事件后打印出來;那么我們分析源碼還是先從subscribe()方法切入

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);       
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

可以發(fā)現(xiàn),起主要作用的還是 subscribeActual(observer)方法,這個(gè)方法同樣是Observable的抽象方法,所以,下面需要從另一方向切入,看看Observable.interval()做了什么操作

public static Observable<Long> interval(long period, TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}

可以看到這里進(jìn)行了一層包裝,而且為我們切換了線程,這也是為什么上面我們調(diào)用了observeOn(),主動(dòng)進(jìn)行了線程切換的原因,繼續(xù)點(diǎn)進(jìn)去

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    ObjectHelper.requireNonNull(unit, "unit is null");
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

在這個(gè)interval()方法中,就和上面的Single.just()類似了,給我們返回了一個(gè)ObservableInterval類,而且對我們的傳進(jìn)來的參數(shù)進(jìn)行了兼容性處理,現(xiàn)在我們就點(diǎn)進(jìn)去看subscribeActual()所做的處理

public void subscribeActual(Observer<? super Long> observer) {
    IntervalObserver is = new IntervalObserver(observer);
    observer.onSubscribe(is);

    Scheduler sch = scheduler;

    if (sch instanceof TrampolineScheduler) {
        Worker worker = sch.createWorker();
        is.setResource(worker);
        worker.schedulePeriodically(is, initialDelay, period, unit);
    } else {
        Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
        is.setResource(d);
    }
}

首先將下游observer進(jìn)行了包裝,創(chuàng)建了一個(gè)IntervalObserver,這個(gè)IntervalObserver是一個(gè)可取消對象,實(shí)現(xiàn)了Disposable, Runnable接口, 然后調(diào)用了下游的訂閱方法,把這個(gè)可取消對象傳了過去;然后就分支判斷,除非主動(dòng)設(shè)置,一般情況下都會走默認(rèn)的else分支;在else分支中先調(diào)用Scheduler的方法進(jìn)行了線程切換,后面有專門講Scheduler的原理,這里只簡單的講一下這行代碼的作用,就是進(jìn)行線程切換,最后調(diào)用is.setResource(d)方法,這個(gè)方法的作用先不分析,現(xiàn)在只需要記住有這個(gè)方法,后面會返回來分析這個(gè)方法的作用

現(xiàn)在我們點(diǎn)進(jìn)IntervalObserver,看看它的后臺任務(wù)(run)是怎樣執(zhí)行的

static final class IntervalObserver extends AtomicReference<Disposable> implements Disposable, Runnable {
    private static final long serialVersionUID = 346773832286157679L;
    final Observer<? super Long> downstream;
    long count;

    IntervalObserver(Observer<? super Long> downstream) {
        this.downstream = downstream;
    }
    
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }
    
    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }
    
    @Override
    public void run() {
        if (get() != DisposableHelper.DISPOSED) {
            downstream.onNext(count++);
        }
    }
    
    public void setResource(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}

可以看到IntervalObserver繼承自AtomicReference,實(shí)現(xiàn)了Disposable和Runnable接口,在run方法中先判斷是否已經(jīng)取消了訂閱,若沒有取消訂閱,會調(diào)用下游的onNext()方法,然后count加1;可以看到可取消對象都是通過DisposableHelper來管理的,包括我們剛才的setResource(d),這個(gè)setResource()方法只是一層包裝,里面是設(shè)置給DisposableHelper的,然后取消時(shí)也是通過DisposableHelper來取消的,這個(gè)可取消對象繼承自AtomicReferenc,是線程安全的,總結(jié)Observable.interval()流程如下:

訂閱過程:Observable.interval() -> ObservableInterval.subscribe(observer) -> subscribeActual(observer) -> IntervalObserver.run() -> observer.onNext()

取消訂閱過程:IntervalObserver.setResource(d) -> DisposableHelper處理

操作符分析

先來看一下沒有后續(xù)操作事件的操作符

Single.just(1)
    .map { it + 3 }
    .subscribe(object : SingleObserver<Int> {
        override fun onSuccess(t: Int) {
            tv_info.text = "$t"
        }

        override fun onSubscribe(d: Disposable) {
            tv_info.text = "開始"
        }

        override fun onError(e: Throwable) {
            tv_info.text = "出錯(cuò)"
        }
    })

前面已經(jīng)分析了just和subscribe所做的事情,現(xiàn)在我們點(diǎn)進(jìn)map

public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

可以看到創(chuàng)建了一個(gè)新的對象SingleMap,傳進(jìn)去了Single對象和map的操作邏輯函數(shù),當(dāng)下游的Observer調(diào)用subscribeActual()方法時(shí),就會調(diào)用SingleMap的subscribeActual()方法

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
}

可以看到subscribeActual()方法中,調(diào)用了source的subscribe()方法,這個(gè)Source就是我們Single.just()創(chuàng)建的SingleJust,而且將下游的Observer進(jìn)行了一層包裝,創(chuàng)建了一個(gè)MapSingleObserver,在SingleJust的subscribe()方法中會調(diào)用subscribeActual()方法,然后流程就和上面沒有操作符的流程一樣了

現(xiàn)在還需要弄清楚是如何將數(shù)據(jù)傳遞給下游的Observer的,那么就需要弄清楚MapSingleObserver做了什么操作

static final class MapSingleObserver<T, R> implements SingleObserver<T> {
    final SingleObserver<? super R> t;
    final Function<? super T, ? extends R> mapper;
        
    MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
        this.t = t;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Disposable d) {
        t.onSubscribe(d);
    }

    @Override
    public void onSuccess(T value) {
        R v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        t.onSuccess(v);
    }

    @Override
    public void onError(Throwable e) {
        t.onError(e);
    }
}

可以看到MapSingleObserver中,除了在onSuccess()中將數(shù)據(jù)進(jìn)行轉(zhuǎn)換外,其它都是將事件直接傳遞給下游的Observer的,下面簡單總結(jié)一下事件的流程:

23rxjava2.jpg

上游:
Single.just() -> 創(chuàng)建SingleJust對象
SingleJust.map(mapper) -> 創(chuàng)建SingleMap(SingleJust, mapper)對象
SingleMap.subscribe(Observer) -> 鏈接上下游發(fā)生訂閱
SingleMap.subscribeActual() -> 在訂閱方法subscribe()中調(diào)用
SingleJust.subscribe(MapSingleObserver(Observer,mapper))
SingleJust.subscribeActual(Observer) -> 發(fā)送事件到下游

下游:
SingleMap.subscribe(Observer) -> 鏈接上游和下游
SingleJust.subscribe(MapSingleObserver(Observer,mapper))
SingleJust.subscribeActual(MapSingleObserver) -> 發(fā)送事件到中轉(zhuǎn)Observer
MapSingleObserver 中轉(zhuǎn)SingleJust的事件到下游Observer

如果有多個(gè)操作會怎么樣呢?有了上面的分析,其實(shí)我們可以發(fā)現(xiàn)當(dāng)存在多個(gè)操作符時(shí),首先上游基于每個(gè)操作符都會創(chuàng)建一個(gè)新的Observable,在新的Observable的subscribeActual()方法中回調(diào)用source的subscribe()方法;然后下游Observer會封裝自己subscribe()方法傳進(jìn)來的Observer,創(chuàng)建一個(gè)新的Observer,這個(gè)新的Observer充當(dāng)?shù)氖且粋€(gè)中轉(zhuǎn)的角色,它會把自己source傳遞過來的事件傳遞給通過subscribe傳遞過來的下游Observer;所以Observable是一層一層的往上傳,而Observer是一層一層的往下傳:

23rxjava3.jpg

對于有后續(xù)操作的事件,其流程也是一樣的,每一個(gè)操作符都會創(chuàng)建新的Observable對象和Observer對象,用于鏈接上游和下游,傳遞事件

Disposable 原理分析

這個(gè)模塊主要分析一下訂閱的取消流程,訂閱的取消要分多種情況,根據(jù)下面幾種分類來分析一下

沒延遲,沒后續(xù)操作

這種情況是最簡單的,直接就是Single.just(xxx).subscribe(Observer)

在這種情況下,當(dāng)發(fā)生訂閱時(shí),會調(diào)用Disposables.disposed()返回一個(gè)已經(jīng)取消訂閱的Disposable對象

沒延遲,有后續(xù)操作

這種情況就是我們上面提到的Observable.interval()

在這種情況下,會創(chuàng)建一個(gè)IntervalObserver和一個(gè)后臺執(zhí)行onNext操作的Worker對象,當(dāng)發(fā)生訂閱時(shí)會把這個(gè)可取消對象傳遞給下游,下游調(diào)用dispose()取消訂閱時(shí),會調(diào)用DisposableHelper.dispose()來處理取消訂閱操作;首先會把DisposableHelper置為DISPOSED的狀態(tài),然后把自己內(nèi)部創(chuàng)建的worker取消掉(這個(gè)Worker是執(zhí)行后續(xù)onNext操作的Worker)

有延遲,沒后續(xù)操作

這種情況相當(dāng)于給第一種情況加上了delay()操作符(會自動(dòng)切換線程)

在這種情況下,會創(chuàng)建一個(gè)SequentialDisposable對象,然后在訂閱時(shí)把這個(gè)可取消對象傳給下游,下游拿到這個(gè)可取消對象就可以自由操作了;當(dāng)觸發(fā)成功和失敗事件時(shí),會創(chuàng)建一個(gè)DisposeTask,利用Scheduler延時(shí)發(fā)給下游Observer,并調(diào)用SequentialDisposable的replace()來替換掉之前的Disposable可取消對象,當(dāng)下游調(diào)用dispose()時(shí),會交由DisposableHelper.dispose()來處理取消訂閱操作;由于有延時(shí),會創(chuàng)建Worker對象來處理延時(shí)操作,當(dāng)調(diào)用DisposableHelper.dispose()時(shí),首先會把DisposableHelper置為DISPOSED的狀態(tài),然后把處理延時(shí)操作的Worker取消掉

有延時(shí),有后續(xù)操作

這種情況相當(dāng)于給第二種情況加上了delay()操作符

在這種情況下,默認(rèn)會創(chuàng)建一個(gè)SerializedObserver和一個(gè)后臺執(zhí)行onNext操作的Worker對象;當(dāng)發(fā)生訂閱時(shí)直接調(diào)用下游的onSubscribe(),onNext(),onError(),onComplete()都會交由Worker進(jìn)行延遲下發(fā);當(dāng)調(diào)用dispose()取消訂閱時(shí),會調(diào)用上游的dispose()和自己內(nèi)部Worker的取消

線程切換原理分析

RxJava有兩個(gè)線程切換方法,subscribeOn()observerOn(),這兩個(gè)方法各有用處,下面就來分析一下這兩個(gè)方法所做的事情

subscribeOn

功能:在Scheduler指定的線程里啟動(dòng)訂閱 subscribe()

效果:

  • 切換起源的 Observable 線程
  • 當(dāng)多次調(diào)用 subscribeOn() 的時(shí)候,只有第一個(gè)subscribeOn()會對起源的 Observable 起作用;后續(xù)的subscribeOn()會影響onSubscribe()的調(diào)用線程

單次調(diào)用subscribeOn()的大致流程如下:

23rxjava4.jpg

多次調(diào)用subscribeOn()的大致流程如下:

23rxjava5.jpg

當(dāng)帶有操作符的多次調(diào)用subscribeOn()的大致流程如下:

23rxjava6.jpg
observeOn

功能: 在內(nèi)部創(chuàng)建的 Observer的 onNext(), onError(), onSuccess()/onComplete() 等回調(diào)方法里,通過Scheduler 指定的線程來調(diào)用下級Observer的對應(yīng)回調(diào)方法

效果:

  • 切換 observeOn() 下面的 Observer 的回調(diào)所在的線程
  • 當(dāng)多次調(diào)用 observeOn() 的時(shí)候,每個(gè)都會進(jìn)行一次線程切換,影響范圍是它下面的每個(gè)Observer

流程大致如下:

23rxjava7.jpg

Scheduler原理

這里主要涉及到兩個(gè)類,Schedulers 和 AndroidSchedulers

  1. Schedulers.newThread() 和 Schedulers.io(),Schedulers.computation()

    • 當(dāng)scheduleDirect() 被調(diào)用時(shí),會創(chuàng)建一個(gè)Worker,Worker的內(nèi)部會有一個(gè)Executor,由Executor來完成實(shí)際的線程切換操作
    • scheduleDirect() 還會創(chuàng)建出一個(gè)Disposable 對象,交給外層的Observer,讓它能夠執(zhí)行dispose()操作,取消訂閱鏈
    • newThread() 和 io() 的區(qū)別在于,io是創(chuàng)建的緩存池,可能會對Executor進(jìn)行重用;computation創(chuàng)建的是線程池
  2. AndroidSchedulers.mainThread()

    • 創(chuàng)建Handler,獲取到mainLooper,發(fā)送消息到主線程執(zhí)行
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容