Rxjava

以下都是本人收集和總結(jié)的內(nèi)容:

1. 什么是Rxjava

Rx含義

ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx。微軟給的定義是,Rx是一個函數(shù)庫,讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序,使用Rx,開發(fā)者可以用Observables表示異步數(shù)據(jù)流,用LINQ操作符查詢異步數(shù)據(jù)流, 用Schedulers參數(shù)化異步數(shù)據(jù)流的并發(fā)處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。

Rxjava含義

簡單而言:RxJava就是一種用Java語言實現(xiàn)的響應(yīng)式編程,來創(chuàng)建基于事件的異步程序

2. Rxjava理解與擴展(觀察者模式)

簡單概括就是,觀察者(Observer)需要在被觀察者(Observable)變化的一瞬間做出反應(yīng)。

而兩者通過注冊(Register)或者訂閱(Subscribe)的方式進行綁定。

觀察者模式

其中這個Button就是被觀察者(Observable),OnClickListener就是觀察者(Observer),兩者通過setOnClickListener達成訂閱(Subscribe)關(guān)系,之后當Button產(chǎn)生OnClick事件的時候,會直接發(fā)送給OnClickListener,它做出相應(yīng)的響應(yīng)處理。

而RxJava的觀察者模式呢,跟這個差不多,但是也有幾點差別:

  • Observer與Observable是通過 subscribe() 來達成訂閱關(guān)系。
  • RxJava中事件回調(diào)有三種:onNext() 、 onCompleted() 、 onError() 。
  • 如果一個Observerble沒有任何的Observer,那么這個Observable是不會發(fā)出任何事件的。

關(guān)于RxJava的回調(diào)事件:

onNext():基本事件。
onCompleted(): 事件隊列完結(jié)。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規(guī)定,當不會再有新的 onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標志。
onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
值得注意的是在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。如果在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。

好了,那我們也附一張圖對比一下吧:


觀察者模式

3. 如何實現(xiàn)RxJava

3.1創(chuàng)建Observer

在Java中,一想到要創(chuàng)建一個對象,我們馬上就想要new一個。沒錯,這里我們也是要new一個Observer出來,其實就是實現(xiàn)Observer的接口,注意String是接收參數(shù)的類型:

//創(chuàng)建
Observer<String> observer = new Observer<String>() {
@Override 
 public void onNext(String s)  { 
     Log.i("onNext ---> ", "Item: " + s);
   }
 @Override 
public void onCompleted() {
     Log.i("onCompleted ---> ", "完成"); 
  } 
@Override 
public void onError(Throwable e) {
     Log.i("onError ---> ", e.toString()); 
  }
};

當然這里也要提另外一個接口:Subscriber ,它跟Observer接口幾乎完全一樣,只是多了兩個方法,總結(jié):

  • onStart(): 它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認情況下它的實現(xiàn)為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行), onStart() 就不適用了,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。

  • unsubscribe(): 用于取消訂閱。在這個方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。

雖然多了兩個方法,但是基本實現(xiàn)方式跟Observer是一樣的,所以暫時可以不考慮兩者的區(qū)別。不過值得注意的是

實質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用。

3.2創(chuàng)建Observable

與Observer不同的是,Observable是通過 create() 方法來創(chuàng)建的。注意String是發(fā)送參數(shù)的類型:

//創(chuàng)建
Observable observable = Observable.create(new  Observable.OnSubscribe<String>() { 
  @Override 
  public void call(Subscriber<? super String> subscriber) { 
        subscriber.onNext("Hello"); 
        subscriber.onNext("World");
        subscriber.onCompleted();
    }
});

3.3訂閱(Subscribe)

在之前,我們創(chuàng)建了 Observable 和 Observer ,現(xiàn)在就需要用 subscribe() 方法來將它們連接起來,形成一種訂閱關(guān)系:

//訂閱
observable.subscribe(observer);

這里其實確實有點奇怪,為什么是Observable(被觀察者)訂閱了Observer(觀察者)呢?其實我們想一想之前Button的點擊事件:

Button.setOnClickListener(new View.OnClickListener())

