星球話題:用過RxJava和RxAndroid嗎?RxAndroid切換線程是怎么實(shí)現(xiàn)的呢?
去年知乎上參加了玉剛的Live,聽大神講解職業(yè)規(guī)劃。隨后入了微信群,去年11月份也加入主席的星球。由于去年十一月份剛好接了外包工作,比較忙就忽略星球的任務(wù),說來慚愧,到現(xiàn)在還沒有交過一次作業(yè)。再加上今年年初想換工作,就忙于復(fù)習(xí),星球的作業(yè)就落下,希望從今天開始,把作業(yè)補(bǔ)回來。
年初也去試水,發(fā)覺現(xiàn)在android的要求真的是高。可能也是自己比較菜吧,試了三家沒有拿到offer。今年計(jì)劃,好好復(fù)習(xí)安卓知識(shí),學(xué)點(diǎn)RN、小程序、PWA,爭(zhēng)取拿到好的offer。女朋友說我,晚上想了千萬條路,隔天起來走原路。哎,反正還是得腳踏實(shí)地,一步一步學(xué)習(xí)。說干就干,下邊我們開始學(xué)習(xí)。
(1)RxJava 基本概念:
1、Observable (可觀察者,即被觀察者)
2、Observer (觀察者)
3、subscribe (訂閱)、事件
4、Scheduler 調(diào)度器,相當(dāng)于線程控制器
Rxjava 實(shí)現(xiàn):
1、創(chuàng)建Observable(被觀察者):
mObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("2018年");
subscriber.onNext("CBA");
subscriber.onNext("遼寧隊(duì)");
subscriber.onNext("奪冠");
subscriber.onCompleted();
}
});
這里傳入了一個(gè)OnSubscribe對(duì)象作為參數(shù)。OnSubscribe會(huì)被存儲(chǔ)在返回的 Observable對(duì) 象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng)Observable被訂閱的時(shí)候,OnSubscribe的call()方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼,就是觀察者Subscriber 將會(huì)被調(diào)用四次 onNext() 和一次 onCompleted())。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞。
2、創(chuàng)建Observer(觀察者):
mObserver = new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.d("onCompleted:");
}
@Override
public void onError(Throwable e) {
LogUtil.d("onError:"+e);
}
@Override
public void onNext(String s) {
LogUtil.d("onNext:"+s);
}
};
Subscriber是實(shí)現(xiàn)Observer的抽象類,用法也一樣:
mSubscriber = new Subscriber() {
@Override
public void onCompleted() {
LogUtil.d("onCompleted:");
}
@Override
public void onError(Throwable e) {
LogUtil.d("onError:"+e);
}
@Override
public void onNext(Object o) {
LogUtil.d("onNext:"+o);
}
};
Subscirber與Observe 的區(qū)別是:
1、onStart(): 這是 Subscriber 增加的方法。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之 前被調(diào)用,可以用于做一些準(zhǔn)備工作。
2、unsubscribe(): 這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱。
3、subscribe()訂閱:
mObservable.subscribe(mObserver);
或者
mObservable.subscribe(mSubscriber);
以上1、2、3過程也可以寫成:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("2018年");
subscriber.onNext("CBA");
subscriber.onNext("遼寧隊(duì)");
subscriber.onNext("奪冠");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.d("onCompleted:");
}
@Override
public void onError(Throwable e) {
LogUtil.d("onError:"+e);
}
@Override
public void onNext(String s) {
LogUtil.d("onNext:"+s);
}
});
結(jié)果:

