RxJava學(xué)習(xí)筆記(基本用法)

首先這是基于rxjava1.0的學(xué)習(xí),最近又出了2.0版本,有了一些改動,首先用法就有了一定的變化,就比如開始我在Android Studio添加的依賴庫選了2.0,結(jié)果測試rxjava-essentials中的代碼時,發(fā)現(xiàn)方法根本找不到了。</br>
不過正所謂學(xué)習(xí)嘛,雖然說更新了2.0版本,但是內(nèi)部的核心肯定是換湯不換藥的,就從1.0版本開始學(xué)!

RxJava的與眾不同

從純Java的觀點看,Rxjava Observale類源自于經(jīng)典的Gang of Four的觀察者模式。
它添加了三個缺少的功能:</br>

  • 生產(chǎn)者在沒有更多數(shù)據(jù)可用時能夠發(fā)出信號通知:onCompleted()事件。
  • 生產(chǎn)者在發(fā)生錯誤時能夠發(fā)出信號通知:onError()事件。
  • RxJava Observables 能夠組合而不是嵌套,從而避免開發(fā)者陷入回調(diào)地獄。

在RxJava的世界里,我們有四種角色:

  • Observable
  • Observer
  • Subscriber
  • Subjects

Observables和Subjects是兩個“生產(chǎn)”實體,Observers和Subscribers是兩個“消
費”實體。

以上摘自《rxjava-essentials》

Observer是觀察者接口,而Subscriber是其抽象實現(xiàn)類。

Observable.create()

  • Observable.create()創(chuàng)建一個Observable();

  • Observable.subscribe(Observer ob)訂閱一個觀察者。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<Integer>() {
    
              @Override
              public void call(Subscriber<? super Integer> subscriber) {
                  for (int i = 0; i < 5; i++) {
                      subscriber.onNext(i);
                  }
                  subscriber.onCompleted();
              }
          
          }).subscribe(new Observer<Integer>() {
    
              @Override
              public void onCompleted() {
                  System.out.println("Completed");
              }
    
              @Override
              public void onError(Throwable e) {
                  System.out.println("Error");
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println("i = " + integer);
              }
          });
    
      }
    

打印結(jié)果:</br>
i = 0</br>
i = 1</br>
i = 2</br>
i = 3</br>
i = 4</br>
Completed</br>

除了可以用Observable.create(),同時還有Observable.from(),Observable.just()。

Observable.from()

  • Observable.from()可以直接將我們的列表丟進參數(shù)中。

      public static void main(String... args) {
      List<Integer> items = new ArrayList<>();
      items.add(1);
      items.add(11);
      items.add(111);
      items.add(1111);
      
      Observable.from(items)
              .subscribe(new Observer<Integer>() {
      
                  @Override
                  public void onCompleted() {
                      System.out.println("Completed");
                  }
      
                  @Override
                  public void onError(Throwable e) {
                      System.out.println("Error");
                  }
      
                  @Override
                  public void onNext(Integer integer) {
                      System.out.println("i = " + integer);
                  }
              });
      }
    

打印結(jié)果:</br>
i = 0</br>
i = 1</br>
i = 2</br>
i = 3</br>
i = 4</br>
Completed</br>

Observable.just()

  • Observable.just()可以將我們現(xiàn)有的方法轉(zhuǎn)換成Observable。

      public static void main(String... args) {
          Observable.just(reInt())
                  .subscribe(new Observer<Integer>() {
              @Override
              public void onCompleted() {
                  System.out.println("Completed");
              }
    
              @Override
              public void onError(Throwable e) {
                  System.out.println("Error");
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println("i = " + integer);
              }
          });
      }
    
      public static int reInt(){
          return 12;
      }
    

打印結(jié)果:</br>
i = 12</br>
Completed</br>

Subject

  • 前面對Observer,Observable,Subscriber都有用到,單單還有一個Subject。
  • Subject = Observable + Observer,它可以是一個Observable同時也可以是一個Observer,它作為連接這兩個世界的一座橋梁。
  • 一個Subject可以訂閱一個Observable,就像一個觀察者,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù),就像一個Observable。很明顯,作為一個Observable,觀察者們或者其它Subject都可以訂閱它。

Rxjava提供了四種不同的Subject:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

PublishSubject

public static void main(String... args) {
    PublishSubject<String> publishSubject = PublishSubject.create();
    publishSubject.subscribe(new Observer<Object>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error");
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o.toString());
        }
    });
    publishSubject.onNext("HelloWorld");
}

打印結(jié)果:</br>
HelloWorld

  • 代碼開始創(chuàng)建了一個PublishSubject,用create()發(fā)射一個String值,之后訂閱了PublishSubject。一直到這里,也沒有數(shù)據(jù)要發(fā)送,直到最后publishSubject.onNext("HelloWorld")觸發(fā)了觀察者的onNext()方法,將傳遞過來的"HelloWorld"打印出來。

BehaviorSubject

  • BehaviorSubject會首先向他的訂閱者發(fā)送截至訂閱前最新的一個數(shù)據(jù)對象或者初始值,然后正常的發(fā)送訂閱后的數(shù)據(jù)流。

      public static void main(String... args) {
          BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
          behaviorSubject.subscribe(new Observer<Integer>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println("i = " + integer);
              }
          });
          behaviorSubject.onNext(999);
      }
    

打印結(jié)果:</br>
i = 1</br>
i = 999</br>
首先接受到創(chuàng)建Observer時的初始值1,然后出發(fā)觀察者的onNext()方法。

ReplaySubject

ReplaySubject會緩存它所訂閱的所有數(shù)據(jù),向任意一個訂閱它的觀察者重發(fā)。

public static void main(String... args) {
    ReplaySubject<Integer> replaySubject = ReplaySubject.create();
    replaySubject.onNext(333);
    replaySubject.onNext(444);
    replaySubject.onNext(555);
    replaySubject.onCompleted();

    replaySubject.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("i = " + integer);
        }
    });
}

可以看到,replaySubject先發(fā)出了信息,然后才訂閱了Observer。</br>
打印結(jié)果:</br>
i = 333</br>
i = 444</br>
i = 555</br>

AsyncSubject

當(dāng)Observable完成時AsyncSubject只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者。

public static void main(String... args) {
    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
    asyncSubject.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("i = " + integer);
        }
    });
    asyncSubject.onNext(333);
    asyncSubject.onNext(444);
    asyncSubject.onNext(555);
    asyncSubject.onCompleted();
}

在這里,asyncSubject發(fā)出了三個信息,但是根據(jù)asyncSubject的特性,只會發(fā)布最后一個數(shù)據(jù)給訂閱的觀察者,注意最后這一行代碼:asyncSubject.onCompleted(),如果不加這一行代碼,程序不能識別555是最后一個數(shù)據(jù),將不會打印任何結(jié)果。</br>
打印結(jié)果:</br>
i = 555</br>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容