一.項(xiàng)目引入依賴
在開始之前,首先給出GitHub上的鏈接
Github
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
二.RxJava是什么
RxJava是一個(gè)在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。
RxJava本質(zhì)上是一個(gè)實(shí)現(xiàn)異步操作的庫
三.RxJava的優(yōu)勢
簡潔,在程序邏輯變得越來越復(fù)雜時(shí),可以保持程序的簡潔。
四.API 介紹和原理
觀察者模式
RxJava 的異步實(shí)現(xiàn),是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的。
觀察者設(shè)計(jì)模式:對象間的一種一對多的依賴關(guān)系,以便一個(gè)對象的狀態(tài)發(fā)生變化時(shí),所有依賴于它的對象都得到通知并自動(dòng)刷新。
RxJava 的觀察者模式
RxJava 有四個(gè)基本概念:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。被觀察者和觀察者通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而被觀察者可以在需要的時(shí)候發(fā)出事件來通知觀察者。
RxJava 的事件回調(diào)方法 onNext()、onCompleted() 和 onError()。
- onNext():RxJava把每個(gè)事件單獨(dú)處理,當(dāng)一個(gè)事件隊(duì)列完成后,會(huì)調(diào)用onNext()方法。
- onCompleted(): 事件隊(duì)列結(jié)束,當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),需要觸發(fā) onCompleted() 方法作為完成標(biāo)志。
- onError(): 事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。
注意:onCompleted() 和 onError() 二者也是互斥的,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。
基本實(shí)現(xiàn)
RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
- 創(chuàng)建 Observer(觀察者)
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法執(zhí)行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError方法執(zhí)行了!");
}
};
除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的
Subscriber<String> subscriber = new Subscriber<String>()
{
@Override
public void onCompleted()
{
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e)
{
Log.i(TAG, "onError方法執(zhí)行了!");
}
@Override
public void onNext(String s)
{
Log.i(TAG, "onNext方法執(zhí)行了: " + s);
}
};
兩者的使用方式是一樣的,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用,它們的區(qū)別主要有兩點(diǎn):
第一:onStart(),這是 Subscriber 增加的方法。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法。
第二:unsubscribe(),這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱。在這個(gè)方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個(gè)方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài),可以在onPause() onStop() 等方法中調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
- 創(chuàng)建Observable(被觀察者)
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
可以看到,這里傳入了一個(gè) OnSubscribe 對象作為參數(shù)。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法。 RxJava 還提供了一些快捷方法用來創(chuàng)建事件隊(duì)列
just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Rxjava", "RxAndroid");
from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
String[] strings = {"Hello", "Rxjava", "RxAndroid"};
Observable observable = Observable.from(strings);
- Subscribe (訂閱)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
基本實(shí)現(xiàn)的完整代碼:
//觀察者Observer
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法執(zhí)行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError方法執(zhí)行了!");
}
};
//被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
//被觀察者訂閱觀察者
observable.subscribe(observer);
或者這樣寫
Subscriber<String> subscriber = new Subscriber<String>()
{
@Override
public void onCompleted()
{
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e)
{
Log.i(TAG, "onError方法執(zhí)行了!");
}
@Override
public void onNext(String s)
{
Log.i(TAG, "onNext方法執(zhí)行了: " + s);
}
};
//被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
//被觀察者訂閱觀察者
observable.subscribe(subscriber);
在或者這樣寫
//觀察者Observer
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法執(zhí)行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError方法執(zhí)行了!");
}
};
Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
//String[] strings = {"Hello","Rxjava","RxAndroid"};
//Observable observable = Observable.from(strings);
//被觀察者訂閱觀察者
observable.subscribe(observer);
執(zhí)行結(jié)果:
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: Hello
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: Rxjava
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: RxAndroid
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了
自定義回調(diào)
- 被觀察者(Observable)
Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
- 自定義回調(diào)的觀察者
//自定義回調(diào)
Action1<String> nextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "自定義回調(diào)----onNext方法執(zhí)行了: " + s);
}
};
Action1<Throwable> errorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "自定義回調(diào)----onError方法執(zhí)行了!");
}
};
Action0 completeAction = new Action0() {
@Override
public void call() {
Log.i(TAG, "自定義回調(diào)----Completed方法執(zhí)行了");
}
};
- 訂閱(subscribe)
//observable.subscribe(nextAction);
//observable.subscribe(nextAction,errorAction);
observable.subscribe(nextAction,errorAction,completeAction);
注意: Action1 和Action0 是 RxJava 的接口,Action1表示有一個(gè)返回值,Action0表示沒有返回值。
執(zhí)行結(jié)果:
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: Hello
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: Rxjava
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: RxAndroid
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----Completed方法執(zhí)行了
舉個(gè)例子a:打印數(shù)組中的元素
String[] names = {"周杰倫","周星馳","周潤發(fā)"};
//利用自定義回調(diào)
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "姓名:=="+s);
}
});
執(zhí)行結(jié)果:
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周杰倫
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周星馳
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周潤發(fā)
舉個(gè)例子b:加載顯示圖片
final ImageView img = (ImageView) findViewById(R.id.img);
//被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.mipmap.my);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
});
//觀察者
Observer<Drawable> observer= new Observer<Drawable>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext方法執(zhí)行了");
img.setImageDrawable(drawable);
Log.d(TAG, "圖片加載完成了!");
}
};
//被觀察者訂閱觀察者
observable.subscribe(observer);
執(zhí)行結(jié)果:
01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: onNext方法執(zhí)行了
01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: 圖片加載完成了!
01-16 14:09:06.753 10355-10355/com.zhoujian.rxjava D/MainActivity: onCompleted方法執(zhí)行了

