本系列文章如下:
- 拆輪子系列--RxJava前奏篇
- 拆輪子系列--RxJava理解(一)--Map解析
- 拆輪子系列--RxJava理解(二)--subscribeOn
- 拆輪子系列--RxJava理解(三)--observeOn
上一篇文章主要介紹了RxJava中基本的調(diào)用流程以及常用的操作符--map。本文主要介紹RxJava中線程調(diào)度的核心操作符之一subscribeOn,本文源碼分析基于RxJava2。
本文的大綱如下:
- 一個具體的例子
- subscribeOn源碼分析
- 總結(jié)
1 . 一個具體的例子
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String str) throws Exception {
Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
return Integer.valueOf(s);
}
}).subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-integer-" + integer);
}
});
}
運行下上面的代碼,我們先看看結(jié)果會是怎么樣的:
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-1
E/TAG: -sub-RxCachedThreadScheduler-1-integer-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-2
E/TAG: -sub-RxCachedThreadScheduler-1-integer-2
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-3
E/TAG: -sub-RxCachedThreadScheduler-1-integer-3
如果我們調(diào)用多次的subscribeOn()會是怎么樣子的呢?我們改下代碼看看:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
return Integer.valueOf(s);
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-integer-" + integer);
}
});
}
我們再來看看打印的信息:
E/TAG: -map-thread-main-string-1
E/TAG: -sub-main-integer-1
E/TAG: -map-thread-main-string-2
E/TAG: -sub-main-integer-2
E/TAG: -map-thread-main-string-3
E/TAG: -sub-main-integer-3
有沒有發(fā)現(xiàn),當(dāng)我們調(diào)用兩次的subscribeOn()這個操作符,只有最上面的那個起了作用,后面的subscribeOn()操作符無效。這個是為什么呢?接下來進(jìn)行源碼分析。
2. subscribeOn源碼分析
首先,我們來看看subscribeOn()這個操作符到底干了些什么事情,然后再來解釋為什么多次調(diào)用subscribeOn()這個操作符,只有第一個操作符有效。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
我們首先看看onAssembly()這個里面到底干了些什么事情:
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
從源碼中我們可以看出,它并沒有干什么事情,就是返回了一個Observable的對象,接下來我們看看ObservableSubscribeOn()這部分的源碼:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
在上面的代碼中,真正的執(zhí)行操作是在subscribeActual()方法中,該方法中首先執(zhí)行的操作就是通過代理的方式使用SubscribeOnObserver將Observer進(jìn)行封裝,設(shè)置Disposable來將subscribe進(jìn)行線程切換到scheduler的線程中。
簡單點來說,subscribeOn這個操作符就是將上一層的ObservableSource(就是上一層的Observable)放到一個新的線程去發(fā)射元素。
ok,這里也就解釋了為什么調(diào)用多個subscribeOn()時,只有最上面的subscribeOn()起作用,那么根據(jù)上面的分析,當(dāng)我們在操作的時候加入多個map()操作符的時候,情況會是怎么樣子的?如下示例:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onComplete();
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
return Integer.valueOf(s);
}
})
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Long>() {
@Override
public Long apply(Integer integer) throws Exception {
Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-Integer-" + integer);
return Long.valueOf(integer);
}
})
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long longValue) throws Exception {
Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-longValue-" + longValue);
}
});
結(jié)果如下:
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-Integer-1
E/TAG: -sub-RxCachedThreadScheduler-1-longValue-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-2
E/TAG: -map-thread-RxCachedThreadScheduler-1-Integer-2
E/TAG: -sub-RxCachedThreadScheduler-1-longValue-2
從結(jié)果中我們發(fā)現(xiàn),不管執(zhí)行了多少次的map()操作,每個Observable只有第一個subscribeOn()方法起作用,而且多次的map()操作不會影響線程的變換。
3. 總結(jié)
首先,用一張圖來展示subscribeOn()這個操作符基本運行的流程:

subscribeOn這個操作符主要的執(zhí)行邏輯現(xiàn)在總結(jié)如下:
- 代理
Observable做的事情就是將Observable1發(fā)射到scheduler指定的線程中進(jìn)行subscribe(),并且調(diào)用下一層傳上來的observer1來接收事件;
- 代理
- 當(dāng)我們多次調(diào)用
subscribeOn()方法后,其實此時的事件并沒有發(fā)射出去,當(dāng)執(zhí)行到第一個subscribeOn()方法時,后面subscribeOn()切換線程就會被第一個subscribeOn()切換線程的方法截斷,因此,多次調(diào)用subscribeOn()對整個流程是不產(chǎn)生影響的,從而這里可以解釋前面的問題,當(dāng)使用多個subscribeOn()的時候,也只有第一個subscribeOn()起作用。
- 當(dāng)我們多次調(diào)用
再舉個淺顯的栗子來說明下多次調(diào)用subscribeOn的情況:
new Thread(new Runnable() {
@Override
public void run() {
new Thread(new Runnable() {
@Override
public void run() {
new Thread(new Runnable() {
@Override
public void run() {
Log.e("TAG", "which thread?=" + Thread.currentThread().getName());
}
}, "線程3").start();
}
}, "線程2").start();
}
}, "線程1").start();
上面代碼執(zhí)行的結(jié)果:
E/TAG: which thread?=線程3
- 當(dāng)我們使用
subscribeOn()且使用多個map()操作符的時候,subscribeOn會將上一層Observable切換到一個指定的線程進(jìn)行事件的發(fā)射,而上一層Observable拿到的observer已經(jīng)是由下層observer一路封裝上來的,也就是該observer經(jīng)歷了所有的轉(zhuǎn)換。因此,不管執(zhí)行了多少次的map()操作,
不管該map()操作是執(zhí)行在subscribeOn()之前還是之后,其最后的發(fā)射事件會在subscribeOn()指定的線程執(zhí)行。
- 當(dāng)我們使用
RxJava中線程調(diào)度方法subscribeOn()方法這里就介紹完畢,下一篇繼續(xù)分析RxJava中另一個線程調(diào)度方法observableOn()。
如果文章中有什么疏漏或者錯誤的地方,還望各位指正,你們的監(jiān)督是我最大的動力,謝謝!