寫給自己的RxJava — 線程切換

線程切換

只要使用RxJava肯定對下面的代碼特別熟悉

Observable.from(list)
  .subcribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1(){
      public void call(Object obj){
          
      }
  });

其中subscribeOn()可以把事件發(fā)生的線程切換到io線程,observeOn()可以把處理事件的線程切換到Android應(yīng)用程序主線程。

那他是怎么做到這么簡潔的切換呢?

Rxjava的所有變換都基于一個(gè)lift模型,我們接下來介紹一下這個(gè)模型。

首先回顧一下Observable通知Subscriber的原理。

在生成Observable的時(shí)候我們會(huì)傳入一個(gè)OnSubscribe的實(shí)例,在發(fā)生訂閱關(guān)系subscribe方法中,OnSubscribe實(shí)例的call(Subscriber)方法中就會(huì)調(diào)用傳入的Subscriber的的相關(guān)方法,從而實(shí)現(xiàn)通知,消息發(fā)送。

流程大概如下

Observable—(實(shí)例化時(shí))—>OnSubscribe—(訂閱關(guān)系發(fā)生subscribe時(shí)) —>Subscriber

那首先考慮一下我們怎么實(shí)現(xiàn)在Observable中發(fā)送的是 圓形 事件,但是在Subscriber中接收到 方形事件并處理呢。

如果Observable和Subscriber的事件都不一樣,這都不能發(fā)生訂閱關(guān)系的,因?yàn)樵诰幾g檢查的時(shí)候就無法通過。

RxJava采用的方法是提供一個(gè)變換方法lift(Operater),該方法返回一個(gè)Observable對象。該Observable對象會(huì)發(fā)送方形 事件,這樣就可以用這個(gè)新的Observable對象來訂閱原始的Subscriber了。

在新生成的Obaservable對象的時(shí)候,我們也會(huì)生成該Observable對應(yīng)的OnSubscribe對象,并實(shí)現(xiàn)新的OnSubscribe對象的call(Subscriber)方法。由于我們用新生成的Observable對象去訂閱原始的Subscriber,所以新生成的OnSubscribe的call方法中的參數(shù)就是原始的Subscriber了。

接下來就是lift(Operator)中Operator接口發(fā)揮作用的時(shí)候了,這個(gè)Operator接口的call方法實(shí)現(xiàn)把一個(gè)Subscriber變換成另一個(gè)Subscrber的功能。

在這里就是把原始的響應(yīng)方形事件的Subscriber轉(zhuǎn)換成響應(yīng)圓形事件的Subscriber,這樣就可以調(diào)用原始的OnSubscribe的call方法,把圓形事件變成發(fā)送給這個(gè)新的響應(yīng)圓形事件的Subscriber.

這樣新的Observable中就同時(shí)包含了:

  • 原始的OnSubscribe(它能發(fā)送圓形事件)
  • 新的Subscriber(它能接受圓形事件)
  • 原始的Subscriber(它能接受 方形事件)

這樣,在發(fā)生訂閱關(guān)系時(shí),原始的Observable(原始的OnSubscribe)會(huì)發(fā)送圓形事件給新的Subscriber,新的Subscriber在處理的時(shí)候,就把這個(gè)事件轉(zhuǎn)換一下傳遞給原始的Subscriber。

基本流程就是這樣,Observable.Operator就是實(shí)現(xiàn)新老Subscriber關(guān)聯(lián)的紐帶。

借用扔物線的snippet

//這個(gè)lift的作用就是把只發(fā)送圓形事件的Observable轉(zhuǎn)換成發(fā)送方形事件的Observable
public <Rectangle> Observable<Rectangle> lift(Operator<? extends Rectangle, ? super Round> operator) {
    return Observable.create(new OnSubscribe<Rectangle>() {
        @Override
       //這個(gè)參數(shù)subscriber是原始的subscriber,只接受方形事件
        public void call(Subscriber subscriber) {
          
          //這個(gè)newSubscriber是operator生成的新的Subscriber,它可以接收圓形事件
          //新的subscriber會(huì)調(diào)用原始的sunscriber的相關(guān)方法
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
          
          //這個(gè)onSubscribe是原始的OnSubscribe,它發(fā)送圓形事件
          //所以可以用新生成的newSubscriber來接收
            onSubscribe.call(newSubscriber);
        }
    });
}

