參考學(xué)習(xí)資料 1.0版本
http://gank.io/post/560e15be2dca930e00da1083 扔物線
https://github.com/lzyzsd/Awesome-RxJava 大頭鬼
一、基礎(chǔ)
Observables(被觀察者,事件源)和Subscribers(觀察者)Observables發(fā)出一系列事件,Subscribers處理這些事件
一個(gè)Observable可以發(fā)出零個(gè)或者多個(gè)事件,知道結(jié)束或者出錯(cuò)。每發(fā)出一個(gè)事件,就會(huì)調(diào)用它的Subscriber的onNext方法,最后調(diào)用Subscriber.onNext()或者Subscriber.onError()結(jié)束。
建議先看大頭鬼的hello world 的例子 (建議碼一遍)
http://blog.csdn.net/lzyzsd/article/details/41833541
Action1<String> onNextAction=new Action1<String>() {
@Override public void call(String s) {
System.out.println(s);
}};
應(yīng)用場(chǎng)景
1.Observable和Subscriber可以做任何事情Observable可以是一個(gè)數(shù)據(jù)庫(kù)查詢,Subscriber用來顯示查詢結(jié)果;Observable可以是屏幕上的點(diǎn)擊事件,Subscriber用來響應(yīng)點(diǎn)擊事件;Observable可以是一個(gè)網(wǎng)絡(luò)請(qǐng)求,Subscriber用來顯示請(qǐng)求結(jié)果。
left原理:都是基于一個(gè)lift(operator)
* Observable 在執(zhí)行了left之后, 會(huì)返回一個(gè)新的Observable,這個(gè)Observable就像一個(gè)代理一樣,負(fù)責(zé)接受原始的Observable發(fā)出的事件。并在處理后發(fā)送給Subscriber。更像是一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)時(shí)間序列的變換。
subscribeOn()和 observeOn區(qū)別:
* 1.subscribeOn()線程切換發(fā)生在Onsubscribe中,即在它通知上一級(jí)OnSubscribe時(shí),這時(shí)事件還沒有開始發(fā)送,因此subscribeOn()的線程控制,
* 可以從事件發(fā)出的開端就做成了影響。
* 2.observeOn()的線程切換發(fā)送在它內(nèi)建Subscriber中,即發(fā)生在它即將給下一級(jí)Subscriber發(fā)送事件時(shí),因此observeOn()控制的是它后面的線程。
*
Observable.doOnSubscribe() 和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行, 但是Observable.doOnSubscribe()可以指定線程,默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
2.0版本###
導(dǎo)包給我搞吐
包名開頭 最舊 rx-->io 最新
2.0 最核心的是Publisher 和 Subscriber。 Publisher 發(fā)出一系列的時(shí)間,Subscriber負(fù)責(zé)和處理事件。
背壓#####
在rxjava中有多重控制流以及背壓(backpressure)策略用來應(yīng)對(duì)當(dāng)一個(gè)快速發(fā)送消息的被觀察者遇到一個(gè)處理消息緩慢的觀察者。
Flowable的三種Backpressure策略:
BackpressureStrategy.BUFFER####
onBackpressureBuffer是不丟棄數(shù)據(jù)的處理方式。把上游收到的全部緩存下來,等下游來請(qǐng)求再發(fā)給下游。相當(dāng)于一個(gè)水庫(kù)。但上游太快,水庫(kù)(buffer)就會(huì)溢出。
BackpressureStrategy.DROP####
BackpressureStrategy.LATEST####
Drop 和Latest 類似,都會(huì)丟棄數(shù)據(jù),下游通過request請(qǐng)求產(chǎn)生令牌給上游,上游接收到多少令牌,就發(fā)送多少,當(dāng)令牌為0的時(shí)候,上游開始丟棄數(shù)據(jù)。區(qū)別在于,drop直接丟棄數(shù)據(jù)不緩存數(shù)據(jù)。而latest緩存最新的一條數(shù)據(jù),當(dāng)上游收到令牌,就把緩存的上一條“最新”數(shù)據(jù)發(fā)送給下游。
何時(shí)用Observable####
當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量不大(以1000為界限)的時(shí)候優(yōu)先選擇使用Observable;
在處理GUI相關(guān)的事件,比如鼠標(biāo)移動(dòng)或觸摸事件,這種情況下很少會(huì)出現(xiàn)backpressured的問題,用Observable就足以滿足需求;
獲取數(shù)據(jù)操作是同步的,但你的平臺(tái)不支持Java流或者相關(guān)特性。使用Observable的開銷低于Flowable。
何時(shí)用Flowable####
當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量過大的時(shí)候(這個(gè)量我們往往無法預(yù)計(jì)),此時(shí)就要使用Flowable以限制它所產(chǎn)生的量的元素10K +處理。
當(dāng)你從本地磁盤某個(gè)文件或者數(shù)據(jù)庫(kù)讀取數(shù)據(jù)時(shí)(這個(gè)數(shù)據(jù)量往往也很大),應(yīng)當(dāng)使用Flowable,這樣下游可以根據(jù)需求自己控制一次讀取多少數(shù)據(jù);
以讀取數(shù)據(jù)為主且有阻塞線程的可能時(shí)用Flowable,下游可以根據(jù)某種條件自己主動(dòng)讀取數(shù)據(jù)。
在RxJava2.0中,有五種觀察者模式:####
Observable/Observer
Flowable/Subscriber
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver
Observable 寫法:####
Observable-->Observer
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Flowable寫法###
Flowable-->subscriber
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
//當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
//傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onsubscribe start");
## 這里需要特別注意。s.request()去請(qǐng)求資源,參數(shù)就是要請(qǐng)求的數(shù)量,一般如果不限制,寫成Long.MAX_VALUE。如果不調(diào)用request, onNext()和onComplete方法將不會(huì)被調(diào)用。
subscription = s;
subscription.request(1);
Log.d(TAG, "onsubscribe end");
}
@Override
public void onNext(Integer o) {
## onNext 方法里面?zhèn)魅氲膮?shù)就是Flowable 中發(fā)射出來的。
Log.d(TAG, "onNext--->" + o);
subscription.request(3);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
如果只關(guān)注onNext(),可以
Consumer consumer=new Consumer<String>(){
@Override
public void accept(String s) throws Exception{
System.out.println(s);
}
}
Actions
為了減少組件數(shù)量,2.x中沒有定義Action3-Action9和ActionN。
保留的action接口按照J(rèn)ava 8 functional風(fēng)格命名。 無參數(shù)的Action0 被操作符io.reactivex.functions.Action和Scheduler代替。
Action1被重命名為Consumer。Action2 被重命名為BiConsumer。 ActionN 被Consumer<Object[]> 代替。##
Functions
我們按照J(rèn)ava 8的命名風(fēng)格定義了io.reactivex.functions.Function 和io.reactivex.functions.BiFunction, 把Func3 - Func9 分別改成了 Function3 - Function9 。FuncN被Function<Object[], R>代替。
此外,操作符不再使用Func1<T, Boolean>但原始返回類型為Predicate<T>。
io.reactivex.functions.Functions類提供了常見的轉(zhuǎn)換功能Function<Object[], R>