RxJava,你好

在我研究響應(yīng)式編程的過(guò)程中,我所找到的每一篇文章幾乎都以響應(yīng)式編程很難學(xué)習(xí)的理念開(kāi)頭。針對(duì)響應(yīng)式編程零基礎(chǔ)人員準(zhǔn)備的文章少之又少。本文嘗試通過(guò)在android上使用RxJava為初學(xué)者厘清響應(yīng)式編程的基本概念。

什么是響應(yīng)式編程?

響應(yīng)式編程就是編程處理異步數(shù)據(jù)流。

等等,我使用callback也很容易處理異步數(shù)據(jù)啊。所以這和響應(yīng)式編程有什么不同呢?

是的,這個(gè)概念并不新鮮。它可以通過(guò)命令式(imperatively)編程來(lái)完成,而且通常都是這么做的。

如果我們不僅僅考慮回調(diào),同時(shí)再考慮一下讓回調(diào)啟動(dòng)并運(yùn)行的支持機(jī)制。使用命令式方法來(lái)支持通常會(huì)涉及到狀態(tài)管理還需要考慮狀態(tài)改變所帶來(lái)的副作用。在軟件開(kāi)發(fā)界,這些考慮已經(jīng)成為大量錯(cuò)誤的原因。響應(yīng)式編程采用函數(shù)式方法;它處理的是流從而避免了全局狀態(tài)以及相應(yīng)的副作用。

什么是流?

萬(wàn)物皆流,無(wú)物常住 --- 赫拉克利特

流代表一個(gè)數(shù)據(jù)序列。想象一下我們的交通系統(tǒng)。某條高速路上的汽車就是一條一直流動(dòng)偶爾出現(xiàn)一個(gè)瓶頸的對(duì)象流。所謂響應(yīng)式編程,就是我們接收連續(xù)流動(dòng)的數(shù)據(jù)--數(shù)據(jù)流--提供處理數(shù)據(jù)流的方法并將該方法應(yīng)用到數(shù)據(jù)流。數(shù)據(jù)的源頭我們并不(也不應(yīng))關(guān)心。

數(shù)據(jù)流無(wú)處不在。任何物體都可以是數(shù)據(jù)流:變量,用戶輸入,屬性,緩存,數(shù)據(jù)結(jié)構(gòu)等等。

什么是聲明式編程,什么是命令式編程?

  • 命令式引導(dǎo)你怎么做
  • 聲明式告訴你做什么

在深入代碼之前,我們還是來(lái)看一下我們的交通系統(tǒng)網(wǎng)絡(luò)。讓我們假設(shè)市長(zhǎng)想臨時(shí)在一條指定的高速路上分間隔擺放停車標(biāo)志來(lái)中斷車流。市長(zhǎng)會(huì)說(shuō):“將高速路分成均勻的幾段,在每一段的邊界上放一個(gè)停車標(biāo)志”。承包商會(huì)說(shuō):“等等,在分段之前,我需要確定每一段的長(zhǎng)度;為了確定分段長(zhǎng)度,我需要知道高速路的總長(zhǎng),我們要放置多少個(gè)停車標(biāo)志,以及車輛的平均長(zhǎng)度”。在這個(gè)場(chǎng)景里,市長(zhǎng)擁有足夠多的職能部門(mén)(包括DOT(交通部)),她在處理事情時(shí)只需要專注于宣布她的意圖,而不用關(guān)心事情具體怎么完成的細(xì)節(jié)--這就是聲明式方法。而另一方面。承包商需要保證整個(gè)流程的每一處細(xì)節(jié)都要考慮周全并準(zhǔn)確的完成--這就是命令式方法。如果你可以像市長(zhǎng)建設(shè)她的城市一樣構(gòu)建一個(gè)軟件會(huì)是什么樣子呢?我們一起來(lái)看一個(gè)示例:

例:使用命令式方法過(guò)濾掉偶數(shù)。