Button是被觀察者,OnClickListener是觀察者,setOnClickListener是訂閱。我們驚訝地發(fā)現(xiàn),也是被觀察者訂閱了觀察者,所以應(yīng)該是一種流式API的設(shè)計吧,也沒啥影響。

完整代碼如下:

//創(chuàng)建Observable
Observable observable = Observable.create(new  Observable.OnSubscribe<String>() { 
  @Override 
  public void call(Subscriber<? super String> subscriber) { 
        subscriber.onNext("Hello"); 
        subscriber.onNext("World");
        subscriber.onCompleted();
    }
});

//創(chuàng)建Observe
Observer<String> observer = new Observer<String>() {
@Override 
 public void onNext(String s)  { 
     Log.i("onNext ---> ", "Item: " + s);
   }
 @Override 
public void onCompleted() {
     Log.i("onCompleted ---> ", "完成"); 
  } 
@Override 
public void onError(Throwable e) {
     Log.i("onError ---> ", e.toString()); 
  }
};

//訂閱
observable.subscribe(observer);

運行的結(jié)果如下,可以看到Observable中發(fā)送的String已經(jīng)被Observer接收并打印了出來:

運行結(jié)果

3.4線程控制——Scheduler

Scheduler是RxJava的精髓之一了。

在RxJava中,Scheduler相當于線程控制器,可以通過它來指定每一段代碼運行的線程。
RxJava已經(jīng)內(nèi)置了幾個Scheduler,總結(jié):

  1. Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的Scheduler。

  2. Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。

  3. Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的Scheduler。行為模式和newThread()差不多,區(qū)別在于io()的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免創(chuàng)建不必要的線程。

  4. Schedulers.computation(): 計算所使用的Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個Scheduler使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時間會浪費 CPU。

  5. AndroidSchedulers.mainThread(),Android專用線程,指定操作在主線程運行。

那我們?nèi)绾吻袚Q線程呢?RxJava中提供了兩個方法:subscribeOn()observeOn() ,兩者的不同點在于:

subscribeOn(): 指定subscribe()訂閱所發(fā)生的線程,即 call() 執(zhí)行的線程?;蛘呓凶鍪录a(chǎn)生的線程。
observeOn(): 指定Observer所運行在的線程,即onNext()執(zhí)行的線程?;蛘呓凶鍪录M的線程。

這里確實不好理解,沒關(guān)系,下面我們在具體例子中觀察現(xiàn)象。

//創(chuàng)建被觀察者
Observable.create(new Observable.OnSubscribe<Bitmap>() {
    /**
    * 復(fù)寫call方法
    *
    * @param subscriber 觀察者對象
    */
    @Override
    public void call(Subscriber<? super Bitmap> subscriber) {
        //通過URL得到圖片的Bitmap對象
        Bitmap bitmap = GetBitmapForURL.getBitmap(url);
        //回調(diào)觀察者方法
        subscriber.onNext(bitmap);
        subscriber.onCompleted();
        Log.i(" call ---> ", "運行在 " + Thread.currentThread().getName() + " 線程");
    }
})
.subscribeOn(Schedulers.io()) // 指定subscribe()發(fā)生在IO線程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回調(diào)發(fā)生在UI線程
.subscribe(new Observer<Bitmap>() {   //訂閱觀察者(其實是觀察者訂閱被觀察者)

    @Override
    public void onNext(Bitmap bitmap) {
        mainImageView.setImageBitmap(bitmap);
        Log.i(" onNext ---> ", "運行在 " + Thread.currentThread().getName() + " 線程");
    }

    @Override
    public void onCompleted() {
        mainProgressBar.setVisibility(View.GONE);
        Log.i(" onCompleted ---> ", "完成");
    }

    @Override
    public void onError(Throwable e) {
        Log.e(" onError --->", e.toString());
    }
 });

現(xiàn)在來看一下運行的Log日志:

Log

可以看到,call方法(事件產(chǎn)生)執(zhí)行在IO線程,而onNext方法(事件消費)執(zhí)行在main線程。說明之前分析的是對的。

3.5操作符

所謂操作符(Operators),簡單來說就是一種指令,表示需要執(zhí)行什么樣的操作。Rx中的每種編程語言實現(xiàn)都實現(xiàn)了一組操作符的集合。RxJava也不例外。

RxJava中有大量的操作符,比如創(chuàng)建操作符、變換操作符、過濾操作符等等,這些操作符要全部講解完幾乎是不可能也沒必要的事情。所以我們只介紹常見的、有用的、重要的操作符。其他的如果用到直接到文檔查找就行了。

下面就針對前篇文章的創(chuàng)建(create)來說明一下另外兩種常見的創(chuàng)建操作符。

Observable.just()
首先給出定義:

Just操作符是創(chuàng)建一個將參數(shù)依次發(fā)送出來的Observable

具體一點來說就是, just() 中會接收1~9個參數(shù),它會返回一個按照傳入?yún)?shù)的順序依次發(fā)送這些參數(shù)的Observable。

這樣說可能還是不夠清晰,所以畫個圖來看:

JUST流程圖

從圖中可以看出,其實就是依次發(fā)送單個數(shù)據(jù),它的具體寫法是這樣的,非常簡單:

Observable.just("Hello","world");
//其實就相當于依次調(diào)用:
//subscriber.onNext("Hello");
//subscriber.onNext("World");

但是這里要注意一點,如果你傳遞null給just,它會返回一個發(fā)送null值的Observable,而不是返回一個空Observable(完全不發(fā)送任何數(shù)據(jù)的Observable)。后面會講到,如果需要空Observable應(yīng)該使用 Empty 操作符。
現(xiàn)在來看完整的代碼,代碼本身很簡單,注意看Log日志:

//創(chuàng)建Observable
Observable.just("Hello", "World", null) .subscribe(new Observer<String>() {
  @Override 
  public void onNext(String s) {
     if (s == null) {
         Log.i("onNext ---> ", "null"); 
     }else {
         Log.i("onNext ---> ", s); 
     }
  } 
  @Override 
  public void onCompleted() { 
    Log.i("onCompleted ---> ", "完成"); 
  }
  @Override 
  public void onError(Throwable e) {
     Log.i("onError ---> ", "出錯 --->" + e.toString()); 
  }
});
log

這里因為我們要打印字符串,所以不能為null,我就處理了一下,可以看到當發(fā)送 null 的時候,s確實等于null。

Observable.from()
盡管與just一樣是創(chuàng)建操作符,但是from操作符稍微強大點。因為from操作符的作用是:

將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。

注意,這里不再是發(fā)送單個對象,而是直接發(fā)送一組對象。為了與just對比,也來畫個圖描述一下:

from流程圖

它的具體寫法是這樣的,也非常簡單:

String[] str = new String[]{"Hello", "World"};
//創(chuàng)建Observable
Observable.from(str);

4. 結(jié)合Rxjava源碼深度學(xué)習(xí)

基礎(chǔ)源碼

實現(xiàn)RxJava的代碼,這里我打上了Log日志,來看一下每個方法執(zhí)行的順序。

//創(chuàng)建Observable
Observable.create(new Observable.OnSubscribe<String>() { 
  @Override 
  public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("Hello"); 
    subscriber.onNext("World"); 
    subscriber.onCompleted();
    Log.i("執(zhí)行順序 ---> ", " call "); 
   }
 }).subscribe(new Observer<String>() {
  @Override 
  public void onNext(String s) { 
    Log.i("onNext ---> ", s); 
    Log.i("執(zhí)行順序 ---> ", " subscribe onNext"); 
  } 
  @Override 
  public void onCompleted() {
     Log.i("onCompleted ---> ", "完成"); 
     Log.i("執(zhí)行順序 ---> ", " subscribe onCompleted"); 
  } 
  @Override 
  public void onError(Throwable e) { 
    Log.i("onError ---> ", "出錯 --->" + e.toString()); 
  }
});

好了,來看一下Log日志:

執(zhí)行l(wèi)og

從圖中可以看到,subscribe方法先執(zhí)行,等執(zhí)行完成后再執(zhí)行call方法。

