快速開發(fā)之響應(yīng)式編程-RXJava 1

  • 如果問移動開發(fā)的潮流是什么的話,那么在這個16年初的移動技術(shù)圈里 RxJava(響應(yīng)式編程)+Retrofit(可能是目前最好的注解式網(wǎng)絡(luò)框架)+ORM(當(dāng)然如果你信任完全面向?qū)ο蟮臄?shù)據(jù)庫可以看一下Realm(不是ORM)據(jù)說是目前最好的面向?qū)ο髷?shù)據(jù)庫全系統(tǒng)解決方案了)

本文記錄一些本屌學(xué)習(xí)RxJava/RxAndroid響應(yīng)式編程的內(nèi)容

  • 什么是RxJava?為什么要使用Rxjava?

官方:RxJava 是一個在Java虛擬機上實現(xiàn)的響應(yīng)式擴展庫:提供了基于observable序列實現(xiàn)的異步調(diào)用及基于事件編程。 它擴展了觀察者模式,支持?jǐn)?shù)據(jù)、事件序列并允許你合并序列,無需關(guān)心底層的線程處理、同步、線程安全、并發(fā)數(shù)據(jù)結(jié)構(gòu)和非阻塞I/O處理。

簡單來說:就是一個使得異步處理起來草雞簡單的解決方案。

你一定會用到的RxJava常用操作符.

項目中有沒有Thread的漫天飛;一用AsyncTask又感覺很多簡單功能立馬復(fù)雜了;Handler數(shù)據(jù)傳遞不好跟蹤,處理的事物又非常的重?

那么RxJava簡直就是救星了?。?/p>

網(wǎng)上很多在介紹RxJava的文章為了體現(xiàn)筆者高端,都是用Lambda表達式來演示的,因為Lambda的可讀性其實是很不好的,我很不推薦初學(xué)者使用Lambda來使用RxJava,這樣使用會很不理解RxJava本身的,所以本文就不附帶Lambda了

BUT

沒有什么框架是全能的,那么Rxjava缺點是什么呢?
總結(jié):

Using RxJava for individual tasks is low risk and potentially high gain. It can simplify your code considerably. However, the more you use Rx in your project, the more likely you are to see a domino effect of a reactive expansion. Bridging the gap between non-Rx and Rx parts can be troublesome, and it can become tempting to simply write everything in Rx. This, on the other hand, is a decision not easily made. While indescribably deliberating, you will be wandering into a new land with unthinkable possibilities but with very little help.

我的理解就是:

  • 在輕量級的應(yīng)用開發(fā)中RxJava能夠提供隔壁高性能和效率的,但是在很大的項目中使用RxJava的話,可能會有一些引用或者內(nèi)存泄露的問題產(chǎn)生。
  • 并且RxJava自發(fā)布到現(xiàn)在已經(jīng)超過兩年了,API也一直有不小的變動,需要開發(fā)者一直關(guān)注Rxjava的變更。

當(dāng)然這并不能妨礙我們對這一框架的膜拜和學(xué)習(xí),下面進入正題...

  • RxJava的概念們

RxJava主要有四個主體概念:Scheduler(線程控制)、Observable (被觀察者)、Observer (觀察者)以及Subscribe (訂閱、事件)


基本關(guān)系

1.Scheduler(線程控制)

Scheduler負(fù)責(zé)線程調(diào)度,RxJava 通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler ,它們已經(jīng)適合大多數(shù)的使用場景:

  Schedulers.computation() 用于計算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量
  Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
  Schedulers.immediate(?) 在當(dāng)前線程立即開始執(zhí)行任務(wù)
  Schedulers.io() 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計算任務(wù),請使用Schedulers.computation()
  Schedulers.newThread() 為每個任務(wù)創(chuàng)建一個新線程
  Schedulers.trampoline() 當(dāng)其它排隊的任務(wù)完成后,在當(dāng)前線程排隊開始執(zhí)行
  另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。

Observable通過使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制。

  • subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
  • observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。
  Observable.create(new Observable.OnSubscribe(){...})//創(chuàng)建一個訂閱獲取網(wǎng)絡(luò)數(shù)據(jù)的Observable 
  .subscribeOn(Schedulers.io())// 在非UI線程中執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)
  .observeOn(AndroidSchedulers.mainThread())// 在UI線程中執(zhí)行結(jié)果

寫個獲取網(wǎng)絡(luò)數(shù)據(jù)的偽代碼

  Observable.create(new Observable.OnSubscribe<NewsBean>() {
      @Override
      public void call(Subscriber<? super NewsBean> subscriber) {
          //獲取網(wǎng)絡(luò)數(shù)據(jù)
          NewsBean bean = loadDataFromNet();
          subscriber.onNext(bean);
      }
  })
          .subscribeOn(Schedulers.io())// 在非UI線程中執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)
          .observeOn(AndroidSchedulers.mainThread())// 在UI線程中執(zhí)行結(jié)果
          .subscribe(new Observer<NewsBean>() {
              @Override
              public void onCompleted() {
              }

              @Override
              public void onError(Throwable e) {
              }

              @Override
              public void onNext(NewsBean newsBean) {
                  //更新UI上面的數(shù)據(jù)
                  mViewHolder.bindData(newsBean);
              }
          });

是不是沒有Thread,沒有handler,沒有AsyncTask

2.Observable (被觀察者)

Observable可以說是使用的核心了,熟悉了Observable的API基本上就了解了RxJava所能幫你做那些事情了,先來看一下Observable中API們,其實Observable這個類本身就是Observable的一個生成器模式,提供了大量的在不同場景下面創(chuàng)造一個被觀察者的方法。

  • 下面來看一個最簡單的創(chuàng)建Observable的方法
  Observable observable;
  observable = Observable.create(new Observable.OnSubscribe<String>() {
      @Override
      public void call(Subscriber<? super String> subscriber) {
          subscriber.onNext("onStart");
          subscriber.onNext("onDestroy");
          //onCompleted 和 onError都是終止調(diào)用,只會被出發(fā)之一
          subscriber.onCompleted();
          subscriber.onError(new Throwable("onError"));
      }
  });