Integer[] numbers = {1, 2, 3, 4, 5};
List<Integer> lists = Arrays.asList(numbers);
List<Integer> results = new ArrayList<>();

for (Integer num : numbers) {
    if (num % 2 != 0)
        results.add(num);
}

聲明式方法

List<Integer> results = lists.stream()
        .filter(s -> s % 2 != 0)
        .collect(Collectors.toList());

很酷,我喜歡聲明式方法,但是如果我們不告訴它怎么做,計(jì)算機(jī)怎么知道做什么呢?
在現(xiàn)在的世界里,任何事情最終落實(shí)到操作系統(tǒng)和硬件時(shí)都是命令式的。而響應(yīng)式編程,是函數(shù)式編程的一種抽象。就和我們所使用的高階命令式編程語(yǔ)言是底層二進(jìn)制以及匯編命令的抽象一樣(市長(zhǎng)也需要有她的DOT承包商)。

所以,我們?cè)鯓釉贘ava中使用聲明式編程風(fēng)格呢?
Java8有一個(gè)很驚艷的Stream API,但是如果你和我一樣是一個(gè)Android開(kāi)發(fā)者,你不能使用Stream API,因?yàn)閍ndroid還不支持Java8的所有特性。盡管如此,你可以使用RxJava,這是由Netflix的開(kāi)發(fā)者為Java提供的一個(gè)響應(yīng)式擴(kuò)展。

RxJava怎么工作?

響應(yīng)式代碼的基礎(chǔ)是被觀察者(Observable)觀察者(Observer)。

  • 被觀察者(Observable)可以被監(jiān)聽(tīng)(和觀察者模式中的Subject相似)
  • 觀察者(Observer)則監(jiān)聽(tīng)被觀察者

被觀察者是一個(gè)發(fā)送數(shù)據(jù)流或者事件流的類,觀察者則對(duì)被觀察者發(fā)送出的數(shù)據(jù)/事件做出反應(yīng)。一個(gè)被觀察者可以有多個(gè)觀察者,對(duì)于被觀察者發(fā)送出的每一個(gè)事件/項(xiàng)目都會(huì)被Observer.onNext()方法接收并處理。一旦被觀察者發(fā)送完了所有的數(shù)據(jù)它會(huì)調(diào)用Observer.onComplete()。如果發(fā)生錯(cuò)誤,被觀察者會(huì)調(diào)用Observer.onError()方法。

注意: 有的被觀察者永遠(yuǎn)都不會(huì)終止(比如溫度傳感器的輸出)。

觀察者和被觀察者之間通過(guò)Subscription連接,觀察者在后面也可以通過(guò)Subscription取消訂閱被觀察者。

聽(tīng)起來(lái)和觀察者模式很相似,那么觀察者模式和RxJava框架之間有什么區(qū)別呢?
RxJava的被觀察者為觀察者模式添加了兩個(gè)功能。

  • 當(dāng)不再產(chǎn)生數(shù)據(jù)時(shí),生產(chǎn)者會(huì)通知消費(fèi)者。(onComplete()
  • 當(dāng)發(fā)生錯(cuò)誤時(shí),生產(chǎn)者會(huì)通知消費(fèi)者。(onError()

除此之外,RxJava的威力在于僅僅只需要幾行代碼就可以變換,聚合過(guò)濾被觀察者發(fā)送的數(shù)據(jù)流,這樣可以極大的減少需要維護(hù)的狀態(tài)變量。

給我看代碼

創(chuàng)建一個(gè)被觀察者(Observable):

Integer[] numbers = {1, 2, 3, 4, 5, 6, 7};
List<Integer> lists = Arrays.asList(numbers);
Observable<Integer> integerObservable = Observable.from(lists);

integerObservable將發(fā)射數(shù)字1、2、3、4、5、6、7然后結(jié)束。

注意: 創(chuàng)建被觀察者的方式有很多很多。更多信息可以參考官方文檔。

Subscriber

Subscriber是一種特殊類型的觀察者,它可以取消訂閱被觀察者。

    Observable.
    Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer data) {
           Log.d("Rx", "onNext:"+data);
         }
    
        @Override
        public void onCompleted() {     
           Log.d("Rx","Complete!"); 
        }
    
        @Override
        public void onError(Throwable e) { 
          // handle your error here
        }
    };

