Rxjava到底是什么
一個(gè)詞:異步
一個(gè)可以在java VM上使用可觀測(cè)的序列來(lái)組成異步的、基本事件的程序庫(kù)
一個(gè)實(shí)現(xiàn)異步操作的庫(kù)
RXJava優(yōu)缺點(diǎn)
簡(jiǎn)潔
隨著程序的邏輯變得越來(lái)越復(fù)雜,它依然能夠保持簡(jiǎn)潔。
API介紹和原理解析
1.概念:擴(kuò)展的觀察者模式
RXjava的異步實(shí)現(xiàn),是通過(guò)一種擴(kuò)展的觀察者模式
觀察者模式
觀察者模式的面向需求是:對(duì)象A(觀察者)對(duì)對(duì)象B(被觀察者)的某種變化,高度敏感,需要在B變化的一瞬間做出反應(yīng)。
觀察者模式采用注冊(cè)(register)或者成為訂閱(subscrible)的方式,告訴被觀察者,我需要你的某某狀態(tài),你要在它變化的時(shí)候告訴我。
Android開(kāi)發(fā)中典型的例子就是view的點(diǎn)擊監(jiān)聽(tīng)器OnClickLinstener()。對(duì)設(shè)置onClickListener來(lái)說(shuō),view是被觀察者,OnClickListener是觀察者,二者通過(guò)setOnClickListener完成訂閱關(guān)系。訂閱完成之后,用戶點(diǎn)擊view的瞬間,Android Framework就會(huì)將點(diǎn)擊事件交給已經(jīng)注冊(cè)的onClickListener采取這樣被動(dòng)的觀察方式,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度。
OnClickListener 的模式大致如下圖:
[圖片上傳失敗...(image-2917dc-1512961567324)]
如圖所示,通過(guò)setOnClickListener方法Button持有OnClickListener的引用,當(dāng)用戶點(diǎn)擊Button,自動(dòng)調(diào)用OnclickListener里的onClick方法。
把圖片抽象出來(lái) Button->被觀察者 OnClickListener->觀察者 setOnClickListener->訂閱 onClick->事件。就由專用的觀察者模式(例如只用于監(jiān)聽(tīng)控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。
[圖片上傳失敗...(image-ed19bb-1512961567324)]
而 RxJava 作為一個(gè)工具庫(kù),使用的就是通用形式的觀察者模式。
RXJava的觀察者模式
RXjava有4個(gè)感念:
- Observable 被觀察者
- Observer 觀察者
- subscribe 訂閱
- 事件
Observable和Observer通過(guò)Subscribe方法實(shí)現(xiàn)訂閱,從而Observable可以在需要的時(shí)候發(fā)出事件通知Observer。
與傳統(tǒng)的觀察者模式不同,除了普通事件onNext() (相當(dāng)于onClick/OnEvent),還定義兩個(gè)特殊的事件onCompleted(),onError | completed 完成 完整的|
- onComplete() 事件隊(duì)列完結(jié)。RXJava不僅把每個(gè)事件單獨(dú)處理,還會(huì)把他們看成一個(gè)隊(duì)列RXJava規(guī)定,如果沒(méi)有新的onNext()方法發(fā)出時(shí),必須出發(fā)onCompleted方法作為完成標(biāo)志。
- onError() 事件隊(duì)列異常,在事件處理過(guò)程中出現(xiàn)異常,會(huì)觸發(fā)onError(),并且整個(gè)事件終止,不允許在有事件發(fā)出。
- 在一個(gè)正確運(yùn)行事件序列中,onCompleted,onError有且只有一個(gè)會(huì)被調(diào)用,而且是事件中最后一個(gè)方法,兩個(gè)方法是互斥的。即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。
RxJava 的觀察者模式大致如下圖:
[圖片上傳失敗...(image-f381dc-1512961567324)]
基本實(shí)現(xiàn)
基于以上的概念, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):
創(chuàng)建Observer 觀察者
決定著事件觸發(fā)將有怎樣的行為
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
不僅基本使用方式一樣,實(shí)質(zhì)上,在RxJava的subscribe過(guò)程中,Observer也總是會(huì)先被轉(zhuǎn)換成Subscriber在使用。所以使用基本功能,選擇Observer或者subscriber都是一樣的。他們的區(qū)別有兩點(diǎn)。
- onStart() 這是subscriber新增方法,他會(huì)在subscribe剛開(kāi)始,但是事件還沒(méi)有發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)清零或者重置,這是一個(gè)可選的方法。默認(rèn)實(shí)現(xiàn)為空。但是如果對(duì)工作線程有要求的話(例如彈出一個(gè)對(duì)話框,需要在Ui線程執(zhí)行),就不能使用onStart(),因?yàn)樗偸钦{(diào)用在subscribe所發(fā)生的線程調(diào)用,而不能指定線程。如果指定線程來(lái)做準(zhǔn)備工作,可以使用doOnSubscribe()方法。
- unSubscribe 這是Subscriber所實(shí)現(xiàn)的另一個(gè)接口Subscription()方法,用于取消訂閱,在這個(gè)方法調(diào)用后Subscriber將不接受任何事件。一般在調(diào)用之前先使用isUnSubscribed先判斷一下?tīng)顟B(tài),unSubscribe()這個(gè)方法很重要,因?yàn)樵趕ubscribe之后,Observable會(huì)持有Subscriber的引用,這個(gè)引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以要保持良好的原則,要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
創(chuàng)建Observeable
Observable是被觀察者,決定在什么時(shí)候被觸發(fā),和觸發(fā)什么事件。
RXJava使用create()方法來(lái)創(chuàng)建一個(gè)Observable,并為他設(shè)置事件觸發(fā)規(guī)則。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
這里傳入OnSubScribe對(duì)象作為參數(shù),OnSubscribe會(huì)被存入染回的Observable對(duì)象中,相當(dāng)于一個(gè)計(jì)劃表,當(dāng)OnSubscribeObservable被訂閱時(shí),OnSubscriable的call方法會(huì)被自動(dòng)調(diào)用,事件序列會(huì)按照設(shè)定依次調(diào)用onNext方法和OnCompleted方法,這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了被觀察者向觀察者的事件傳遞,即觀察者模式。
create方法是RXJava中最基本的創(chuàng)造事件序列的方法。RXJava還提供了一些方法用來(lái)快捷創(chuàng)建事件隊(duì)列,例如:
- just<T...> 將傳入的參數(shù)依次發(fā)送出來(lái)。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T...) / from(Iterable<? extends T> 將傳入的數(shù)組 或者 Iterable 拆分成具體對(duì)象后,依次發(fā)送出來(lái)。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價(jià)的。
Subcsribe訂閱
創(chuàng)建了Observable和Observe之后,用subscribe方法將他們鏈接起來(lái),整條鏈子就可以工作了。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
subscribe做了三件事
- 調(diào)用的Subscriber的onStart方法,可選的準(zhǔn)備方法。
- 調(diào)用observable中的Call方法。在這里,事件發(fā)送的邏輯開(kāi)始運(yùn)行。在RXjava中Observable不是在創(chuàng)建時(shí)候就立即發(fā)送事件,而是在他訂閱的時(shí)候,即放subscribe執(zhí)行的時(shí)候。
- 將傳入的subscribe作為Subscription返回,為了方便unSubscribe。
整個(gè)關(guān)系如下
[圖片上傳失敗...(image-17e6e0-1512961567324
或者
[圖片上傳失敗...(image-7845b8-1512961567324)]
除了subscribe(Observer) 或者 subscribe(subscriable),subscribe還支持不完整定義的回調(diào),RXJava會(huì)自動(dòng)創(chuàng)建出Subscriber
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來(lái)定義 onNext()
observable.subscribe(onNextAction);
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來(lái)定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來(lái)定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Action0是RxJava中的一個(gè)接口,它只有一個(gè)方法Call,這個(gè)方法是一個(gè)無(wú)參無(wú)返回值的方法,由于onCompleted也是無(wú)參無(wú)返回值的,因此action可以當(dāng)成一個(gè)包裝對(duì)象,將onCompleted內(nèi)容打包起來(lái)將自己作為一個(gè)參數(shù)傳入subscribe中,以實(shí)現(xiàn)不完整定義的回調(diào)。也可以看做將onCompleted方法傳遞進(jìn)了subscribe,相當(dāng)于某些語(yǔ)言中的閉包。
Action1也是一個(gè)接口,他同樣也只有一個(gè)方法Call(T param),這個(gè)方法無(wú)返回值,但是有一個(gè)參數(shù),與Action0同理,由于onNext onError也是只有一個(gè)單參數(shù),且沒(méi)有返回值,因此Action1可以將OnNext(obj)和onError(error)打包起來(lái)傳入subscribe中,以實(shí)現(xiàn)不完整定義的回調(diào),事實(shí)上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個(gè) ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無(wú)返回值的方法。
場(chǎng)景事例
打印字符數(shù)組
將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來(lái):
String[] names = {"馮星","曹操","趙云","馬超"};
rx.Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: " +s);
}
});
由 id 取得圖片并顯示
由指定的一個(gè) drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):
Observable<Drawable> observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
subscriber.onNext(getResources().getDrawable(R.mipmap.water));
subscriber.onCompleted();
}
});
observable.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
iv_demo.setImageDrawable(drawable);
}
});
正如上面兩個(gè)例子這樣,創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來(lái),一次 RxJava 的基本使用就完成了。非常簡(jiǎn)單。
[圖片上傳失敗...(image-9074f5-1512961567324)]
在RXjava默認(rèn)規(guī)則里,事件的發(fā)出和消費(fèi)都在同一個(gè)線程里。也就是說(shuō)上面是一個(gè)同步的觀察者模式。
而觀察者模式本身的目的在于 后臺(tái)處理,前臺(tái)調(diào)用 的異步機(jī)制。因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步,則需要用到 RxJava 的另一個(gè)概念: Scheduler 。
線程控制 -- Scheduler |si gan diu le| 調(diào)度
在不指定線程的情況下,RXjava遵循的是線程不變得原則,
即,在那個(gè)線程調(diào)用Subscribe,就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)的事件,就在那個(gè)線程消費(fèi)事件,
如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。
Schedule的API
在RXjava中,schedule--調(diào)度器,相當(dāng)于線程控制器,RXJava通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行哪一個(gè)線程。
- Schedulers.immediate() 運(yùn)行在當(dāng)前線程,相當(dāng)于不指定線程。這是默認(rèn)的schedule。 |ai mi dei rui te| 立即的 立刻的
- Schedulers.newThread() 總是啟用新線程。并在新線程執(zhí)行操作。
- Schedulers.io() io操作(讀寫(xiě)文件,讀寫(xiě)數(shù)據(jù)庫(kù),網(wǎng)絡(luò)信息交互等)所使用的schedule。行為模式和newThread差不多。區(qū)別在于Io的內(nèi)部實(shí)現(xiàn)是用一個(gè)無(wú)數(shù)量上線的線程池,可以重復(fù)利用閑置的線程。因此多數(shù)情況下,io要比newThread更有效。不要把計(jì)算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation() 計(jì)算時(shí)使用的schedule。這個(gè)計(jì)算指的是CPU密集型計(jì)算,即不會(huì)被I/O等操作限制性能的操作,例如圖形計(jì)算。這個(gè)Scheduler使用固定的線程池,大小為CPU的核數(shù),不要把I/O放在computation中,否則I/O操作的等待時(shí)間會(huì)浪費(fèi)CPU。
- AndroidSchedules.mainThread() Android專用線程,他指定的操作將在主線程中運(yùn)行。
subscribeOn() 指定subscribe()所發(fā)生的線程,即Observable.OnSubscribe()被激活時(shí)所發(fā)生的線程?;蛘呓凶鍪录a(chǎn)生的線程。
ObserveOn()指定subscriber所發(fā)生的線程或者叫做事件消費(fèi)的線程。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
由于subscribeOn(Schedulers.io())的指定,被創(chuàng)建的事件1,2,3,4,將會(huì)在在Io線程發(fā)出。
由于observeOn(AndroidSchedulers.mainThread())的指定,因此subscriber的數(shù)字打印將發(fā)生在主線程。
事實(shí)上,這種在 subscribe() 之前寫(xiě)上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見(jiàn),它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù),主線程顯示』的程序策略。
Schedule的原理
下面呢
變換
RxJava 提供了對(duì)事件序列進(jìn)行變換的支持,這是它的核心功能之一
所謂變換,就是將事件序列中的對(duì)象或者整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或者事件序列。
API
map()
Observable.just(R.mipmap.water)
.map(new Func1<Integer, Bitmap>() {
@Override
public Bitmap call(Integer s) {
return BitmapFactory.decodeResource(getResources(),s);
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
iv_demo.setImageBitmap(bitmap);
}
});
Func1和Action1非常相似。也是RXJava中的一個(gè)接口。用于包裝有一個(gè)參數(shù)的方法。Func1和Action1的區(qū)別在于,Func1是由返回值得。另外,和 ActionX 一樣, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
map()方法將參數(shù)中的string對(duì)象轉(zhuǎn)換成BitMap對(duì)象后返回,經(jīng)過(guò)map方法后,事件的參數(shù)也隨之變成的bitmap
- map():事件對(duì)象的直接變換,是最常見(jiàn)的轉(zhuǎn)換。
[圖片上傳失敗...(image-2a65d2-1512961567324)]
- flatMap() 這是一個(gè)很有用但非常難理解的變換,因此我決定花多些篇幅來(lái)介紹它。 首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡(jiǎn)單:
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: "+s);
}
});
很簡(jiǎn)單。那么再假設(shè):如果要打印出每個(gè)學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個(gè)學(xué)生只有一個(gè)名字,但卻有多個(gè)課程。)首先可以這樣實(shí)現(xiàn):
Observable.from(students)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
List<Course> courses = student.getCourses();
for(Course course : courses){
Log.d(TAG, "onNext: "+ course.getName() + " Student : " +student.getName());
}
}
});
依然很簡(jiǎn)單。那么如果我不想在 Subscriber 中使用 for 循環(huán),而是希望 Subscriber 中直接傳入單個(gè)的 Course 對(duì)象呢(這對(duì)于代碼復(fù)用很重要)?用 map() 顯然是不行的,因?yàn)?map() 是一對(duì)一的轉(zhuǎn)化,而我現(xiàn)在的要求是一對(duì)多的轉(zhuǎn)化。那怎么才能把一個(gè) Student 轉(zhuǎn)化成多個(gè) Course 呢? 這時(shí)候就需要flatMap了
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
}).subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.d(TAG, "call: " + course.getName());
}
});
flatMap和map有個(gè)相同點(diǎn):也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象。但是需要注意的是flatMap返回的對(duì)象是Observable對(duì)象。并且這個(gè)Observable對(duì)象不是直接發(fā)送到了Subscriber的回調(diào)方法中。
flatMap的原理是這樣的
- 使用傳入的事件對(duì)象創(chuàng)建一個(gè)Observable對(duì)象。
- 并不發(fā)送這個(gè)Observable對(duì)象,而是將它激活,于是它開(kāi)始發(fā)送事件。
- 每一個(gè)創(chuàng)建出來(lái)的Observable發(fā)送的事件,都匯入同一個(gè)Observable對(duì)象,而這個(gè)observable對(duì)象負(fù)責(zé)將這些事件傳入Subscriber對(duì)象。
這三個(gè)步驟吧事件分成了兩級(jí),通過(guò)一組新創(chuàng)建的Observable將初始的對(duì)象鋪平,之后通過(guò)統(tǒng)一路徑分發(fā)下去,而這個(gè)『鋪平』就是 flatMap() 所謂的 flat
flatMap()示意圖
[圖片上傳失敗...(image-a79136-1512961567324)]