好了,這就是結(jié)論。先在腦子里產(chǎn)生個印象,方便后面追溯。

4.1 create()

進入Observable的create()方法做了些什么:

public class Observable<T> {
  .....省略代碼......
 static final RxJavaObservableExecutionHook hook = 
              RxJavaPlugins.getInstance().getObservableExecutionHook();
  public static <T> Observable<T> create(OnSubscribe<T> f) {
     return new Observable<T>(hook.onCreate(f));
  }
  .....省略代碼......
}

直接返回一個 Observable,接下里繼續(xù)看看它的構(gòu)造函數(shù):

public class Observable<T> {
  .....省略代碼......
  final OnSubscribe<T> onSubscribe;
  protected Observable(OnSubscribe<T> f) {
     this.onSubscribe = f;
  }
  .....省略代碼......
}

返回繼續(xù)查看 hook.onCreate(f)hook 是什么鬼?

hook是一個代理對象, 僅僅用作調(diào)試的時候可以插入一些測試代碼。如單元測試

static final RxJavaObservableExecutionHook hook = 
         RxJavaPlugins.getInstance().getObservableExecutionHook();

繼續(xù)查看hook.onCreate(f)

public abstract class RxJavaObservableExecutionHook {
  .....省略代碼......
  public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
     return f;
  }
  .....省略代碼......
}

直接把OnSubscribe 這個對象返回了一下。

創(chuàng)建時做了三件事情:
返回了一個Observable(假設(shè)為ObservableA)
返回了一個OnSubscribe(假設(shè)為OnSubscribeA)
把返回的OnSubscribeA在ObservableA構(gòu)造函數(shù)中保存為ObservableA的 .onSubscribe 屬性

create創(chuàng)建流程圖

create()方法創(chuàng)建了一個Observable,且在這個Observable中有個OnSubscribe。

所以就畫個簡圖就如下圖所示這樣:

create簡圖

4.2 subscribe()

subscribe() 這個是將觀察者(Observer)與被觀察者(Observable)聯(lián)系到一起的操作,也就是產(chǎn)生一種訂閱(Subcribe)關(guān)系。

先查看源碼:

public class Observable<T> {
  .....省略代碼......
  public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        return subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(T t) {
                observer.onNext(t);
            }

        });
    }
  .....省略代碼......
}

實質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用。

在這里就能夠看出,首先 if 中的語句意思是如果這個Observer已經(jīng)是Subscriber類型,那就直接返回。如果不是的話 new了一個Subscriber ,再點進去看看:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
   .....省略代碼......
}

果然,它還是轉(zhuǎn)成了Subscriber類型,剛好印證了之前的話。所以為了方便起見,之后文章中,所有的觀察者(Observer)我都用Subscriber來代替。
繼續(xù)看 subscribe 源碼:

public class Observable<T> {
   .....省略代碼......
  static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
  final OnSubscribe<T> onSubscribe;
  public final Subscription subscribe(Subscriber<? super T>  subscriber) { 
    return Observable.subscribe(subscriber, this);
  }
  private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
   .....省略代碼......
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
   .....省略代碼......
  }
   .....省略代碼......
}

把一些暫時無關(guān)的代碼省略掉來看,其實就是執(zhí)行了一句 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 。
而這個 hook.onSubscribeStart 方法再點進去看看:

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
    // pass through by default 
    return onSubscribe; 
}

可以看到,竟然直接返回了一個 onSubscribe ,由于之前說過這個hook沒什么作用,直接刪掉,那就等于整個 subscribe 做了一件事就是 onSubscribe.call(subscriber) ,當然這個call里面的參數(shù)subscriber是我們代碼中傳遞進去的。

而onSubscribe在create源碼解析中我們已經(jīng)知道是新建 ObservableA 的一個屬性,所以總結(jié)來說,subscribe()方法做的事情就是這樣:

ObservableA.onSubscribe.call(subscriber);

而調(diào)用 call方法,就是調(diào)用傳入的參數(shù)subscriber的onNext/onCompleted/onError方法。這就是全部的過程。依然畫個圖來說,圖中省略了create中的創(chuàng)建步驟:

使用過程

結(jié)合圖我們最后再順一下思路:

首先創(chuàng)建過程也就是create()方法中創(chuàng)建了一個Observable,并有一個onSubscribe屬性;
其次在訂閱過程也就是subscribe()方法中,調(diào)用了create()方法中創(chuàng)建的Observable的onSubscribe屬性的call方法;
最后這個call回調(diào)的就是代碼中創(chuàng)建的Subscriber的onNext/onCompleted/onError方法。
之前Log日志可以看出,將onNext與onCompleted方法執(zhí)行完后,call方法才結(jié)束。這也印證了call方法回調(diào)Subscriber的方法這一說。

4.3 map

4.3.1map使用流程

Observable.create(new Observable.OnSubscribe<Integer>() {  
 @Override    
public void call(Subscriber<? super Integer> subscriber) {        
  subscriber.onNext(1);        
  subscriber.onCompleted();    
}
}).map(new Func1<Integer, String>() {   
   @Override  
   public String call(Integer integer) {       
       return String.valueOf(integer);    
  }
}).subscribe(new Subscriber<String>() { 
   @Override    
public void onCompleted() {          
  }   
 @Override    
public void onError(Throwable e) {    
}    
@Override    
public void onNext(String s) {   
 }
});

并且回顧Observable.create過程

final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }
 
 public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) { 
        //直接返回
        return f;
    }

4.3.2map源碼

這里比較神奇的地方是這個 map,其實 map 實際上做了兩件大事:

  • (第一件)new 了一個 變形函數(shù), 保存在了 OperatorMap.transform
    查看map源碼
public class Observable<T> {
   public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
}

點擊繼續(xù)查看OperatorMap

public final class OperatorMap<T, R> implements Operator<R, T> {
 final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }
}

可以看出就是把new Func1