將Subscriber連接到被觀察者:

被觀察者是惰性的,在沒(méi)有訂閱者監(jiān)聽(tīng)之前它不會(huì)做任何事情。

    myObservable.subscribe(mySubscriber);
    // Outputs:
    // onNext: 1
    // onNext: 2
    // onNext: 3
    // onNext: 4
    // onNext: 5
    // onNext: 6
    // onNext: 7
    // Complete!

改變流:

RxJava提供了許多改變流的運(yùn)算符。下面幾個(gè)操作方法是最常用的。

  • Filter:Filter運(yùn)算符會(huì)過(guò)濾被觀察者,被觀察者發(fā)射的數(shù)據(jù)中只有通過(guò)你在謂詞函數(shù)中指定的測(cè)試后才能繼續(xù)往下流動(dòng)。

      integerObservable.filter(new Func1<Integer, Boolean>() {
          @Override
          public Boolean call(Integer o) {
              return o % 2 == 0;
          }
      }).subscribe(mySubscriber);
      // Outputs :
      // onNext: 2
      // onNext: 4
      // onNext: 6
      // Complete!
    

這里我過(guò)濾掉了所有的奇數(shù)項(xiàng)。

    ---1---2---3---4----5----6----7---|-->
             filter(x % 2 == 0)
    -------2-------4---------6--------|-->

注意: Func<T, R>表示一個(gè)單參數(shù)的函數(shù),T是第一個(gè)參數(shù)的類型,R是返回結(jié)果的類型。

  • Map: Map運(yùn)算符將會(huì)將你指定的函數(shù)應(yīng)用到被觀察者發(fā)射的每一項(xiàng),并返回一個(gè)被觀察者,這個(gè)被觀察者發(fā)射的數(shù)據(jù)就是你指定函數(shù)的返回結(jié)果。

      integerObservable.map(new Func1<Integer, Integer>() {
          @Override
          public Integer call(Integer value) {
              return value * value;
          }
      }).subscribe(mySubscriber);
      // onNext:1
      // onNext:4
      // onNext:9
      // onNext:16
      // onNext:25
      // onNext:36
      // onNext:49
      // Complete!
    

這里我使用map運(yùn)算符將發(fā)射出的數(shù)據(jù)改變成另外一個(gè)數(shù)。我改變了integerObservable所發(fā)射出的每一項(xiàng),所以最后每一個(gè)數(shù)據(jù)都變成了該數(shù)據(jù)的平方。

    ---1---2---3---4----5----6----7---|-->
            map(x -> x * x)
    ---1---4---9---16---25---36---49---|-->

RxJava中有大量的操作符用于處理流變換。

好吧,你不是說(shuō)響應(yīng)式編程是異步的嗎?

如果你不告訴它需要使用異步的方式,RxJava默認(rèn)是同步的。

但是同步是響應(yīng)式系統(tǒng)必須的行為嗎?

確定是使用異步還是同步的被觀察者需要根據(jù)具體的問(wèn)題分析。例如:從內(nèi)存緩存中獲取數(shù)據(jù)并立即返回也許使用同步會(huì)更合適。另一方面,如果被觀察者會(huì)產(chǎn)生網(wǎng)絡(luò)調(diào)用或者一些耗時(shí)的數(shù)據(jù)處理則應(yīng)該使用異步的方式??偟脑瓌t就是:如果是在開(kāi)發(fā)一個(gè)圖形系統(tǒng),當(dāng)一項(xiàng)任務(wù)起源于UI線程并且需要阻塞(或者大量的計(jì)算操作)時(shí)應(yīng)該采用異步的方式。