是不是很好理解,就是創(chuàng)建一個帶有OnSubscribe的被觀察者,OnSubscribe接口里面的call方法是在當(dāng)observable.subscribe的時候被調(diào)用(同OnSubscribe的字面意思:在被訂閱的時候做的事)

  • 而且Observable提供了非常多的創(chuàng)建方法(也就是其他很多文章中的“操作符”),下面列舉一下Observable常用的方法和介紹

subscribe
創(chuàng)建一個觀察者觀察當(dāng)前事件,并且調(diào)用當(dāng)前被觀察者的call()方法

  Observable.create(new Observable.OnSubscribe<String>() {
      @Override
      public void call(Subscriber<? super String> subscriber) {
          subscriber.onNext("onStart");
          subscriber.onNext("onDestroy");
      }
  }).subscribe(new Observer<String>() {
      @Override
      public void onCompleted() {
      }

      @Override
      public void onError(Throwable e) {
      }

      @Override
      public void onNext(String s) {
          println(s);
      }
  });
//結(jié)果就是打印出了
//onStart
//onDestroy

just & from
靜態(tài)方法用來創(chuàng)建了一個Observable:順序調(diào)用subscriber.onNext執(zhí)行定義的數(shù)組

Observable<String> observable = Observable.just("onStart","onDestroy");//完全等同于上面的create方法
//同等
String[] operations = {"onStart","onDestroy"};
Observable observable = Observable.from(operations);

**map **
創(chuàng)建一個新的Observable包裝一個現(xiàn)有的Observable,使得每個到達觀察者的數(shù)據(jù)是新的Observable來的。

  Observable<String> observable = Observable.just("onStart","onDestroy");
  Observable mapObservable = observable.map(new Func1<String, String>() {
      @Override
      public String call(String s) {
          return "activity " + s;
      }
  });
//那么到達mapObservable的觀察者的數(shù)據(jù)就是("activity onStart","activity onDestroy")

take
限制觀察者的onNext方法調(diào)用的次數(shù)

Observable<String> observable = Observable.just("onStart","onDestroy");
Observable<String> takeObservable = observable.take(1);
//這樣到達takeObservable的觀察者的數(shù)據(jù)就只有("onStart")了

Observable的方法先簡單介紹到這把,后面講Observable原理的時候再詳細(xì)介紹其它的flatMap、filter、doOnNext、zip、throttleFirst、forEach等方法吧

3.Observer (觀察者)

這里說的觀察者不完全是指Observer這個類,觀察者是訂閱被觀察者的事件,并且處理被觀察者傳遞過來的數(shù)據(jù)的類。
觀察者有三個“動作”共同完成完整數(shù)據(jù)流的處理

public interface Observer<T> {

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onCompleted();

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     * 
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     * 
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(T t);
}

使用

        observable.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d(tag, "onCompleted !");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag, "onError : " + e);
            }

            @Override
            public void onNext(String s) {
                Log.d(tag, "onNext : " + s);
            }
        });

這里看起來是不是很奇怪,像是:一個被觀察者注冊了一個觀察者
NoNoNo,其實這是為了代碼的易讀和易寫,能夠很完整的寫完一整套響應(yīng)處理
因為在Rxjava里面的基礎(chǔ)動作是Action(無返回參數(shù))和Func(有返回參數(shù)),所以在被觀察者Observable中

public final Subscription subscribe(final Observer<? super T> observer) {
...//這里其實將Observer轉(zhuǎn)換成了Subscriber
}
等于
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
...//這里其實將onNext、onError、onComplete轉(zhuǎn)換成了Subscriber
}
因為在Observable中沒有直接使用Observer而是最終調(diào)用的
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
}

所以最終調(diào)用的靜態(tài)方法,是不是就是像在一個被觀察者中訂閱一個事件啦。

4.Subscribe (訂閱、事件)

是不是很屌,接到上面說的其實:“一個被觀察者注冊了一個觀察者”
其實就是:在一個被觀察者中訂閱一個事件
那么,subscribe a Subscriber<? super T> to Observable<T> 做了什么呢?下面我用偽代碼寫一下

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        ...

        // do nothing by default
        subscriber.onStart();

        // subscriber是個abstract class當(dāng)然需要變成實體類啦
        if (!(subscriber instanceof SafeSubscriber)) {
            //SafeSubscriber就是一個保證了數(shù)據(jù)傳遞完整性的實體對象
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        try {
            observable.onSubscribe.call(subscriber);
            return subscriber;
        } catch (Throwable e) {
            try {
                subscriber.onError(e);
            } catch (Throwable e2) {
                throw new RuntimeException by e2;
            }
            //return a static final Unsubscribed Subscription;
            return Subscriptions.unsubscribed();
        }
    }

So , demo可以看上面獲取網(wǎng)絡(luò)數(shù)據(jù)的偽代碼 ↖(ω)↗.


基本講到這,相信都對RxJava有了個簡單的了解。。。。但是

其實上面所有講的都沒有:異步的細(xì)節(jié)、被觀察者的細(xì)節(jié)、響應(yīng)式編程思想的分析等等,有空再繼續(xù)做一些Rxjava造輪子的研究和Android中Rxjava能夠帶來多大的便利

聽說簡書還可以打賞支持,有沒有老司機帶帶我 (ˉ﹃ˉ)

  • 歡迎拍磚,原文鏈接
  • 歡迎交流,QQ:569785819
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容