(2)Rxjava常見操作符
下邊我們來了解一下Rxjava 常見操作符:
just:將傳入的參數(shù)依次發(fā)送出來
Observable observable = Observable.just("2018年", "CBA", "遼寧隊(duì)","奪冠");
// 將會(huì)依次調(diào)用:
// onNext("2018年");
// onNext("CBA");
// onNext("遼寧隊(duì)");
// onNext("奪冠");
// onCompleted();
from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對(duì)象后,依次發(fā)送出:
String[] words = {"2018年", "CBA", "遼寧隊(duì)","奪冠"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("2018年");
// onNext("CBA");
// onNext("遼寧隊(duì)");
// onNext("奪冠");
// onCompleted();
(3)Rxjava如何切換線程:Scheduler
我們回到文章開頭,RxAndroid切換線程是怎么實(shí)現(xiàn)的呢?看下邊一個(gè)例子:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
LogUtil.d("subscriber:");
subscriber.onNext("2018年");
subscriber.onNext("CBA");
subscriber.onNext("遼寧隊(duì)");
subscriber.onNext("奪冠");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
LogUtil.d("onCompleted:");
}
@Override
public void onError(Throwable e) {
LogUtil.d("onError:"+e);
}
@Override
public void onNext(String s) {
LogUtil.d("onNext:"+s);
}
});
subscribeOn(): 指定subscribe()所發(fā)生的線程,即 Observable.OnSubscribe被激活時(shí)所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
observeOn(): 指定 Subscriber 所運(yùn)行在的線程?;蛘呓凶鍪录M(fèi)的線程。
我們來了解一下Scheduler:
在不指定線程的情況下,RxJava遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用 subscribe(),就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件。如果需要切換線程,就需要用到Scheduler(調(diào)度器),下面是Scheduler的API:
1、Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
2、Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計(jì)算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
3、Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU。
4、Android 還有一個(gè)專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。
有了這幾個(gè) Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個(gè)方法來對(duì)線程進(jìn)行控制了。
那么我們有個(gè)疑問,Rxjava內(nèi)部是如何切換線程?首先我們來看subscribeOn()
subscribeOn()源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
我們看到參數(shù)是傳入Scheduler 調(diào)度器,然后創(chuàng)建了新的Observable,我們看到OperatorSubscribeOn這個(gè)對(duì)象,OperatorSubscribeOn原始Observable對(duì)象和調(diào)度器scheduler,那么這個(gè)OperatorSubscribeOn是什么呢,我們看下源碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler; //調(diào)度器
final Observable<T> source; //原始Observable
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
//①.原始觀察者訂閱了新的Observable后,將執(zhí)行此call方法
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//②. call方法中使用傳入的調(diào)度器創(chuàng)建的Worker對(duì)象的schedule方法切換線程
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//③ .創(chuàng)建了一個(gè)新的觀察者
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
//⑤. 新的觀察者收到數(shù)據(jù)后直接發(fā)送給原始觀察者
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
//⑤. 新的觀察者收到數(shù)據(jù)后直接發(fā)送給原始觀察者
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
//④. 在切換的線程中,新的觀察者訂閱原始Observable,用來接收數(shù)據(jù)
source.unsafeSubscribe(s);
}
});
}
}
OperatorSubscribeOn是實(shí)現(xiàn)Observable的OnSubscribe 接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
public interface Action1<T> extends Action {
void call(T t);
}
上面源碼中注釋已經(jīng)寫的很清楚了,OperatorSubscribeOn字意上來理解,Operator操作員,相當(dāng)于媒介,為新的Observable發(fā)射數(shù)據(jù)。它創(chuàng)建了一個(gè)新的觀察者訂閱原始Observable,這樣就可以接受原始Observable發(fā)射的數(shù)據(jù),然后直接發(fā)送給原始觀察者。
所以O(shè)peratorSubscribeOn也是間接實(shí)現(xiàn)了Action1,我們來看OperatorSubscribeOn在call()方法里邊操作了什么。在call方法中通過scheduler.createWorker().schedule()完成線程的切換,這里就牽扯到兩個(gè)對(duì)象了,Scheduler和Worker。Scheduler是個(gè)抽象類,是從外邊傳進(jìn)來的。我們就看一個(gè)簡(jiǎn)單的Schedulers.newThread(),其他也是從類似,下面一步一步看源碼:
/**
* Static factory methods for creating Schedulers.
*/
public final class Schedulers {
//各種調(diào)度器對(duì)象
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();
......
//構(gòu)造方法
private Schedulers() {
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
......
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
//①.創(chuàng)建newThreadScheduler對(duì)象
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}
}
//②. 獲取NewThreadScheduler對(duì)象
public static Scheduler newThread() {
return getInstance().newThreadScheduler;
}
Schedulers中保存了幾個(gè)調(diào)度器對(duì)象,在Schedulers被加載的時(shí)候,他們就被初始化了,Schedulers就像是一個(gè)調(diào)度器的控制器,跟蹤newThreadScheduler,看到newThreadScheduler在RxJavaSchedulersHook.createNewScheduler()實(shí)例化。CTRL+鼠標(biāo)左鍵跟createNewScheduler()方法進(jìn)去,最終調(diào)到NewThreadScheduler(ThreadFactory threadFactory)的方法:
**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
private final ThreadFactory threadFactory;
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
NewThreadScheduler就是我們調(diào)用subscribeOn(Schedulers.newThread() )傳入的調(diào)度器對(duì)象,每個(gè)調(diào)度器對(duì)象都有一個(gè)createWorker方法用于創(chuàng)建一個(gè)Worker對(duì)象,而NewThreadScheduler對(duì)應(yīng)創(chuàng)建的Worker是一個(gè)叫NewThreadWorker的對(duì)象,在新產(chǎn)生的OperatorSubscribeOn計(jì)劃表中就是通過NewThreadWorker.schedule(Action0)實(shí)現(xiàn)線程的切換,下面我們跟蹤schedule(Action0)方法:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor; //
public NewThreadWorker(ThreadFactory threadFactory) {
//創(chuàng)建一個(gè)線程池
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
executor = exec;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
return scheduleActual(action, delayTime, unit);
}
//重要:worker.schedule()最終調(diào)用的是這個(gè)方法
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
//return action;
Action0 decoratedAction = schedulersHook.onSchedule(action);
//ScheduledAction就是一個(gè)Runnable對(duì)象,在run()方法中調(diào)用了Action0.call()
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run); //將Runnable對(duì)象放入線程池中
} else {
f = executor.schedule(run, delayTime, unit); //延遲執(zhí)行
}
run.add(f);
return run;
}
...
}
我們發(fā)現(xiàn)OperatorSubscribeOn計(jì)劃表中通過NewThreadWorker.schedule(Action0),將Action0放入到一個(gè)線程池中執(zhí)行,這樣就實(shí)現(xiàn)了線程的切換。
多次subscribeOn()的情況:
我們發(fā)現(xiàn),每次使用subscribeOn都會(huì)產(chǎn)生一個(gè)新的Observable,并產(chǎn)生一個(gè)新的計(jì)劃表OnSubscribe,目標(biāo)Subscriber最后訂閱的將是最后一次subscribeOn產(chǎn)生的新的Observable。在每個(gè)新的OnSubscribe的call方法中都會(huì)有一個(gè)產(chǎn)生一個(gè)新的線程,在這個(gè)新線程中訂閱上一級(jí)Observable,并創(chuàng)建一個(gè)新的Subscriber接受數(shù)據(jù),最終原始Observable將在第一個(gè)新線程中發(fā)射數(shù)據(jù),然后傳送給給下一個(gè)新的觀察者,直到傳送到目標(biāo)觀察者,所以多次調(diào)用subscribeOn只有第一個(gè)起作用(這只是表面現(xiàn)象,其實(shí)每個(gè)subscribeOn都切換了線程,只是最終目標(biāo)Observable是在第一個(gè)subscribeOn產(chǎn)生的線程中發(fā)射數(shù)據(jù)的)。
?多次subscribeOn()只有第一個(gè)會(huì)起作用,后面的只是在第一個(gè)的基礎(chǔ)上在外面套了一層殼,就像下面的偽代碼,最后執(zhí)行是在第一個(gè)新線程中執(zhí)行:
...
//第3個(gè)subscribeOn產(chǎn)生的新線程
new Thread(){
@Override
public void run() {
Subscriber s1 = new Subscriber();
//第2個(gè)subscribeOn產(chǎn)生的新線程
new Thread(){
@Override
public void run() {
Subscriber s2 = new Subscriber();
//第1個(gè)subscribeOn產(chǎn)生的新線程
new Thread(){
@Override
public void run() {
Subscriber<T> s3 = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
...
};
//①. 最后一個(gè)新觀察者訂閱原始Observable
原始Observable.subscribe(s3);
//②. 原始Observable將在此線程中發(fā)射數(shù)據(jù)
//③. 最后一個(gè)新的觀察者s3接受數(shù)據(jù)
//④. s3收到數(shù)據(jù)后,直接發(fā)送給s2,s2收到數(shù)據(jù)后傳給s1,...最后目標(biāo)觀察者收到數(shù)據(jù)
}
}.start();
}
}.start();
}
}.start();
observeOn原理:
observeOn調(diào)用的是lift操作符,lift操作符。lift有點(diǎn)難理解,簡(jiǎn)單點(diǎn)說就是在 Observable 執(zhí)行了 lift(Operator) 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。
observeOn一樣創(chuàng)建了一個(gè)代理的Observable,并創(chuàng)建一個(gè)代理觀察者接受上一級(jí)Observable的數(shù)據(jù),代理觀察者收到數(shù)據(jù)之后會(huì)開啟一個(gè)線程,在新的線程中,調(diào)用下一級(jí)觀察者的onNext、onCompete、onError方法。
我們看看observeOn操作符的源碼:
public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {
private final Scheduler scheduler;
//創(chuàng)建代理觀察者,用于接收上一級(jí)Observable發(fā)射的數(shù)據(jù)
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
return child;
} else if (scheduler instanceof TrampolineScheduler) {
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
//代理觀察者
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final Queue<Object> queue;
//接受上一級(jí)Observable發(fā)射的數(shù)據(jù)
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
...
schedule();
}
@Override
public void onError(final Throwable e) {
...
schedule();
}
//開啟新線程處理數(shù)據(jù)
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
//在新線程中將數(shù)據(jù)發(fā)送給目標(biāo)觀察者
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
for (;;) {
while (requestAmount != currentEmission) {
...
localChild.onNext(localOn.getValue(v));
}
}
}
}
}
ObserveOnSubscriber代理觀察者相當(dāng)于微信代理商,讓代理商幫忙買onNext,onCompleted,onError,代理商分別代表你進(jìn)行購買。我們看到Worker類的 recursiveScheduler執(zhí)行recursiveScheduler.schedule(this),回調(diào)到方法call中。call可以由傳進(jìn)來的schedule 實(shí)現(xiàn)線程切換。就像上邊的NewThreadWorker.schedule(Action0)一樣。
代理的OnSubscribe中的call方法就是讓代理Subscriber訂閱上一級(jí)Observable,直到訂閱到原始Observable發(fā)射數(shù)據(jù),代理Subscriber收到數(shù)據(jù)后,可能對(duì)數(shù)據(jù)做一些操作,然后將數(shù)據(jù)傳送給下一級(jí)Subscriber,直到目標(biāo)觀察者接收到數(shù)據(jù),目標(biāo)觀察者在那個(gè)線程接受數(shù)據(jù)取決于上一個(gè)Subscriber在哪一個(gè)線程調(diào)用目標(biāo)觀察者。
嗯,本人技術(shù)有限,也是參考以下文章學(xué)習(xí)的。也是希望今年能夠耐心、堅(jiān)持學(xué)下去吧。希望在2018年想找個(gè)好工作的小伙伴們,共勉、堅(jiān)持!
給 Android 開發(fā)者的 RxJava 詳解
https://blog.csdn.net/xmxkf/article/details/51821940