在異步從何而來(lái)這個(gè)問(wèn)題上,RxJava持不可知論者的態(tài)度。

了解,現(xiàn)在告訴我怎么創(chuàng)建一個(gè)異步的observable?

首先,我們來(lái)看一下使用RxJava之前,將一個(gè)密集的長(zhǎng)時(shí)間的I/O操作轉(zhuǎn)移的其他線程(非ui線程)時(shí)的處理方式。

以前的方式

    private class FetchUsersTask 
                extends AsyncTask<String, Void, User> {
    
      protected User doInBackground(String... someData) {
        String userId=params[0];
        User user = UsersService.getUser(userId);
        return user;
      }
    
      protected void onPostExecute(User user) {
        //handle the result and update the view
      }
    }

FetchUsersTask調(diào)用usersService.getUsers()并返回一個(gè)字符串列表,然后傳遞給onPostExecute()方法??雌饋?lái)非常簡(jiǎn)單,但是這段代碼中存在一些問(wèn)題

  • 錯(cuò)誤處理: doInBackground()也許會(huì)發(fā)生錯(cuò)誤或者異常,為了能夠從異常中恢復(fù)我們需要添加try-catch。通常情況下,當(dāng)我們捕獲到異常時(shí),我們會(huì)log輸出并且通知用戶,這又需要回到UI線程。使用AsyncTask我們也可以將Object作為doInBackground()方法的返回類型,然后在onPostExecute()中使用instanceof來(lái)檢查類型--這需要更多的代碼。
  • 內(nèi)存泄漏: 即使啟動(dòng)異步任務(wù)的Activity/Fragment 被銷毀,異步任務(wù)仍持續(xù)運(yùn)行直到doInBackground()方法執(zhí)行完成,因?yàn)樵诤笈_(tái)任務(wù)完成后asyncTask需要通知view,所以運(yùn)行時(shí)必須持有Activity/Fragment的引用。如果Activity在后臺(tái)任務(wù)完成之前就被銷毀,而開(kāi)發(fā)者又沒(méi)有采取合適的技術(shù)比如弱引用,那么就會(huì)導(dǎo)致內(nèi)存泄漏甚至是應(yīng)用程序崩潰。也許可以采用另一種方法,就是使用cancel(boolean)來(lái)取消正在執(zhí)行的任務(wù),那么最終會(huì)調(diào)用onCancelled()方法而不是onPostExecute()。
  • 連續(xù)多次網(wǎng)絡(luò)調(diào)用: 編排多個(gè)AsyncTask的唯一辦法就是使用嵌套,這將使得代碼非常復(fù)雜。

RxJava方式:

現(xiàn)在我們來(lái)看一下怎么使用RxJava來(lái)異步加載數(shù)據(jù)。

    Observable.fromCallable(new Callable<User>() {
          @Override public User call() throws Exception {
             return UsersService.getUser(userId);
           }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<User>() {
          @Override public void onCompleted() {
            Log.d("Rx", "Completed");
          }
    
          @Override public void onError(Throwable e) {
            Log.d("Rx", e.getMessage());
          }
    
          @Override public void onNext(User user) {
            Log.d("Rx", user.getName());
          }
        });

這里subscribeOn(Schedulers.io())將使observable工作在新的線程,而observeOn(AndroidSchedulers.mainThread()))將使訂閱者在主UI線程上去處理observable發(fā)送出來(lái)的結(jié)果。

