RxJava入門基礎(chǔ)

RecctiveX與RxJava
?在講到RxJava之前我們首先要了解什么是ReactiveX,因?yàn)镽xJava是ReactiveX的一種java實(shí)現(xiàn)。ReactiveX是Reactive Extensions的縮寫,一般簡(jiǎn)寫為Rx。微軟給的定義是,Rx是一個(gè)函數(shù)庫(kù),讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序。開發(fā)者可以用Observables表示異步數(shù)據(jù)流,用LINQ操作符查詢異部數(shù)據(jù)流,用Schedulers參數(shù)化異步數(shù)據(jù)流的并發(fā)處理,Rx可以這樣定義:Rx=Observables+LINQ+Schedulers。
?說到異步操作,我們會(huì)想到Android的AsyncTask和Handler。但是隨著請(qǐng)求數(shù)量越來越多,代碼邏輯將會(huì)變得將越來越復(fù)雜而RxJava卻依舊能保持清晰的邏輯。RxJava的原理就是創(chuàng)建一個(gè)Observable對(duì)象來干活,然后使用各種操作符建立起來的鏈?zhǔn)讲僮?,就如同流水線一樣,把你想要處理的數(shù)據(jù)一步步的加工成你想要的成品,然后發(fā)射給Subscriber處理。
?RxJava的異步操作是通過擴(kuò)展觀察者模式來實(shí)現(xiàn)的,RxJava有4個(gè)角色Observable、Observer、Subscriber和Suject,這4個(gè)角色后面會(huì)具體講解。Observable和Observer通過subscribe方法實(shí)現(xiàn)訂閱關(guān)系,Observable就可以在需要的時(shí)候通知Observer,官方文檔。

RxJava基本實(shí)現(xiàn)

?在使用RxJava前請(qǐng)先在Android Studio中配置gradle:

dependencies{
  //最新版本
   //implementation "io.reactivex.rxjava3:rxjava:3.x.y"
   implementation 'io.reactivex.rxjava:1.2.0'
   implementation 'io.reactivex.rxandroid:1.2.1'
}

?其中RxAndroid是RxJava在Android平臺(tái)的擴(kuò)展。它包含了一些能夠簡(jiǎn)化Android開發(fā)的工具,比如特殊的調(diào)度器。
?RxJava的基本用法為3個(gè)步驟。

?1.創(chuàng)建Observer(觀察者)

?它決定實(shí)踐觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨?。代碼如下:

Subscriber subscriber = new  Subscriber<String>(){
      @Overrride
      public void onCompleted(){
         Log.d(TAG,"onCompleted");
      }
      @Overrride
      public void onError(Throwable e){
         Log.d(TAG,"onError");                                                                                                                                                     
      }
      @Overrride
      public void onNext(String s){
         Log.d(TAG,"onNext"+s);
      }
      @Overrride
      public void onStart(){
         Log.d(TAG,"onNext"+s);
      }
  }

?其中onCompleted、onError和onNext是必須要實(shí)現(xiàn)的方法,其含義如下。

  • onCompleted:時(shí)間隊(duì)列完結(jié)。RxJava不僅把每個(gè)事件單獨(dú)處理,其還會(huì)把它們看作一個(gè)隊(duì)列。當(dāng)不會(huì)再有新的onNext發(fā)出時(shí),需要觸發(fā)onCompleted()方法作為完成標(biāo)志。
  • onError:事件隊(duì)列異常。在事件處理過程中出現(xiàn)異常時(shí),onError()會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。
  • onNext:普通的事件。將要處理的事件添加到事件隊(duì)列中。
  • onStart:它會(huì)在事件還未發(fā)送之前被調(diào)用,它可以用于做一些準(zhǔn)備工作。例如數(shù)據(jù)的清零或重置。這是一個(gè)可選的方法,默認(rèn)情況下它的實(shí)現(xiàn)為空。
    當(dāng)然,如果實(shí)現(xiàn)簡(jiǎn)單的功能,也可以用到Observer來創(chuàng)建觀察者。Observer是一個(gè)接口,而上面用到的Subscriber是在Observer的基礎(chǔ)上進(jìn)行的擴(kuò)展。在后文的Subscriber訂閱過程中Observer也會(huì)先被轉(zhuǎn)換為Subscriber來使用。用Observer創(chuàng)建觀察者,如下所示:
Observer<String> observer = new Observer<String> (){
    
      @Overrride
      public void onCompleted(){
         Log.d(TAG,"onCompleted");
      }
       @Overrride
      public void onError(Throwable e){
         Log.d(TAG,"onError");                                                                                                                                                     
      }
      @Overrride
      public void onNext(String s){
         Log.d(TAG,"onNext"+s);
      }
  }
?2.創(chuàng)建Observable(被觀察者)

?它決定什么時(shí)候出發(fā)事以及怎樣的事件。RxJava使用create方法來創(chuàng)建一個(gè)Observable,并為它定義事件觸發(fā)規(guī)則,如下所示:

Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
    @Override
    public void call(Subsccribe<? super String> subscribe){
        subscribe.onNext("楊影楓")
        subscribe.onNext("月眉兒")
        subscribe.onCompleted();
    }  
})

?通過調(diào)用Subscriber的方法,不斷地事件添加到任務(wù)隊(duì)列中。也可用just方法來實(shí)現(xiàn):

Observable observable  = Observable.just("楊影楓","月眉兒");

?上述代碼會(huì)依次調(diào)用onNext("楊影楓")、onNext("月眉兒")、onCompleted()。還可以用from方法來實(shí)現(xiàn),如下所示:

String[] words = {"楊影楓","月眉兒"};
Observable observabvle  = Observable.form(words);

?上述代碼調(diào)用的方法順序和just方法是一樣的。

?3.Subscribe(訂閱)

?訂閱只需要一行代碼就可以了,如下所示:

observable.subscribe(subscriber);

?運(yùn)行代碼查看Log:
?D/RxJava:onStart
?D/RxJava:onNext楊影楓
?D/RxJava:onNext月眉兒
?D/RxJava:onCompleted
?和預(yù)想的一樣調(diào)用onStart方法,接著調(diào)用兩個(gè)onNext方法,調(diào)用onCompleted方法。

RxJava的不完整定義回調(diào)

?前面介紹了回調(diào)的接收主要依賴于subscribe(Observer)和subscribe(Subscriber)。此外。RxJava還提供了另一種回調(diào)方式,也就是不完整回調(diào)。在講到不完整回調(diào)之前我們首先要了解Action。查看RxJava源碼,發(fā)現(xiàn)一堆Action。

public interface Action0 extends Action{
    void call();
}

?再看Action1:

public interface Action1<T> extends Action{
      void call(T  t);
}

?再看Action9:

public interface Action9<T1,T2,T3,T4,T5,T6,T7,T8,T9> extends Action{
      void call(T1  t1,T2  t2,T3  t3,T4  t4,T5  t5,T6  t6,T7  t7,T8  t8,T9  t9);
}

?很明顯,Aciton后的數(shù)字代表回調(diào)的參數(shù)類型數(shù)量,之前創(chuàng)建Observer和訂閱代碼也就可以改寫為下面的代碼:

Action1<String> onNextAction = new Action1<String>(){
    @Override
    public void call(String s){
        Log.d(TAG,"onNext"+s);
      }
};

Action1<Throwable> onErrorAction = new Action1<Throwable>(){
    @Override
    public void call(Throwable  throwable){
        
      }
};
Action0 onCompletedAction = new Action0(){
    @Override
    public void call(){
        Log.d(TAG,"onCompleted");
      }
};
observable.subscride(onNextAction,onErrorAction,onCompletedAction);

?
我們定義了onNextAction來處理 onNext的回調(diào),同理,我們還定義了onErrorAction和onCompletedAction來分別處理onError和onCompleted的回調(diào),最后我們把它們傳給subscribe方法。很顯然這樣寫的靈活更大一些;同時(shí)我們也可以只傳一個(gè)或者兩個(gè)Action,如下所示:

observable.subscride(onNextAction);
observable.subscribe(onNextAction,onErrorAction);

?第一行之定義了onNextAction來處理onNext的回調(diào);而第二行則定義了onNextAction處onNext的回調(diào),onErrorAction來處理onError的回調(diào)

作者:張三兒 郵箱:921849651@qq.com
注:歡迎大佬指點(diǎn)江山,希望大家多多支持作者。持續(xù)技術(shù)分享!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容