...省略代碼...
map(new Func1<Integer, String>() {   
   @Override  
   public String call(Integer integer) {       
       return String.valueOf(integer);    
  }
 ...省略代碼...

保存在了transformer中去。

  • (第二件)new了一個新的 Observable. 這個 Observable 的構(gòu)造函數(shù)中, 傳入了一個新的 OnSubscribe. 整個 lift 函數(shù)的難點就在于這個 OnSubscribe 對象中. 我們仔細看一下它做了什么. 它其實也做了兩件大事兒:
    進入lift函數(shù)
public class Observable<T> {
 .....省略代碼.......
 final OnSubscribe<T> onSubscribe;
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
 
 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        //新的Observable
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                  //hook.onLift(operator).call(o)創(chuàng)建了一個新的 Subscriber   
                        //(實際上是一個 proxy)并調(diào)用了OperatorMap中的Subscriber.onNext
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }
    .....省略代碼.......
}

繼續(xù)進入OnSubscribe.call 函數(shù)中, 看一下源碼:

public final class OperatorMap<T, R> implements Operator<R, T> {
....省略代碼.....
  final Func1<? super T, ? extends R> transformer;
 @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
          //函數(shù)transform執(zhí)行對參數(shù)t進行變形然
                //后將變形結(jié)果轉(zhuǎn)發(fā)給o.onNext
            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    } 
  ....省略代碼.....
}
  • hook.onLift(operator).call(o)創(chuàng)建了一個新的 Subscriber (實際上是一個 proxy), 并在Subscriber.onNext中調(diào)用transform函數(shù)對參數(shù)t進行變形, 然后將變形結(jié)果轉(zhuǎn)發(fā)給o.onNext`. 這么上面的變量o是哪里的,

  • OnSubscribe.call 調(diào)用了 4.3.1中create 創(chuàng)建出來的 Observable.onSubscribe 函數(shù)!

17.png

很簡單, 該變形函數(shù)保存在了 OperatorMap.transform 中.

**總結(jié)一下 map 的行為: **

  1. 創(chuàng)建了一個新的 Observable,
  2. 創(chuàng)建了一個新的 OnSubscribe: 其中的 call 方法是整個調(diào)用鏈的關(guān)鍵. 它調(diào)用了上一級 Observable.onSubscribe.call, 同時, 還將結(jié)果通過 transform 對 4.3.1處理后的結(jié)果進行變形。

3.subscribe 觸發(fā)整個回調(diào)流程. 我們來看一下主要流程

refactor.png

這一步也很簡單, 就是通過 Observable.subscribe 調(diào)用該對象的 Observable.onSubscribe.call 方法, 然后經(jīng)過一系列調(diào)用, 最終由該對象內(nèi)部臨時創(chuàng)建的 Subscriber 對象(上文中的 proxy 對象) 調(diào)用用戶目標 Subscriber (即代碼中 .subscribe(…) 中的參數(shù)) 的方法.

4.4 Schedulers

.subscribeOn(Schedulers.io())和.observeOn(Schedulers.computation())應(yīng)用后的原理.

4.4.1基本使用

Observable.create(new Observable.OnSubscribe<Integer>() {  
 @Override    
public void call(Subscriber<? super Integer> subscriber) {        
  subscriber.onNext(1);        
  subscriber.onCompleted();    
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() { 
   @Override    
public void onCompleted() {          
  }   
 @Override    
public void onError(Throwable e) {    
}    
@Override    
public void onNext(String s) {   
 }
});

我們來拆解一下 .subscribeOn.observeOn 的作用范圍:

24.png
  • subscribeOn 將作用于 create 中的 OnSubscribe.call() 方法.
  • observeOn 作用于其語法中下一語句的 Subscriber.onNext 等函數(shù)中.

首先分析 subscribeOn

public class Observable<T> {
 public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
         //創(chuàng)建了一個 Observable 來轉(zhuǎn)發(fā) OnSubscribe.call 請求
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
}

map 一樣, 是通過創(chuàng)建了一個 Observable 來轉(zhuǎn)發(fā) OnSubscribe.call 請求(代碼中的 OperatorSubscribeOn 繼承自 OnSubscribe. 來看看具體實現(xiàn)

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        //這里使用了Worker.schedule方法改變了source.call()方法執(zhí)行的線程
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                
                source.unsafeSubscribe(s);
            }
        });
    }
}
}

可見, 該函數(shù)中做了如下兩件事:

  1. 創(chuàng)建一個用于在不同線程執(zhí)行的 Worker 對象(代碼中的 inner)
  2. 使用上述 inner 在該對象所代表的線程中執(zhí)行 Observable.onSubscribe.call 方法(代碼中的 source.unsafeSubscribe(s);

再來分析 observeOn

   public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
         //返回一個新Observable對象
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }```


  繼續(xù)查看OperatorObserveOn對象及OperatorObserveOn.call` 方法是如何生成 `st` 對象的

public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;


public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

}

該方法會根據(jù) `scheduler` 的類型決定返回什么樣的`Subscriber` 對象. 可見, 如果 child 類型為 `ImmediateScheduler` 或者 `TrampolineScheduler` 等以當前線程為執(zhí)行環(huán)境的類型, 則直接返回 `child` 對象. 本例中, `child` 為 `NewThreadScheduler`, 因此將通過 `ObserveOnSubscriber` 對 `child` 進行包裝. 生成一個 proxy subscriber 對象.

返回來繼續(xù)查看

public class Observable<T> {
final OnSubscribe<T> onSubscribe;
//這段代碼熟悉吧
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
}



繼續(xù)查看OnSubscribeLift
```public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            //調(diào)用并且切換線程
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                st.onStart();
                //熟悉.....
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

至此, 我們可以知道 observeOn 是通過以下方法對其后面的 Subscriber 進行控制的:

  1. lift -> OnSubscribe.call -> proxy subscriber = new Subscriber(original subscriber) 創(chuàng)建了一個新的 Subscriber(實際上是個代理)
  2. 在上述 proxy subscriber 中對 original subscriber 對象的執(zhí)行進行轉(zhuǎn)發(fā). 轉(zhuǎn)發(fā)過程中, proxy subscriber 完全可以自由的控制 original subscriber 執(zhí)行的線程.
l1.png

5. 在項目中Rxjava應(yīng)用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容