這和AsyncTask很相似但是更簡(jiǎn)單更簡(jiǎn)潔。RxJava解決了我前面提到的所有問(wèn)題。

  • 錯(cuò)誤處理:使用RxJava方式以后錯(cuò)誤處理變得非常簡(jiǎn)單,因?yàn)樗锌赡艿腻e(cuò)誤和異常都會(huì)拋給onError()方法。由于我們是在主線程監(jiān)聽(tīng)(AndroidSchedulers.mainThread()),所以我們可以非常便捷的和UI交互從而告知用戶相關(guān)錯(cuò)誤。

  • 內(nèi)存泄漏:RxJava不會(huì)魔力般的減輕內(nèi)存泄漏的問(wèn)題,但是要阻止內(nèi)存泄漏則非常簡(jiǎn)單。RxJava提供了非常簡(jiǎn)潔的方式來(lái)取消訂閱正在執(zhí)行的異步調(diào)用。調(diào)用 subscriber 或者 subscription 的 unsubscribe()方法可以讓Activity或者Fragment從通知列表中注銷掉。如果你有多個(gè)subscription,可以使用CompositeSubscription來(lái)持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()中一次取消所有的訂閱。

      private CompositeSubscription allSubscriptions = 
                new CompositeSubscription();
      //add all the subscription to allSubscriptions
      allSubscriptions.add(subscription1);
      allSubscriptions.add(subscription2); 
      allSubscriptions.add(subscription3);
      //clear all subscription on onDestroy
      @Override    
      public void onDestroy() 
      {        
           super.onDestroy();       
           allSubscriptions.clear();    
      }
    
  • 連續(xù)多次網(wǎng)絡(luò)調(diào)用:有許多運(yùn)算符可以幫助我們串聯(lián)并修改observable。一旦理解,連續(xù)多次進(jìn)行網(wǎng)絡(luò)調(diào)用將變得非常簡(jiǎn)單。我們一起來(lái)看一下這樣的場(chǎng)景,從第一次網(wǎng)絡(luò)調(diào)用中獲取到了一個(gè)用戶ID列表,然后需要對(duì)每一個(gè)用戶ID調(diào)用getUser()來(lái)獲取用戶的信息。

      Observable.fromCallable(new Callable<List<String>>() {
        @Override public List<String> call() throws Exception {
          return UserService.getUserIds();
        }
      }).flatMap(new Func1<List<String>, Observable<String>>() {
        @Override public Observable<String> call(List<String> userIds) {
          return Observable.from(userIds);
        }
      }).flatMap(new Func1<String, Observable<User>>() {
        @Override public Observable<User> call(String userId) {
          return Observable.just(UserSerive.getUser(userId));
        }
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<User>() {
        @Override public void onCompleted() {
          Log.d("Rx", "emit","Completed!");
        }
      
        @Override public void onError(Throwable e) {
          Log.d("Rx", "emit", e.getMessage());
        }
      
        @Override public void onNext(User user) {
          Log.d("Rx", user.getName());
        }
      });
    

使用Lambda后的代碼:

    Observable.fromCallable(() -> UsersService.getUserIds())
         .flatMap(userIds -> Observable.from(userIds))
         .flatMap(userId -> Observable.just(UserService.getUser(userId))
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread()))
         .subscribe(new Subscriber<User>() {
            @Override
            public void onCompleted() {
                Log.d("Rx", "emit","Completed!");
            }
    
            @Override
            public void onError(Throwable e) {
                Log.d("Rx", "emit", e.getMessage());
            }
    
            @Override
            public void onNext(User user) {
                Log.d("Rx", "emit", user.getName());
            }
    });

下面的圖標(biāo)描述了將一個(gè)含有單個(gè)字符串列表流變化成含有多個(gè)用戶信息流的過(guò)程。

    -------{~~~~~~~~~~~~list of user ids [1,2,3,4,5]~~~~~~~~~}---|-->
       
               flatMap(userIds -> Observable.from(userIds))
    -------1------------2----------3------------4------------5---|--->
              flatMap (userId -> UserService.getUser(userId))
    ----user1--------user2------user3--------user4-------user5---|--->

參考文獻(xiàn)

本文譯自Howdy RxJava

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容