RxAndroid 1.0,2.0版本以及主要原理

參考學(xué)習(xí)資料 1.0版本

http://gank.io/post/560e15be2dca930e00da1083 扔物線
https://github.com/lzyzsd/Awesome-RxJava 大頭鬼

一、基礎(chǔ)
 Observables(被觀察者,事件源)和Subscribers(觀察者)Observables發(fā)出一系列事件,Subscribers處理這些事件

一個(gè)Observable可以發(fā)出零個(gè)或者多個(gè)事件,知道結(jié)束或者出錯(cuò)。每發(fā)出一個(gè)事件,就會(huì)調(diào)用它的Subscriber的onNext方法,最后調(diào)用Subscriber.onNext()或者Subscriber.onError()結(jié)束。

建議先看大頭鬼的hello world 的例子 (建議碼一遍)
http://blog.csdn.net/lzyzsd/article/details/41833541

Action1<String> onNextAction=new Action1<String>() {   
 @Override    public void call(String s) {   
     System.out.println(s);   
 }};

應(yīng)用場(chǎng)景

1.Observable和Subscriber可以做任何事情Observable可以是一個(gè)數(shù)據(jù)庫(kù)查詢,Subscriber用來顯示查詢結(jié)果;Observable可以是屏幕上的點(diǎn)擊事件,Subscriber用來響應(yīng)點(diǎn)擊事件;Observable可以是一個(gè)網(wǎng)絡(luò)請(qǐng)求,Subscriber用來顯示請(qǐng)求結(jié)果。

 left原理:都是基于一個(gè)lift(operator)
 *  Observable 在執(zhí)行了left之后, 會(huì)返回一個(gè)新的Observable,這個(gè)Observable就像一個(gè)代理一樣,負(fù)責(zé)接受原始的Observable發(fā)出的事件。并在處理后發(fā)送給Subscriber。更像是一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)時(shí)間序列的變換。
  subscribeOn()和 observeOn區(qū)別:
 *  1.subscribeOn()線程切換發(fā)生在Onsubscribe中,即在它通知上一級(jí)OnSubscribe時(shí),這時(shí)事件還沒有開始發(fā)送,因此subscribeOn()的線程控制,
 * 可以從事件發(fā)出的開端就做成了影響。
 * 2.observeOn()的線程切換發(fā)送在它內(nèi)建Subscriber中,即發(fā)生在它即將給下一級(jí)Subscriber發(fā)送事件時(shí),因此observeOn()控制的是它后面的線程。
 *

Observable.doOnSubscribe() 和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行, 但是Observable.doOnSubscribe()可以指定線程,默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。

2.0版本###

導(dǎo)包給我搞吐
包名開頭 最舊 rx-->io 最新

2.0 最核心的是Publisher 和 Subscriber。 Publisher 發(fā)出一系列的時(shí)間,Subscriber負(fù)責(zé)和處理事件。

具體講解

背壓#####

在rxjava中有多重控制流以及背壓(backpressure)策略用來應(yīng)對(duì)當(dāng)一個(gè)快速發(fā)送消息的被觀察者遇到一個(gè)處理消息緩慢的觀察者。

Flowable的三種Backpressure策略:

BackpressureStrategy.BUFFER####

onBackpressureBuffer是不丟棄數(shù)據(jù)的處理方式。把上游收到的全部緩存下來,等下游來請(qǐng)求再發(fā)給下游。相當(dāng)于一個(gè)水庫(kù)。但上游太快,水庫(kù)(buffer)就會(huì)溢出。

BackpressureStrategy.DROP####

BackpressureStrategy.LATEST####

Drop 和Latest 類似,都會(huì)丟棄數(shù)據(jù),下游通過request請(qǐng)求產(chǎn)生令牌給上游,上游接收到多少令牌,就發(fā)送多少,當(dāng)令牌為0的時(shí)候,上游開始丟棄數(shù)據(jù)。區(qū)別在于,drop直接丟棄數(shù)據(jù)不緩存數(shù)據(jù)。而latest緩存最新的一條數(shù)據(jù),當(dāng)上游收到令牌,就把緩存的上一條“最新”數(shù)據(jù)發(fā)送給下游。

何時(shí)用Observable####

當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量不大(以1000為界限)的時(shí)候優(yōu)先選擇使用Observable;
在處理GUI相關(guān)的事件,比如鼠標(biāo)移動(dòng)或觸摸事件,這種情況下很少會(huì)出現(xiàn)backpressured的問題,用Observable就足以滿足需求;
獲取數(shù)據(jù)操作是同步的,但你的平臺(tái)不支持Java流或者相關(guān)特性。使用Observable的開銷低于Flowable。

何時(shí)用Flowable####

當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量過大的時(shí)候(這個(gè)量我們往往無法預(yù)計(jì)),此時(shí)就要使用Flowable以限制它所產(chǎn)生的量的元素10K +處理。
當(dāng)你從本地磁盤某個(gè)文件或者數(shù)據(jù)庫(kù)讀取數(shù)據(jù)時(shí)(這個(gè)數(shù)據(jù)量往往也很大),應(yīng)當(dāng)使用Flowable,這樣下游可以根據(jù)需求自己控制一次讀取多少數(shù)據(jù);
以讀取數(shù)據(jù)為主且有阻塞線程的可能時(shí)用Flowable,下游可以根據(jù)某種條件自己主動(dòng)讀取數(shù)據(jù)。

在RxJava2.0中,有五種觀察者模式:####

Observable/Observer
Flowable/Subscriber

Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver

Observable 寫法:####

Observable-->Observer

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

Flowable寫法###

Flowable-->subscriber

Flowable.range(0, 10)
        .subscribe(new Subscriber<Integer>() {
            Subscription subscription;

            //當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
            //傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onsubscribe start");
     ## 這里需要特別注意。s.request()去請(qǐng)求資源,參數(shù)就是要請(qǐng)求的數(shù)量,一般如果不限制,寫成Long.MAX_VALUE。如果不調(diào)用request, onNext()和onComplete方法將不會(huì)被調(diào)用。
                subscription = s;
                subscription.request(1);
                Log.d(TAG, "onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
## onNext 方法里面?zhèn)魅氲膮?shù)就是Flowable 中發(fā)射出來的。
                Log.d(TAG, "onNext--->" + o);
                subscription.request(3);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

如果只關(guān)注onNext(),可以

Consumer consumer=new Consumer<String>(){
  @Override
  public void accept(String s) throws Exception{
    System.out.println(s);
  }
}
Actions

為了減少組件數(shù)量,2.x中沒有定義Action3-Action9和ActionN。

保留的action接口按照J(rèn)ava 8 functional風(fēng)格命名。 無參數(shù)的Action0 被操作符io.reactivex.functions.Action和Scheduler代替。

Action1被重命名為Consumer。Action2 被重命名為BiConsumer。 ActionN 被Consumer<Object[]> 代替。##

Functions

我們按照J(rèn)ava 8的命名風(fēng)格定義了io.reactivex.functions.Function 和io.reactivex.functions.BiFunction, 把Func3 - Func9 分別改成了 Function3 - Function9 。FuncN被Function<Object[], R>代替。

此外,操作符不再使用Func1<T, Boolean>但原始返回類型為Predicate<T>。

io.reactivex.functions.Functions類提供了常見的轉(zhuǎn)換功能Function<Object[], R>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容