目錄
異步
在該系列第一篇最開(kāi)始,我們已經(jīng)說(shuō)了RxJava是一個(gè)異步編程框架,之所以這么說(shuō),就是因?yàn)樗诰€程的切換方面非常方便。
介紹異步之前,我們先看看下面幾個(gè)方法
subscribe(Observer<? super T> observer)
subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe)
在基本使用中,我們用的是第一個(gè)方法,但此方法需要重寫所有的事件,但有的時(shí)候我們并不需要對(duì)所有的事件進(jìn)行處理,因此就有了下面幾種方法,看參數(shù)我們就知道每個(gè)方法分別處理的是什么事件,比如第二個(gè)方法,只處理next事件,第三個(gè)方法,只處理next和error事件等等。
為了代碼的簡(jiǎn)潔性,接下來(lái)我將使用Consumer作為觀察者。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG","subscribe:"+Thread.currentThread().getName());
emitter.onNext("1");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG","accept:"+Thread.currentThread().getName());
}
});
輸出結(jié)果
D/TAG: subscribe:main
D/TAG: accept:main
結(jié)果分析
如果我將上述代碼,放在一個(gè)子線程中去,發(fā)現(xiàn)結(jié)果線程的名字將不再是main。說(shuō)明在哪創(chuàng)建上述代碼,則上游和下游就會(huì)處于那個(gè)線程,并且它們處于同一個(gè)線程。
如果我們要在子線程中發(fā)送交易,主線程更新UI,這種情況就滿足不了我們的需求了。我們需要的是上線處于子線程,負(fù)責(zé)發(fā)送網(wǎng)絡(luò)請(qǐng)求,下游處于主線程,負(fù)責(zé)更新UI。通過(guò)RxJava的線程調(diào)度器可以輕松實(shí)現(xiàn)上述需求。
在Observable中有兩個(gè)方法
subscribeOn(Scheduler scheduler) //指定上游所在的線程
observeOn(Scheduler scheduler) //指定下游所在的線程
先來(lái)看看下面的代碼
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
emitter.onNext("1");
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
};
//關(guān)注點(diǎn)
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
輸出結(jié)果
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
我們來(lái)看看Schedulers和AndroidSchedulers
這兩個(gè)類并無(wú)繼承關(guān)系,是相互獨(dú)立的兩個(gè)final類
AndroidSchedulers
/** A {@link Scheduler} which executes actions on the Android main thread.*/
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
前者指定方法在主線程中執(zhí)行
后者指定方法在哪個(gè)線程執(zhí)行,由Looper所在的線程決定
Schedulers
該調(diào)度器里面有下面幾個(gè)主要方法
//新的子線程
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//計(jì)算密集型任務(wù)
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
//io密集型任務(wù)
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
public static Scheduler trampoline() {
return TRAMPOLINE;
}
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
其他兩個(gè)暫時(shí)沒(méi)用到,就先不說(shuō)明了。
回到開(kāi)始的異步代碼
修改關(guān)注點(diǎn)
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
輸出結(jié)果
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:RxCachedThreadScheduler-2
結(jié)果分析
修改后的代碼指定了2次上游發(fā)送事件的線程,下游也指定了2次線程,通過(guò)輸出結(jié)果,我們可以得出結(jié)論:上游線程只有第一次指定的有效,下游線程最終會(huì)切換至最后一個(gè)指定的線程。
為了更加清晰的知道下游線程的切換過(guò)程,我們修改代碼如下
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
};
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
})
.subscribe(consumer);
輸出結(jié)果如下
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
D/TAG: accept:2:main
D/TAG: accept:3:main
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
結(jié)果分析
從輸出結(jié)果,我們可以看出,每一個(gè)doOnNext都會(huì)接受到全部事件,并且每一個(gè)observeOn指定的是它下面的那個(gè)事件所處的線程。
線程切換原理分析

來(lái)一段RxJava的調(diào)用鏈代碼
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
LogUtil.d("rxjava", "map1: " + Thread.currentThread().getId());
return integer.toString();
}
})
.subscribeOn(Schedulers.newThread()) // s1
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
LogUtil.d("rxjava", "map2: " + Thread.currentThread().getId());
return s.hashCode();
}
})
.observeOn(Schedulers.newThread()) // o1
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
LogUtil.d("rxjava", "map3: " + Thread.currentThread().getId());
return integer.toString();
}
})
.subscribeOn(Schedulers.newThread()) // s2
.observeOn(Schedulers.newThread()) // o2
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.d("rxjava", "accept: " + Thread.currentThread().getId());
}
});
結(jié)合圖和代碼來(lái)分析線程的切換過(guò)程
這段代碼中包含了很多操作符,每一個(gè)點(diǎn)后面的都是RxJava的操作符,如just,map,subscribe等等,對(duì)應(yīng)圖中的lift
在這些操作符中,每調(diào)用一次操作符,都返回Observable,這就像Builder模式,只有subscribe返回的不是Observable,而是Disposable
subscribe意味著RxJava調(diào)用鏈開(kāi)始啟動(dòng),對(duì)應(yīng)圖中的底端的actual-subscriber。
自下而上找subscribeOn,每經(jīng)過(guò)一個(gè)subscribeOn就切換一次線程(如果一個(gè)都沒(méi)有,則線程默認(rèn)為當(dāng)前線程),直到到達(dá)頂端的Observable,對(duì)應(yīng)圖中的onSubscribe
自上而下找observeOn(圖中的Subscribe),同樣是每經(jīng)過(guò)一個(gè)observeOn就切換一次線程
通過(guò)這個(gè)思路,大家想一想上述程序的打印結(jié)果
just,map1和map2處于一個(gè)線程,并且是s1所指定的線程
map3處于o1所處的線程
accept處于o2所處的線程
打印驗(yàn)證一下
map1: 732
map2: 732
map3: 733
accept: 734
結(jié)果一致
[]