擴(kuò)展
由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請(qǐng)求。示例代碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在訂閱時(shí)請(qǐng)求 token,并在響應(yīng)后發(fā)送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在訂閱時(shí)請(qǐng)求消息列表,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 處理顯示消息列表
showMessages(messages);
}
});
傳統(tǒng)的嵌套請(qǐng)求需要使用嵌套的 Callback 來(lái)實(shí)現(xiàn)。而通過(guò) flatMap() ,可以把嵌套的請(qǐng)求寫(xiě)在一條鏈中,從而保持程序邏輯的清晰。
- throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件。常用作去抖動(dòng)過(guò)濾,例如按鈕的點(diǎn)擊監(jiān)聽(tīng)器: RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點(diǎn)開(kāi)兩個(gè)重復(fù)的界面啦。 |si rou te| 喉嚨 壓制 節(jié)流 減速 窒息
變換的原理 lift()
這些變化雖然功能不一樣,但實(shí)質(zhì)上都是針對(duì)事件的處理在發(fā)送。而在RXjava的內(nèi)部,他們都是基于同一個(gè)基礎(chǔ)的方法變化,lift(Operator)。
// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber); // 這個(gè)onSubscribe是原始的OnSubScribe對(duì)象!!
}
});
}
這段代碼,它生成了一個(gè)新的Observable并且返回,新創(chuàng)建的Observable中的參數(shù)OnSubscribe的回調(diào)方法call()的實(shí)現(xiàn),和Observable.Subscribe()基本一樣,但是是由區(qū)別的。
不一樣的地方在與OnSubscrvable中call(subscribe)所指代了對(duì)象不同
- 當(dāng)使用lift方法時(shí),
- 假設(shè)有一個(gè)Observable<T>調(diào)用了lift()并創(chuàng)建Observable后,一共有個(gè)Observable。
- 同樣的Observable的參數(shù)OnSubscribe,加上之前原始的Observable里面的原始OnSubscribe,也就有了兩個(gè) OnSubscribe;
- 然后調(diào)用Observable.create傳入Observable<R>,觸發(fā)onSubscribe的Call方法,也是就override的方法,
- 在該方法中 調(diào)用了OnSubscribe.call()方法,注意:這個(gè)OnSubscribe方法是原始的Observable<T>的onSubscribe<T>對(duì)象。他需要傳入一個(gè)Subscriber對(duì)象,這個(gè)對(duì)象是通過(guò)Subscriber newSubscriber = operator.call(subscriber);operator.call()方法生成的新的Subscribe。正是這個(gè)operator對(duì)象將兩個(gè)Subscriber對(duì)象聯(lián)系起來(lái)的。OnSubscribe<T>在執(zhí)行Subscriber<R>.onNext(R r),而這里從T變成R,正好用到了傳到Operator中的參數(shù)Func1<T, R>。
這樣就實(shí)現(xiàn)了 lift() 過(guò)程,有點(diǎn)像一種代理機(jī)制,通過(guò)事件攔截和處理實(shí)現(xiàn)事件序列的變換。
也可以這個(gè)說(shuō):在Observable執(zhí)行了lift(Operator)方法后,會(huì)返回一個(gè)新的Observable,這個(gè)新的Observable會(huì)象一個(gè)代理一樣,負(fù)責(zé)接受原始的Observable發(fā)出的事件,并在處理后發(fā)送給Subscriber
[圖片上傳失敗...(image-8fc12c-1512961567324)]
[圖片上傳失敗...(image-925ff1-1512961567324)]
多次調(diào)用
[圖片上傳失敗...(image-eb30f8-151296156732
舉個(gè)例子
Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
.map(new Func1<Float, Integer>() {
@Override
public Integer call(Float aFloat) {
return Math.round(aFloat);
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return Integer.toBinaryString(integer);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("2 map onNext->" + s);
}
});
// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001
該例子是一個(gè)Float->Integer->String的轉(zhuǎn)換。我們按上面的流程來(lái)分析。
- 生成一個(gè)Observable<Float>
- 調(diào)用map生成Observable<Integer>
- 調(diào)用map生成Observable<String>
- subscribe()傳入一個(gè)Subscribe(String),至此流的前半部分全部完成。
- 執(zhí)行開(kāi)始,Subscribe<String>發(fā)送事件,先生成一個(gè)Subscrver<Integer>傳給Observable<Integer>(Observable<Integer>.onSubscribe.call())。
- Observable<Interger>開(kāi)始發(fā)送事件,同樣生成一個(gè)Subscriber<Float>傳給Observable<Float>(Observable<Float>.onSubscribe.call())。
- 真正的發(fā)送事件開(kāi)始,Observable<Float>調(diào)用Subscriber<Float>.onNext(Float)等方法,同時(shí)Subscriber<Integer>.onNext(Integer)被調(diào)用,同時(shí)Subscriber<String>.onNext(String)被調(diào)用,事件發(fā)送完成。
compose對(duì)Observable整體的變換 |com pou si| 構(gòu)成 組成
除了lift方法外,Observable還有一個(gè)變換方法叫 compose(Transformer)它和lift的區(qū)別在于lift是針對(duì)事件項(xiàng)和事件序列,而compose是針對(duì)observable自身進(jìn)行變換。
假設(shè)在程序中有多個(gè) Observable ,并且他們都需要應(yīng)用一組相同的 lift() 變換。
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對(duì)象的 call 方法直接對(duì)自身進(jìn)行處理,也就不必被包在方法的里面了。
Scheduler的API(二)
利用 subscribeOn() 結(jié)合 observeOn() 來(lái)實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。
能不能可以多次切換線程
答案是能。因?yàn)镺bserveOn指定的是
操作符分類
面我按類別把常用操作符分別介紹,其實(shí)很多內(nèi)容都是來(lái)自于ReactiveX的官方網(wǎng)站,英文比較好的朋友可以參考(http://reactivex.io/)。
按照官方的分類,操作符大致分為以下幾種:
Creating Observables(Observable的創(chuàng)建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的轉(zhuǎn)換操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的過(guò)濾操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的組合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的錯(cuò)誤處理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的條件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable數(shù)學(xué)運(yùn)算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;