文章首發(fā)于我建立的一個(gè)筆記倉庫
1. 背景
RxJava是一個(gè)基于事件流、實(shí)現(xiàn)異步操作的庫。
官方介紹: RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
(RxJava 是一個(gè)在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)
文中用到的RxJava源碼版本為3.0.13,文中的demo源碼 https://github.com/xfhy/AllInOne/tree/master/app/src/main/java/com/xfhy/allinone/opensource/rxjava
2. 基礎(chǔ)使用
簡單介紹一下如何與Retrofit結(jié)合使用。引入:
implementation "io.reactivex.rxjava3:rxjava:3.0.13"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"
//Retrofit
implementation "com.squareup.retrofit2:retrofit:2.9.0"
//可選
implementation "com.squareup.retrofit2:converter-gson:2.9.0"
構(gòu)建Retrofit實(shí)例
private val retrofit by lazy {
Retrofit.Builder()
.baseUrl("https://www.wanandroid.com")
//使用Gson解析
.addConverterFactory(GsonConverterFactory.create())
//轉(zhuǎn)換器 RxJava3 每次執(zhí)行的時(shí)候在IO線程
.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
.build()
}
定義Retrofit的API:
interface WanAndroidService {
@GET("wxarticle/chapters/json")
fun listReposByRxJava(): Single<WxList?>
}
class WxList {
var errorMsg = ""
var errorCode = -1
var data = mutableListOf<Wx>()
class Wx {
var id: Int = 0
var name: String = ""
}
}
請(qǐng)求網(wǎng)絡(luò):
fun reqNet() {
val request = retrofit.create(WanAndroidService::class.java)
val call = request.listReposByRxJava()
call.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<WxList?> {
override fun onSubscribe(d: Disposable?) {
tvContent.text = "開始請(qǐng)求網(wǎng)絡(luò)"
}
override fun onSuccess(t: WxList?) {
t?.let {
tvContent.text = it.data[0].name
}
}
override fun onError(e: Throwable?) {
tvContent.text = "網(wǎng)絡(luò)出錯(cuò)"
}
})
}
這樣,一個(gè)簡單的Retrofit與OKHttp的結(jié)合案例就完成了?,F(xiàn)在請(qǐng)求網(wǎng)絡(luò)的時(shí)候就可以使用RxJava那些鏈?zhǔn)讲僮髁恕?/p>
3. just : 最簡單的訂閱關(guān)系
先從最簡單的just開始,看一下RxJava的訂閱關(guān)系是怎么樣的。
val just: Single<Int> = Single.just(1)
just.subscribe(object : SingleObserver<Int> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: Int) {
}
override fun onError(e: Throwable?) {
}
})
Single.just(1)會(huì)構(gòu)建一個(gè)SingleJust實(shí)例出來,
//Single.java
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
其中RxJavaPlugins.onAssembly是一個(gè)鉤子,不用在意,這段代碼就是返回一個(gè)SingleJust對(duì)象。
點(diǎn)進(jìn)去看一下subscribe是怎么走的
//Single.java
@Override
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
...
subscribeActual(observer);
...
}
核心代碼就一句,調(diào)用subscribeActual方法,從名字看是進(jìn)行實(shí)際地訂閱。那么我們將目光聚焦到subscribeActual里面,它是一個(gè)抽象方法,就上面的demo而言其實(shí)際實(shí)現(xiàn)是剛才創(chuàng)建出來的SingleJust。
//Single.java
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
//SingleJust.java
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
SingleJust里面的代碼非常簡潔,在實(shí)際訂閱(調(diào)用subscribeActual)時(shí),直接將傳進(jìn)來的觀察者(也就是上面?zhèn)魅氲腟ingleObserver)回調(diào)onSubscribe和onSuccess就完事了。此處沒有onError,因?yàn)椴粫?huì)失敗。
4. map 操作符
4.1 原理
我們知道,RxJava中map可以轉(zhuǎn)換數(shù)據(jù),看一下它是怎么做到的
val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
singleString.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: String) {
}
override fun onError(e: Throwable?) {
}
})
點(diǎn)進(jìn)去map看一下:
//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
構(gòu)建了一個(gè)SingleMap,有了上面just的經(jīng)驗(yàn),訂閱的時(shí)候是走的SingleMap的subscribeActual方法。直接去看:
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
}
注意一下這個(gè)source,它是啥?在構(gòu)造方法里面?zhèn)魅氲模簿褪窃赟ingle.java的map方法那里傳入的this,這個(gè)this也就是Single.just(1)所構(gòu)建出來的SingleJust對(duì)象。這個(gè)SingleJust也就是此處map的上游,上游把事件給下游。
此處訂閱時(shí),就是調(diào)一下上游的subscribe與自己綁定起來,完成訂閱關(guān)系。現(xiàn)在生產(chǎn)者是上游,而此處的SingleMap就是下游的觀察者。
MapSingleObserver,也就是map的觀察者,來看一下它是怎么實(shí)現(xiàn)的
public final class SingleMap<T, R> extends Single<R> {
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
//mapper是demo中傳入的object : Function<Int, String>
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
其實(shí)t是下游的觀察者,通過subscribeActual傳入。在上游調(diào)用map的onSubscribe同時(shí),map也向下傳遞這個(gè)事件,調(diào)用下游觀察者的onSubscribe。在上游調(diào)用map的onSuccess時(shí),map自己進(jìn)行轉(zhuǎn)換一下,再交給下游的onSuccess。同理,onError也是一樣的路線。
到這里就理清楚了。
4.2 框架結(jié)構(gòu)
RxJava的整體結(jié)構(gòu)是一條鏈,其中:
- 鏈的最上游:生產(chǎn)者Observable
- 鏈的最下游:觀察者Observer
- 鏈的中間:各個(gè)中介節(jié)點(diǎn),既是下游的Observable,又是上游的Observer
4.3 操作符Operator(map等)的本質(zhì)
- 基于原Observable創(chuàng)建一個(gè)新的Observable
- Observable內(nèi)部創(chuàng)建一個(gè)Observer
- 通過定制Observable的subscribeActual()方法和Observer的onXxx()方法,來實(shí)現(xiàn)自己的中介角色(例如數(shù)據(jù)轉(zhuǎn)換、線程切換等)
5. dispose工作原理
可以通過dispose()方法來讓上游或內(nèi)部調(diào)度器(或兩者都有)停止工作,達(dá)到「丟棄」的效果。
下面分別講一下這幾種情況:
- Single.just 無后續(xù),無延遲
- Observable.interval 有后續(xù),有延遲
- Single.map 無后續(xù),無延遲,有上下游
- Single.delay 無后續(xù),有延遲
- Observable.map 有后續(xù),無延遲
- Observable.delay 無后續(xù),有延遲
這幾種情況已經(jīng)足夠把所有dispose的情況都說明完整了。
5.1 Single.just 無后續(xù),無延遲
對(duì)于Single.just,情況比較簡單,在SingleJust的subscribeActual中,給觀察者一個(gè)全局共享的Disposable對(duì)象。下游不能對(duì)其進(jìn)行取消,因?yàn)殚g隔太短了,馬上就調(diào)用onSuccess了。
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
5.2 Observable.interval 有后續(xù),有延遲
先來一段示例代碼:
val longObservable: Observable<Long> = Observable.interval(0, 1, TimeUnit.SECONDS)
longObservable.subscribe(object : Observer<Long> {
override fun onSubscribe(d: Disposable?) {
}
override fun onNext(t: Long?) {
}
override fun onError(e: Throwable?) {
}
override fun onComplete() {
}
})
這里Observable.interval構(gòu)建的是ObservableInterval對(duì)象。有了前面的經(jīng)驗(yàn),直接進(jìn)去看ObservableInterval的subscribeActual方法。
//ObservableInterval.java
@Override
public void subscribeActual(Observer<? super Long> observer) {
//1. 創(chuàng)建觀察者(該觀察者還實(shí)現(xiàn)了Disposable)
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is);
//線程調(diào)度器
Scheduler sch = scheduler;
...
//將is(它實(shí)現(xiàn)了Runnable)這個(gè)任務(wù)交給線程調(diào)度器去執(zhí)行,同時(shí)返回一個(gè)Disposable對(duì)象
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
...
}
首先是創(chuàng)建了一個(gè)觀察者,該觀察者很明顯是實(shí)現(xiàn)了Disposable接口,因?yàn)閷⒃撚^察者順著onSubscribe傳遞給了下游,方便下游取消。隨后,將該觀察者交給線程調(diào)度器去執(zhí)行,顯然它還實(shí)現(xiàn)了Runnable接口,緊接著將調(diào)度器返回的Disposable對(duì)象設(shè)置給該觀察者。
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
//傳入的Observer是下游的
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
//取消自己
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
//通知下游
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
//設(shè)置Disposable給自己
DisposableHelper.setOnce(this, d);
}
}
IntervalObserver繼承自AtomicReference(AtomicReference類提供了一個(gè)可以原子讀寫的對(duì)象引用變量,避免出現(xiàn)線程安全問題),泛型是Disposable。同時(shí)它也實(shí)現(xiàn)了Disposable和Runnable。在構(gòu)造方法里面?zhèn)魅胂掠蔚挠^察者,方便待會(huì)兒把事件傳給下游。
當(dāng)事件一開始時(shí),將IntervalObserver傳遞給下游,因?yàn)樗鼘?shí)現(xiàn)了Disposable,可以被下游取消。然后將IntervalObserver傳遞給調(diào)度器,調(diào)度器會(huì)執(zhí)行里面的run方法,run方法里面是將數(shù)據(jù)傳遞給下游。在交給調(diào)度器的時(shí)候,返回了一個(gè)Disposable對(duì)象,意味著可以隨時(shí)取消調(diào)度器里面的該任務(wù)。然后將該Disposable對(duì)象設(shè)置給IntervalObserver的內(nèi)部,通過setResource方法,其實(shí)就是設(shè)置給IntervalObserver自己的,它本身就是一個(gè)AtomicReference<Disposable>。當(dāng)下游調(diào)用dispose時(shí),即調(diào)用IntervalObserver的dispose,然后IntervalObserver內(nèi)部隨即調(diào)用自己的dispose方法,完成了取消。
這里為什么設(shè)計(jì)的這么繞?直接將調(diào)度器返回的Disposable對(duì)象返回給下游不就可以了么,下游也可以對(duì)其進(jìn)行取消啊?這樣設(shè)計(jì)的好處是上游傳遞給下游的永遠(yuǎn)是IntervalObserver對(duì)象,下游直接拿著這個(gè)實(shí)現(xiàn)了Disposable的IntervalObserver對(duì)象可以直接調(diào)用它的dispose進(jìn)行取消。而不用管它內(nèi)部當(dāng)前是握著哪個(gè)Disposable對(duì)象,即使IntervalObserver內(nèi)部的Disposable被更換了也絲毫不影響下游對(duì)上游的取消操作。
5.3 Single.map 無后續(xù),無延遲,有上下游
先來個(gè)簡單例子
val singleInt = Single.just(1)
val singleString = singleInt.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
singleString.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable?) {
}
override fun onSuccess(t: String) {
}
override fun onError(e: Throwable?) {
}
})
singleInt.map點(diǎn)進(jìn)去
//Single.java
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
通過上面的例子我們知道,上游是創(chuàng)建了一個(gè)SingleJust對(duì)象。在調(diào)用map時(shí),將自己(也就是SingleJust)傳給下游SingleMap里面去了。
//SingleMap.java
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
//source是上游,通過構(gòu)造方法傳入進(jìn)來
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
//t是下游
//訂閱
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
一開場就直接調(diào)用上游source訂閱MapSingleObserver這個(gè)觀察者。在MapSingleObserver的邏輯也比較簡單,就是實(shí)現(xiàn)了onSubscribe、onSuccess、onError這些方法。然后在上游調(diào)用onSubscribe時(shí)調(diào)用下游的onSubscribe;在上游調(diào)用onSuccess時(shí)自己做了一下mapper.apply(value)轉(zhuǎn)換操作,將數(shù)據(jù)轉(zhuǎn)換成下游所需要的,然后再調(diào)用下游的onSuccess傳遞給下游;onError同onSubscribe原理是一樣的。
5.4 Single.delay 無后續(xù),有延遲
來段示例代碼:
val singleInt: Single<Int> = Single.just(1)
val singleDelay: Single<Int> = singleInt.delay(1, TimeUnit.SECONDS)
val observer = object : SingleObserver<Int> {
override fun onSubscribe(d: Disposable?) {
log("onSubscribe")
}
override fun onSuccess(t: Int?) {
log("onSuccess")
}
override fun onError(e: Throwable?) {
log("onError")
}
}
singleDelay.subscribe(observer)
直搗黃龍,Single.delay背后的對(duì)象是SingleDelay?,F(xiàn)在有經(jīng)驗(yàn)了,直接看它的subscribeActual
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//可以確定的是這是一個(gè)Disposable
final SequentialDisposable sd = new SequentialDisposable();
//將這個(gè)Disposable通過onSubscribe傳遞給下游
observer.onSubscribe(sd);
//讓上游訂閱Delay這個(gè)觀察者
source.subscribe(new Delay(sd, observer));
}
看下SequentialDisposable是什么玩意兒
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
public SequentialDisposable() {
// nothing to do
}
public SequentialDisposable(Disposable initial) {
lazySet(initial);
}
public boolean update(Disposable next) {
return DisposableHelper.set(this, next);
}
public boolean replace(Disposable next) {
return DisposableHelper.replace(this, next);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
似曾相識(shí),上面的IntervalObserver也是這種思想。只不過這里多了2個(gè)update和replace方法,可以隨時(shí)更換AtomicReference里面的Disposable對(duì)象。這就體現(xiàn)出了這種設(shè)計(jì)的好處,不管里面的Disposable怎么更換,傳遞給下游的是這個(gè)SequentialDisposable,下游只需要調(diào)SequentialDisposable的dispose就將其里面的Disposable給取消掉了,而不用管里面的Disposable究竟是誰。
下面咱們來看SingleDelay里面的內(nèi)部類Delay(觀察者)
final class Delay implements SingleObserver<T> {
//傳遞給下游的Disposable
private final SequentialDisposable sd;
//下游的觀察者
final SingleObserver<? super T> downstream;
Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
this.sd = sd;
this.downstream = observer;
}
@Override
public void onSubscribe(Disposable d) {
//開始訂閱的時(shí)候,sd內(nèi)部的Disposable是上游給過來的
sd.replace(d);
}
@Override
public void onSuccess(final T value) {
//上游把數(shù)據(jù)給過來之后,就不用管上游了,直接把sd里面Disposable 設(shè)置成線程調(diào)度器給回來那個(gè)
//因?yàn)榇藭r(shí)下游調(diào)用dispose的話,直接取消調(diào)度器里面的任務(wù)就行了
//巧妙地將sd里面的Disposable掉包了
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
}
@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
final class OnSuccess implements Runnable {
private final T value;
OnSuccess(T value) {
this.value = value;
}
@Override
public void run() {
//調(diào)度器執(zhí)行到該任務(wù)時(shí),將數(shù)據(jù)傳遞給下游
downstream.onSuccess(value);
}
}
final class OnError implements Runnable {
private final Throwable e;
OnError(Throwable e) {
this.e = e;
}
@Override
public void run() {
downstream.onError(e);
}
}
}
這段代碼比較精彩,首先在上游訂閱Delay的時(shí)候,觸發(fā)onSubscribe,Delay內(nèi)部隨即將該Disposable存入SequentialDisposable對(duì)象(需要注意的是下游拿到的Disposable始終是這個(gè)SequentialDisposable)中。此時(shí)如果下游調(diào)用dispose,也就是調(diào)用SequentialDisposable的dispose,也就是上游的dispose,dispose流程在這個(gè)節(jié)點(diǎn)上就完成了,向上傳遞。
上游有數(shù)據(jù)了,通過onSuccess傳遞給觀察者Delay的時(shí)候,SequentialDisposable就可以不用管上游的那個(gè)Disposable了,此時(shí)要關(guān)心的是傳遞給線程調(diào)度器里面的任務(wù)的取消事件了。所以直接將調(diào)度器返回的Disposable替換到SequentialDisposable內(nèi)部,此時(shí)下游進(jìn)行取消時(shí),就直接把任務(wù)給取消掉了。
當(dāng)調(diào)度器執(zhí)行到任務(wù)OnSuccess時(shí),就把數(shù)據(jù)傳遞給下游,這個(gè)節(jié)點(diǎn)的任務(wù)就完成了。
5.5 Observable.map 有后續(xù),無延遲
Observable.map所對(duì)應(yīng)的是ObservableMap,直接上代碼:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//t是下游的觀察者
//source是上游
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Throwable {
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
在subscribeActual中并沒有直接調(diào)用onSubscribe,而MapObserver中又沒有這個(gè)方法,那onSubscribe肯定是在其父類中完成的。在看onSubscribe之前咱干脆先把onNext理一下,這里通過mapper.apply轉(zhuǎn)一下之后馬上就交給下游的onNext去了。
//BasicFuseableObserver.java
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
@Override
public final void onSubscribe(Disposable d) {
//驗(yàn)證上游 d是上游的Disposable upstream是當(dāng)前類的字段,還沒有被賦值
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
//onSubscribe之前想做點(diǎn)什么事情的話,在beforeDownstream里面做
if (beforeDownstream()) {
//調(diào)用下游的onSubscribe
downstream.onSubscribe(this);
//onSubscribe之后想做點(diǎn)什么事情的話,在afterDownstream里面做
afterDownstream();
}
}
}
protected boolean beforeDownstream() {
return true;
}
protected void afterDownstream() {
}
@Override
public void dispose() {
upstream.dispose();
}
}
//DisposableHelper.java
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {
next.dispose();
reportDisposableSet();
return false;
}
return true;
}
還是先調(diào)用下游的onSubscribe,不過,并沒有將上游的Disposable直接傳給下游,而是將中間節(jié)點(diǎn)BasicFuseableObserver自己傳給了下游,同時(shí)將上游的Disposable存儲(chǔ)起來,方便待會(huì)兒dispose。
5.6 Observable.delay 無后續(xù),有延遲
Observable.delay 對(duì)應(yīng)的是ObservableDelay
public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
Observer<T> observer;
if (delayError) {
observer = (Observer<T>)t;
} else {
observer = new SerializedObserver<>(t);
}
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
}
}
在subscribeActual沒有調(diào)用下游的onSubscribe,那說明是在DelayObserver中完成的
static final class DelayObserver<T> implements Observer<T>, Disposable {
final Scheduler.Worker w;
Disposable upstream;
DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
super();
this.downstream = actual;
this.w = w;
...
}
@Override
public void onSubscribe(Disposable d) {
//1. 先驗(yàn)證一下上游 然后將上游的Disposable賦值給upstream
//2. 調(diào)用下游的onSubscribe,把自己傳給下游
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
@Override
public void onNext(final T t) {
//OnNext任務(wù)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時(shí)調(diào)用下游的onNext方法
w.schedule(new OnNext(t), delay, unit);
}
@Override
public void onError(final Throwable t) {
w.schedule(new OnError(t), delayError ? delay : 0, unit);
}
@Override
public void onComplete() {
w.schedule(new OnComplete(), delay, unit);
}
@Override
public void dispose() {
//同時(shí)取消上游的Disposable和自己執(zhí)行的調(diào)度器任務(wù)
upstream.dispose();
w.dispose();
}
final class OnNext implements Runnable {
private final T t;
OnNext(T t) {
this.t = t;
}
@Override
public void run() {
downstream.onNext(t);
}
}
...
}
onXxx的所有操作都放到了DelayObserver里面來完成,在上游調(diào)用到這節(jié)的onSubscribe時(shí),先驗(yàn)證一下上游 然后將上游的Disposable賦值給upstream,調(diào)用下游的onSubscribe,把自己傳給下游。
當(dāng)下游調(diào)用dispose時(shí),在DelayObserver的dispose方法中將上游的Disposable給取消掉,然后把自己的調(diào)度器任務(wù)也給取消掉。
事件的傳遞:當(dāng)上游調(diào)用到這一節(jié)的onNext時(shí),OnNext任務(wù)(Runnable)提交給調(diào)度器執(zhí)行->在執(zhí)行任務(wù)時(shí)調(diào)用下游的onNext方法。
6. 線程切換
線程切換是RxJava的另一個(gè)重要功能。
6.1 subscribeOn
subscribeOn在Single場景下對(duì)應(yīng)的是SingleSubscribeOn這個(gè)類
public final class SingleSubscribeOn<T> extends Single<T> {
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
//切線程
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
}
直接看subscribeActual方法,很明顯是將parent這個(gè)任務(wù)交給了線程調(diào)度器去執(zhí)行。那我們直接看SubscribeOnObserver的run方法即可
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
@Override
public void run() {
source.subscribe(this);
}
}
在scheduleDirect那里切線程,然后在另一個(gè)線程中去執(zhí)行source.subscribe(this),也就是在Scheduler指定的線程里啟動(dòng)subscribe(訂閱)。
- 切換起源Observable的線程
- 當(dāng)多次調(diào)用subscribeOn()的時(shí)候,只有最上面的會(huì)對(duì)起源Observable起作用
observeOn
observeOn在Single場景下的類是SingleObserveOn。它的subscribeActual方法如下:
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
上游訂閱了ObserveOnSingleObserver這個(gè)觀察者,核心就在這個(gè)觀察者里面。
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
我們重點(diǎn)關(guān)注一下onSuccess和onError方法,核心就是將當(dāng)前這個(gè)Runnable任務(wù)交給scheduler進(jìn)行執(zhí)行,而這里的scheduler是由使用者傳入的,比如說是AndroidSchedulers.mainThread()。那么在run方法執(zhí)行時(shí),就會(huì)在主線程中,那么在主線程中執(zhí)行下游的onError和onSuccess。 這里通過Scheduler指定的線程來調(diào)用下級(jí)Observer的對(duì)應(yīng)回調(diào)方法。
- 切換observeOn下面的Observer的回調(diào)所在的線程
- 當(dāng)多次調(diào)用observerOn()的時(shí)候,每個(gè)都好進(jìn)行一次線程切換,影響范圍是它下面的每個(gè)Observer(除非又遇到新的obServeOn())
6.2 Scheduler的原理
上面我們多次提到Scheduler,但是一直不知道它具體是什么。其實(shí)它就是用來控制控制線程的,用于將指定的邏輯在指定的線程中執(zhí)行。這里就不帶著大家讀源碼了,篇幅過于長了,這塊源碼也比較簡單,感興趣的讀者可以去翻閱一下。下面是幾個(gè)核心點(diǎn)。
其中Schedulers.newThread()里面是創(chuàng)建了一個(gè)線程池Executors.newScheduledThreadPool(1, factory)來執(zhí)行任務(wù),但是這個(gè)線程池里面的線程不會(huì)得到重用,每次都是新建的線程池。當(dāng) scheduleDirect() 被調(diào)用的時(shí)候,會(huì)創(chuàng)建一個(gè) Worker,Worker 的內(nèi)部 會(huì)有一個(gè) Executor,由 Executor 來完成實(shí)際的線程切換;scheduleDirect() 還會(huì)創(chuàng)建出一個(gè) Disposable 對(duì)象,交給外層的 Observer,讓它能執(zhí)行 dispose() 操作,取消訂閱鏈;
Schedulers.io()和Schedulers.newThread()差別不大,但是io()這兒線程可能會(huì)被重用,所以一般io()用得多一些。
AndroidSchedulers.mainThread()就更簡單了,直接使用Handler進(jìn)行線程切換,將任務(wù)放到主線程去做,不管再怎么花里胡哨的庫,最后要切到主線程還得靠Handler。
7. 小結(jié)
Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡潔 & 使用簡單的特點(diǎn),深受各大 Android開發(fā)者的歡迎。平時(shí)在項(xiàng)目中也使用得比較多,所以本文對(duì)RxJava3中的訂閱流程、取消流程、線程切換進(jìn)行了核心源碼分析,希望能幫助到各位。