記不住以上內(nèi)容也沒有關(guān)系,只要記住在變換中,會(huì)成成一個(gè)新的Observable和Subscriber就可以了,我們在變換完之后所進(jìn)行的操作都是針對新生成的Observable和新Subscriber。

經(jīng)過變換之后,我們擁有兩個(gè)Observable(OnSubscribe),也擁有兩個(gè)Subscriber.

那我們想要切換事件處理的線程怎么辦呢?我們可以在Operator中生成新的Subscriber的時(shí)候進(jìn)行處理,在newSubscriber和原始subscriber進(jìn)行映射的時(shí)候進(jìn)行切換,所以可以知道observeOn()切換的是所有它下游的線程。

所以如果我們想要切換事件發(fā)生的線程,會(huì)怎么辦呢?根據(jù)上面的代碼,可以知道,只需要讓最后的onSubscribe.call(newSubscriber)運(yùn)行在新線程就可以了。從這句代碼也可以看出,subscribeOn()方法切換的是它的上游線程,這種線程切換一直會(huì)影響到最原始的observable。

但是如果在一條鏈?zhǔn)秸{(diào)用中出現(xiàn)了多個(gè)subscribeOn()方法,由于鏈?zhǔn)秸{(diào)用最上游的第一個(gè)subscribeOn方法會(huì)直接影響到最原始的observable,而在接下來的的鏈?zhǔn)秸{(diào)用中消息的發(fā)送是有newSubscriber來控制的,所以第二個(gè)subscribeOn方法不會(huì)影響線程的切換。

多說無益,直接上代碼

        Observable.just("hello","world","rxjava","rxandroid")
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此處由于受到下面第一個(gè)subscribeOn的影響,輸出RxComputationScheduler-1
                        Log.v("chicodong","thread 1 is: "+Thread.currentThread().getName());
                        return s.toUpperCase();
                    }
                })
                .subscribeOn(Schedulers.computation())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                      //此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
                        Log.v("chicodong","thread 2 is: "+Thread.currentThread().getName());
                        return s.length();
                    }
                })
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                      //此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
                        Log.v("chicodong","thread 3 is: "+Thread.currentThread().getName());
                        return integer+100;
                    }
                })
                //此處的第二個(gè)subscribeOn相當(dāng)于沒有起作用
                .subscribeOn(Schedulers.io())
                .map(new Func1<Integer, String>() {

                    @Override
                    public String call(Integer integer) {
                      //此處由于上一個(gè)subscribeOn切換過線程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
                        Log.v("chicodong","thread 4 is: "+Thread.currentThread().getName());
                        return integer.toString();
                    }
                })
                .observeOn(Schedulers.newThread())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此處受到obServeOn的影響,輸出RxNewThreadScheduler-1
                        Log.v("chicodong","thread 5 is: "+Thread.currentThread().getName());
                        return s+" lalala";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此處受到第二個(gè)observeOn的影響,輸出main
                        Log.v("chicodong","thread 6 is: "+Thread.currentThread().getName());
                        return s.toUpperCase();
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                      //此處受到第二個(gè)observeOn的影響,輸出main
                        Log.v("chicodong","final result is "+s+" thread is "+Thread.currentThread().getName());
                    }
                });

雖然多余一個(gè)subscribeOn對于線程切換沒有影響,但是它可以在事件還沒有發(fā)生時(shí)起作用,最常見的就是doOnSubscribe()方法了

Observable.from(list)
  .subscribeOn(Schedulers.io())
  .doOnSubscribe(new Action0(){
      public void call(){
          
      }
  })
  .subscribeOn(AndroidSchedulers.mainThread())
  .observeOn(AndroidScheduler.mainThread())
  .subscribe(new Action1(){
      public void call(Object obj){
          
      }
  });

上面的代碼中第二個(gè)subscribeOn對于線程切換沒有影響,但是卻可以使doOnSubscribe()運(yùn)行在主線程中。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容