線程切換
只要使用RxJava肯定對下面的代碼特別熟悉
Observable.from(list)
.subcribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
其中subscribeOn()可以把事件發(fā)生的線程切換到io線程,observeOn()可以把處理事件的線程切換到Android應(yīng)用程序主線程。
那他是怎么做到這么簡潔的切換呢?
Rxjava的所有變換都基于一個(gè)lift模型,我們接下來介紹一下這個(gè)模型。
首先回顧一下Observable通知Subscriber的原理。
在生成Observable的時(shí)候我們會(huì)傳入一個(gè)OnSubscribe的實(shí)例,在發(fā)生訂閱關(guān)系subscribe方法中,OnSubscribe實(shí)例的call(Subscriber)方法中就會(huì)調(diào)用傳入的Subscriber的的相關(guān)方法,從而實(shí)現(xiàn)通知,消息發(fā)送。
流程大概如下
Observable—(實(shí)例化時(shí))—>OnSubscribe—(訂閱關(guān)系發(fā)生subscribe時(shí)) —>Subscriber
那首先考慮一下我們怎么實(shí)現(xiàn)在Observable中發(fā)送的是 圓形 事件,但是在Subscriber中接收到 方形事件并處理呢。
如果Observable和Subscriber的事件都不一樣,這都不能發(fā)生訂閱關(guān)系的,因?yàn)樵诰幾g檢查的時(shí)候就無法通過。
RxJava采用的方法是提供一個(gè)變換方法lift(Operater),該方法返回一個(gè)Observable對象。該Observable對象會(huì)發(fā)送方形 事件,這樣就可以用這個(gè)新的Observable對象來訂閱原始的Subscriber了。
在新生成的Obaservable對象的時(shí)候,我們也會(huì)生成該Observable對應(yīng)的OnSubscribe對象,并實(shí)現(xiàn)新的OnSubscribe對象的call(Subscriber)方法。由于我們用新生成的Observable對象去訂閱原始的Subscriber,所以新生成的OnSubscribe的call方法中的參數(shù)就是原始的Subscriber了。
接下來就是lift(Operator)中Operator接口發(fā)揮作用的時(shí)候了,這個(gè)Operator接口的call方法實(shí)現(xiàn)把一個(gè)Subscriber變換成另一個(gè)Subscrber的功能。
在這里就是把原始的響應(yīng)方形事件的Subscriber轉(zhuǎn)換成響應(yīng)圓形事件的Subscriber,這樣就可以調(diào)用原始的OnSubscribe的call方法,把圓形事件變成發(fā)送給這個(gè)新的響應(yīng)圓形事件的Subscriber.
這樣新的Observable中就同時(shí)包含了:
- 原始的OnSubscribe(它能發(fā)送
圓形事件) - 新的Subscriber(它能接受
圓形事件) - 原始的Subscriber(它能接受
方形事件)
這樣,在發(fā)生訂閱關(guān)系時(shí),原始的Observable(原始的OnSubscribe)會(huì)發(fā)送圓形事件給新的Subscriber,新的Subscriber在處理的時(shí)候,就把這個(gè)事件轉(zhuǎn)換一下傳遞給原始的Subscriber。
基本流程就是這樣,Observable.Operator就是實(shí)現(xiàn)新老Subscriber關(guān)聯(lián)的紐帶。
借用扔物線的snippet
//這個(gè)lift的作用就是把只發(fā)送圓形事件的Observable轉(zhuǎn)換成發(fā)送方形事件的Observable
public <Rectangle> Observable<Rectangle> lift(Operator<? extends Rectangle, ? super Round> operator) {
return Observable.create(new OnSubscribe<Rectangle>() {
@Override
//這個(gè)參數(shù)subscriber是原始的subscriber,只接受方形事件
public void call(Subscriber subscriber) {
//這個(gè)newSubscriber是operator生成的新的Subscriber,它可以接收圓形事件
//新的subscriber會(huì)調(diào)用原始的sunscriber的相關(guān)方法
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//這個(gè)onSubscribe是原始的OnSubscribe,它發(fā)送圓形事件
//所以可以用新生成的newSubscriber來接收
onSubscribe.call(newSubscriber);
}
});
}
記不住以上內(nèi)容也沒有關(guān)系,只要記住在變換中,會(huì)成成一個(gè)新的Observable和Subscriber就可以了,我們在變換完之后所進(jìn)行的操作都是針對新生成的Observable和新Subscriber。
經(jīng)過變換之后,我們擁有兩個(gè)Observable(OnSubscribe),也擁有兩個(gè)Subscriber.
那我們想要切換事件處理的線程怎么辦呢?我們可以在Operator中生成新的Subscriber的時(shí)候進(jìn)行處理,在newSubscriber和原始subscriber進(jìn)行映射的時(shí)候進(jìn)行切換,所以可以知道observeOn()切換的是所有它下游的線程。
所以如果我們想要切換事件發(fā)生的線程,會(huì)怎么辦呢?根據(jù)上面的代碼,可以知道,只需要讓最后的onSubscribe.call(newSubscriber)運(yùn)行在新線程就可以了。從這句代碼也可以看出,subscribeOn()方法切換的是它的上游線程,這種線程切換一直會(huì)影響到最原始的observable。
但是如果在一條鏈?zhǔn)秸{(diào)用中出現(xiàn)了多個(gè)subscribeOn()方法,由于鏈?zhǔn)秸{(diào)用最上游的第一個(gè)subscribeOn方法會(huì)直接影響到最原始的observable,而在接下來的的鏈?zhǔn)秸{(diào)用中消息的發(fā)送是有newSubscriber來控制的,所以第二個(gè)subscribeOn方法不會(huì)影響線程的切換。
多說無益,直接上代碼
Observable.just("hello","world","rxjava","rxandroid")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處由于受到下面第一個(gè)subscribeOn的影響,輸出RxComputationScheduler-1
Log.v("chicodong","thread 1 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.computation())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
//此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 2 is: "+Thread.currentThread().getName());
return s.length();
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
//此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 3 is: "+Thread.currentThread().getName());
return integer+100;
}
})
//此處的第二個(gè)subscribeOn相當(dāng)于沒有起作用
.subscribeOn(Schedulers.io())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
//此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 4 is: "+Thread.currentThread().getName());
return integer.toString();
}
})
.observeOn(Schedulers.newThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處受到obServeOn的影響,輸出RxNewThreadScheduler-1
Log.v("chicodong","thread 5 is: "+Thread.currentThread().getName());
return s+" lalala";
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處受到第二個(gè)observeOn的影響,輸出main
Log.v("chicodong","thread 6 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//此處受到第二個(gè)observeOn的影響,輸出main
Log.v("chicodong","final result is "+s+" thread is "+Thread.currentThread().getName());
}
});
雖然多余一個(gè)subscribeOn對于線程切換沒有影響,但是它可以在事件還沒有發(fā)生時(shí)起作用,最常見的就是doOnSubscribe()方法了
Observable.from(list)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0(){
public void call(){
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidScheduler.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
上面的代碼中第二個(gè)subscribeOn對于線程切換沒有影響,但是卻可以使doOnSubscribe()運(yùn)行在主線程中。