Scheduler(線程控制)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用 subscribe(),就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件。如果需要切換線程,就需要用到 Scheduler。
- Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。
- Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算
- Android 有一個(gè)專用AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。
使用 subscribeOn() 和 observeOn() 兩個(gè)方法來對線程進(jìn)行控制。
subscribeOn(): 指定事件產(chǎn)生所在線程。
observeOn(): 指定事件消費(fèi)所在的線程。
還是以加載顯示圖片的例子來說明
final ImageView img = (ImageView) findViewById(R.id.img);
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.mipmap.my);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())//上面的call方法發(fā)生在io線程
. observeOn(AndroidSchedulers.mainThread())//下面顯示圖片發(fā)生在主線程
. subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext方法執(zhí)行了");
img.setImageDrawable(drawable);
Log.d(TAG, "圖片加載完成了!");
}
});
說明:被觀察者的代碼運(yùn)行在io線程中,觀察者的代碼運(yùn)行在主線程中,符合Android的在子線程中獲取數(shù)據(jù),在主線程中顯示數(shù)據(jù),更新界面。
變換
變換,就是將事件序列中的對象或整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列。
- 變換之map():一對一的轉(zhuǎn)換
例子a:把String類型轉(zhuǎn)換成Integer
//被觀察者
Observable.just("100")
//變換----map
.map(new Func1<String, Integer>()
{
@Override
public Integer call(String s)
{
return Integer.valueOf(s);
}
})
//自定義觀察者
.subscribe(new Action1<Integer>()
{
@Override
public void call(Integer integer)
{
Log.d(TAG, "String轉(zhuǎn)換成Integer完成了,integer的值為:"+integer.intValue());
}
});
執(zhí)行結(jié)果:
01-16 14:56:59.117 10355-10355/com.zhoujian.rxjava D/MainActivity: String轉(zhuǎn)換成Integer完成了,integer的值為:100
Func1 和 Action1 非常相似,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個(gè) Integer 對象后返回,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Integer。這種直接變換對象并返回的,是最常見的也最容易理解的變換。
例子b:把JavaBean對象轉(zhuǎn)換成String類型輸出
首先封裝一個(gè)演員Actor的JavaBean
package com.zhoujian.rxjava.bean;
/**
* Created by zhoujian on 2016/12/29.
*/
public class Actor
{
private String name;
private String sex;
public Actor(String name, String sex)
{
this.name = name;
this.sex = sex;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
public String getSex()
{
return sex;
}
public void setSex(String sex)
{
this.sex = sex;
}
@Override
public String toString()
{
return "Actor{" + "name='" + name + '\'' + ", sex='" + sex + '\'' + '}';
}
}
再封裝一個(gè)電影Movie的JavaBean,一個(gè)電影中有多個(gè)演員
package com.zhoujian.rxjava.bean;
import java.util.List;
/**
* Created by zhoujian on 2016/12/29.
*/
public class Movie
{
private String name;
private int id;
private String data;
private List<Actor> mactorList;
public Movie(String data, int id, List<Actor> mactorList, String name)
{
this.data = data;
this.id = id;
this.mactorList = mactorList;
this.name = name;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public List<Actor> getMactorList() {
return mactorList;
}
public void setMactorList(List<Actor> mactorList) {
this.mactorList = mactorList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Movie{" + "data='" + data + '\'' + ", name='" + name + '\'' +", id=" + id + ", mactorList=" + mactorList + '}';
}
}
準(zhǔn)備數(shù)據(jù)
Actor actor1 = new Actor("周星馳", "男");
Actor actor2 = new Actor("張柏芝", "女");
ArrayList<Actor> movie1List = new ArrayList<Actor>();
movie1List.add(actor1);
movie1List.add(actor2);
Movie movie1 = new Movie("1998-10-14", 1, movie1List, "喜劇之王");
Actor actor3 = new Actor("羅志祥", "男");
Actor actor4 = new Actor("張雨綺", "女");
ArrayList<Actor> movie2List = new ArrayList<Actor>();
movie2List.add(actor3);
movie2List.add(actor4);
Movie movie2 = new Movie("2016-05-01", 2, movie2List, "美人魚");
Movie[] movies = {movie1, movie2};
把JavaBean對象轉(zhuǎn)換成String類型輸出
//定義被觀察者
Observable observable= Observable.from(movies).map(new Func1<Movie, String>() {
@Override
public String call(Movie movie) {
return movie.toString();
}
});
//定義觀察者
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError方法執(zhí)行了");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法執(zhí)行了");
Log.i(TAG,"s==="+s);
}
};
//被觀察者訂閱觀察者
observable.subscribe(observer);
執(zhí)行結(jié)果:
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='1998-10-14', name='喜劇之王', id=1, mactorList=[Actor{name='周星馳', sex='男'}, Actor{name='張柏芝', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='2016-05-01', name='美人魚', id=2, mactorList=[Actor{name='羅志祥', sex='男'}, Actor{name='張雨綺', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了
- 變換之flatMap():一對多的轉(zhuǎn)換
還是上述電影的例子,如果把一個(gè)電影轉(zhuǎn)換成多個(gè)演員輸出,這就要用到flatMap(),一對多的轉(zhuǎn)換。
Observer<Actor> observer = new Observer<Actor>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法執(zhí)行了");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError方法執(zhí)行了");
}
@Override
public void onNext(Actor actor) {
Log.i(TAG, "onNext方法執(zhí)行了");
Log.i(TAG, "actor===" + actor.toString());
}
};
Observable.from(movies).flatMap(new Func1<Movie, Observable<Actor>>() {
@Override
public Observable<Actor> call(Movie movie) {
return Observable.from(movie.getMactorList());
}
}).subscribe(observer);
執(zhí)行結(jié)果:
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='周星馳', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='張柏芝', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='羅志祥', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='張雨綺', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了
從上面的代碼可以看出, flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個(gè) Observable 對象,并且這個(gè) Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。
flatMap() 的原理:
第一,使用傳入的事件對象創(chuàng)建一個(gè) Observable 對象。
第二, 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開始發(fā)送事件。
第三,每一個(gè)創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。
變換的原理:lift()
這些變換雖然功能各有不同,但實(shí)質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部,它們是基于同一個(gè)基礎(chǔ)的變換方法: lift(Operator)
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
Observable 執(zhí)行了 lift(Operator) 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換。
compose對 Observable 整體的變換
除了 lift() 之外, Observable 還有一個(gè)變換方法叫做 compose(Transformer)。它和 lift() 的區(qū)別在于, lift() 是針對事件項(xiàng)和事件序列的,而 compose() 是針對 Observable 自身進(jìn)行變換。
public class LiftTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
}
}
...
Transformer liftAll = new LiftTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
線程的自由控制
還是原來的例子,String轉(zhuǎn)Integer
Observable.just("100")//主線程中執(zhí)行,由subscribeOn指定
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Func1<String, Integer>()//IO 線程,由 observeOn() 指定
{
@Override
public Integer call(String s)
{
return Integer.valueOf(s);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>()// Android 主線程,由 observeOn() 指定
{
@Override
public void call(Integer integer)
{
Log.d(TAG, "String轉(zhuǎn)換成Integer完成了,integer的值為:"+integer.intValue());
}
});
有關(guān)更加詳細(xì)的介紹,請參考: