RxJava 系列 (二)RxJava 1.0 操作符

本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應式編程作為結合使用的,對什么是操作、操作符都有哪些以及RxJava中的操作符進行介紹,最后介紹一下RxJava操作符在android app開發(fā)實際生產(chǎn)中使用情況。

目錄

一.編程方式介紹

1.函數(shù)式編程
2.響應式編程
3.函數(shù)響應式編程

二、操作符介紹

1.什么是操作符?
2.RxJava中的操作符簡介
3.RxJava中的操作符分類

三、操作符在Android中的應用實踐

1.線程切換注意事項
2.線程的生命周期
3.線程池的使用

一、編程方式介紹

常見的編程方式有函數(shù)式編程、響應式編程、命令式編程、面向對象編程、面向接口編程、面向過程編程。RxJava函數(shù)響應式編程是將函數(shù)式編程與響應式編程的結合。

  • 1.函數(shù)式編程
    函數(shù)是什么?根據(jù)入?yún)⒌玫椒祷刂怠ava中的方法、C中的函數(shù)、JavaScript中的function。在編程語言中方法的參數(shù)可以是方法(函數(shù)),返回值也可以是方法(函數(shù)),適用于計算處理的場景。函數(shù)式編程是編程中是一種手段或者方法。被重新提出和重視。
    函數(shù)式編程有一些特點:
    純函數(shù)的定義是,對于相同的輸入,永遠會得到相同的輸出,而且沒有任何可觀察的副作用,也不依賴外部環(huán)境的狀態(tài)。int method1(int a, int b);
    函數(shù)柯里化(curry)的定義很簡單:傳遞給函數(shù)一部分參數(shù)來調用它,讓它返回一個函數(shù)去處理剩下的參數(shù)。
    var add = function(x){
        return function(y){
        return x + y
        }
    }
    var add = x => (y => x + y);  // js ES6寫法,也是比較正統(tǒng)的函數(shù)式寫法 

函數(shù)組:學會了使用純函數(shù)以及如何把它柯里化之后,我們會很容易寫出這樣的“包菜式”代碼:h(g(f(x)));
高階函數(shù):就是參數(shù)為函數(shù)或者返回值為函數(shù)的函數(shù),有了高階函數(shù),就可以將復用力度降低到函數(shù)級別,相對于面向對象語言,復用的力度更低
閉包:函數(shù)編程支持函數(shù)作為第一類對象,有時稱為閉包或者仿函數(shù)(functor)對象。實質上,閉包是起函數(shù)的作用并可以像對象一樣操作的對象。
惰性求值:函數(shù)式編程語言還提供惰性求值(Lazy evaluation,也稱作call-by-need),是在將表達式賦值給變量(或稱作綁定)時并不計算表達式的值,而在變量第一次被使用時才進行計算。這樣就可以通過避免不必要的求值提升性能。
無狀態(tài) 變量的不變性,入?yún)⑹菬o變化的。

簡單說,"函數(shù)式編程"是一種"編程范式"(programming paradigm),也就是如何編寫程序的方法論。它屬于"結構化編程"的一種,主要思想是把運算過程盡量寫成一系列嵌套的函數(shù)調用。函數(shù)式編程是給軟件開發(fā)者提供的另一套工具箱,為我們提供了另外一種抽象和思考的方式。Lambda演算在最初設計的時候就是為了研究計算相關的問題。所以函數(shù)式編程主要解決的也是計算問題,函數(shù)式編程是一種編程模型,他將計算機運算看做是數(shù)學中函數(shù)的計算,并且避免了狀態(tài)以及變量的概念。函數(shù)式編程的抽象本質則是將函數(shù)也作為一個抽象單位,而反映成代碼形式,則是高階函數(shù)。

  • 2.響應式編程
    響應式編程就是與異步數(shù)據(jù)流交互的編程范式。在某種程度上,這并不是什么新東西。事件總線(Event buses)或咱們常見的單擊事件就是一個異步事件流,你可以觀察這個流,也可以基于這個流做一些自定義操作(原文:side effects,副作用,本文皆翻譯為自定義操作)。響應式就是基于這種想法。你能夠創(chuàng)建所有事物的數(shù)據(jù)流,而不僅僅只是單擊和懸停事件數(shù)據(jù)流。 流廉價且無處不在,任何事物都可以當作一個流:變量、用戶輸入、屬性、緩存、數(shù)據(jù)結構等等。比如,假設你的微博評論就是一個跟單擊事件一樣的數(shù)據(jù)流,你能夠監(jiān)聽這個流,并做出響應。響應式編程(Reactive Programming 或稱反應式編程)是一種流行的編程方法,編寫代碼是基于對變化的反應。它的靈感來自于我們的日常生活,也即我們?nèi)绾尾扇⌒袆右约芭c他人溝通。和平常經(jīng)常聽說的面向對象編程和函數(shù)式編程一樣,響應式編程(Reactive Programming)就是一個編程范式,但是與其他編程范式不同的是它是基于數(shù)據(jù)流和變化傳播的。
    我們經(jīng)常在程序中這樣寫 A = B + C
    A被賦值為B和C的值。這時,如果我們改變B的值,A的值并不會隨之改變。而如果我們運用一種機制,當B或者C的值發(fā)現(xiàn)變化的時候,A的值也隨之改變,這樣就實現(xiàn)了”響應式“。而響應式編程的提出,其目的就是簡化類似的操作,因此它在用戶界面編程領域以及基于實時系統(tǒng)的動畫方面都有廣泛的應用。另一方面,在處理嵌套回調的異步事件,復雜的列表過濾和變換的時候也都有良好的表現(xiàn)。Reactive響應式(反應式)編程 是一種新的編程風格,其特點是異步或并發(fā)、事件驅動、推送PUSH機制以及觀察者模式的衍生。reactive應用(響應式應用)允許開發(fā)人員構建事件驅動(event-driven),可擴展性,彈性的反應系統(tǒng):提供高度敏感的實時的用戶體驗感覺,可伸縮性和彈性的應用程序棧的支持,隨時可以部署在多核和云計算架構。

響應式編程特點

  • 響應性是指一個系統(tǒng)應該總是能夠及時響應用戶請求,并且保持很低的延遲。
  • 彈性是指一個系統(tǒng)即使在部分組件開始出現(xiàn)故障的情況下也應該能夠作出響應,將停機時間將至最低。
  • 可伸縮性是指一個系統(tǒng)在負載增加時應該能夠根據(jù)需求增加資源以確保響應性,但同時也應該能在負載降低時減少資源,保持高效的資源利用率。
  • 消息驅動是指在一個系統(tǒng)的不同部分之間傳遞消息

響應式流是一種規(guī)范,下面三個重要的概念是響應式流API的構建基礎:

  • 發(fā)布者是事件的發(fā)送方,可以向它訂閱。
  • 訂閱者是事件訂閱方。
  • 訂閱將發(fā)布者和訂閱者聯(lián)系起來,使訂閱者可以向發(fā)布者發(fā)送信號。

響應式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合并為一條新的流。響應式編程的一個關鍵概念是事件。事件可以被等待,可以觸發(fā)過程,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當我們更改電子表(變化的傳播)中的一些數(shù)值時,我們需要更新整個表格或者我們的機器人碰到墻時會轉彎(響應事件)。

  • 3.函數(shù)響應式編程
    FRP與普通的函數(shù)式編程相似,但是每個函數(shù)可以接收一個輸入值的流,如果其中,一個新的輸入值到達的話,這個函數(shù)將根據(jù)最新的輸入值重新計算,并且產(chǎn)生一個新的輸出。這是一種”數(shù)據(jù)流"編程模式。
    而主要利用函數(shù)式編程(Functional Programming)的思想和方法(函數(shù)、高階函數(shù))來支持Reactive Programming就是所謂的Functional Reactive Programming,簡稱FRP。FPR 將輸入分為兩個基礎的部分:行為(behavior)和事件(events) 。這兩個基本元素在函數(shù)響應式編程中都是第一類(first-class)值。 其中行為是隨時間連續(xù)變化的數(shù)據(jù),而事件則是基于離散的時間序列 。例如:在我們操作網(wǎng)頁的時候,會觸發(fā)很多的事件,包括點擊,拖動,按鍵事件等。這些事件都是不連續(xù)的。對事件求值是沒有意義的,所有我們一般要通過fromEvent,buffer等將其變成連續(xù)的行為來做進一步處理。與RP相比,F(xiàn)RP更偏重于底層。由于采用了函數(shù)式編程范式,F(xiàn)RP也自然而然帶有其特點。這其中包括了不可變性,沒有副作用以及通過組合函數(shù)來構建程序等特點。

二、操作符介紹

  • 1.什么是操作符?

指令系統(tǒng)的每一條指令都有一個操作符,它表示該指令應進行什么性質的操作。
操作符,常見于計算機語言之中,不同的指令用操作符這個字段的不同編碼來表示,每一種編碼代表一種指令。
指令系統(tǒng)的每一條指令都有一個操作符,它表示該指令應進行什么性質的操作。不同的指令用操作符這個字段的不同編碼來表示,每一種編碼代表一種指令。組成操作符字段的位數(shù)一般取決于計算機指令系統(tǒng)的規(guī)模。
操作符用于操作數(shù)據(jù)并生成一個新值。

先看看大家熟悉的java操作符的相關的就比較好理解了。
Java的算數(shù)操作符與其它大多數(shù)程序設計語言都是相同的,其中包括加號(+)、減號(-)、乘號(×)、除號(÷)以及取模(%)。
也就是說 int i = j+k; 這個語句中的 “+” 就是一個操作符?!?”也是一個操作符,“+”為算數(shù)操作符、“=”為賦值操作符。
布爾操作符 !、 &&、 ||
關系比較操作符 < 、 <= 、 > 、>=、 != 、 == 、 === 、 !==

還有些人會發(fā)現(xiàn)這些都是編程語言級別的東西,怎么RxJava一個框架要定義這些東西?RxJava是 函數(shù)響應式編程,是以函數(shù)作為基本元素,相較于原來的編程都是變量、常量、對象這些作為基本元素。
響應式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。這意味著可以在編程語言中很方便地表達靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關的計算模型會自動將變化的值通過數(shù)據(jù)流進行傳播。

  • 2.RxJava中的操作符簡介
  • 就是建立在變量、常量、對象這些編程方法的基礎上,搭建的另一種編程方式或者是編程語言,只是這個語言的基礎是原來的編程的方式。
    通過RxJava中的規(guī)則完成編程的工作,這些規(guī)則的實現(xiàn)就是RxJava操作符。
  • RxJava將函數(shù)創(chuàng)建成RxJava中的元素Observable,然后進行變換、條件篩選、過濾、組合等操作然后將處理的結果發(fā)送給接收者Observer。
  • 這里邊的兩個英文單詞要區(qū)分清楚Observable 和Observer,一個是可被觀察者、一個是觀察者??杀挥^察者Observable完成一系列操作后將數(shù)據(jù)發(fā)送給觀察者Observer,一個是發(fā)射者,一個是接收者。弄清楚了,下面的操作符就理解主體結構了。
  • 3.RxJava中的操作符分類

像Java中有算數(shù)操作符加減乘除、邏輯操作符與或非、條件操作符大于小于等于這些。數(shù)據(jù)庫sql中過濾、篩選、count這樣的操作符。

RxJava也有它自己需要的操作符,分為了下面幾類:
  • 1.直接創(chuàng)建一個Observable(創(chuàng)建操作)
  • 2.組合多個Observable(組合操作)
  • 3.對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
  • 4.從Observable發(fā)射的數(shù)據(jù)中取特定的值(過濾操作)
  • 5.這些操作符用于從錯誤通知中恢復(錯誤處理)
  • 6.用于處理Observable的操作符(輔助操作)
  • 7.轉發(fā)Observable的部分值(條件/布爾/過濾操作)
  • 8.對Observable發(fā)射的數(shù)據(jù)序列求值(算術/聚合操作)

學習這些操作符按照編程語言的思想去學習,這些操作符就比較好理解了。再看上幾個實例,這些操作符基本使用就會了,其他的知道有這些操作符,具體使用的時候再研究就可以搞定了。學習幾個實例就會明白這些操作符都是干什么的了。下面會主要以操作符的實現(xiàn)思想和實例的形式來介紹RxJava中的操作符。

(1)RxJava入門用法:

基礎知識: RxJava最核心的兩個東西是Observables(被觀察者,事件源)和 Subscribers(觀察者)。 Observables發(fā)出一系列事件,Subscribers處理這些事件。 這里的事件可以是任何你感興趣的東西(觸摸事件,web接口調用返回的數(shù)據(jù)。。。)

// 觀察者, 創(chuàng)建事件
        Observable<String> myObservable = Observable.create(new     Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                sub.onNext("Hello, world!");
                sub.onNext("Hello, world!");
                sub.onNext("Hello, world!");
                sub.onCompleted();
                 // sub.onError(new Exception("Observable throw onError exception!")); //onCompleted()和onError()不能同時使用
            }
        });
        // 訂閱者 接收處理事件
        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:" + e.getMessage());
            }
        };
        myObservable.subscribe(mySubscriber);
(2)RxJava入門用法的簡化用法:
        // 首先來看看如何簡化Observable對象的創(chuàng)建過程。
        // RxJava內(nèi)置了很多簡化創(chuàng)建Observable對象的函數(shù),比如Observable.just就是用來創(chuàng)建只發(fā)出一個事件就結束的Observable對象,
        // 上面創(chuàng)建Observable對象的代碼可以簡化為一行
        Observable<String> myObservable = Observable.just("Hello, world!");

        // 接下來看看如何簡化Subscriber,
        // 上面的例子中,我們其實并不關心OnComplete和OnError,
        // 我們只需要在onNext的時候做一些處理,這時候就可以使用Action1類。
        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("call:" + s);
            }
        };
        myObservable.subscribe(onNextAction);
        // subscribe方法有一個重載版本,接受三個Action1類型的參數(shù),分別對應OnNext,OnComplete, OnError函數(shù)。
        // myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
(3)RxJava入門用法使用java8的lambda表達式
 Observable.just("Hello, world!").subscribe(s -> System.out.println(s));//一行就搞定了
創(chuàng)建操作符 — 用于創(chuàng)建Observable的操作符,共有下面幾種,其中create、from、just是比較常用的操作符,在這里介紹下這三個操作符。

Create — 通過調用觀察者的方法從頭創(chuàng)建一個Observable
From — 將其它的對象或數(shù)據(jù)結構轉換為Observable
Just — 將對象或者對象集合轉換為一個會發(fā)射這些對象的Observable
Defer — 在觀察者訂閱之前不創(chuàng)建這個Observable,為每一個觀察者創(chuàng)建一個新的Observable
Interval — 創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable
Range — 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable
Repeat — 創(chuàng)建重復發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable
Empty/Never/Throw — 創(chuàng)建行為受限的特殊Observable
Start — 創(chuàng)建發(fā)射一個函數(shù)的返回值的Observable
Timer — 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable

(4)Create 操作符

通過調用觀察者的方法從頭創(chuàng)建一個Observable,是最基礎的創(chuàng)建操作符.

        // 觀察者, 創(chuàng)建事件
        Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                //Edit data
                sub.onNext("Hello, world!");
                sub.onCompleted();
            }
        });
        // 訂閱者 接收處理事件
        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
                onError(new Throwable("1"));
                onError(new Throwable("2"));
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:" + e.getMessage());
            }
        };
        myObservable.subscribe(mySubscriber);
(5)from操作符

它接收一個集合作為輸入,然后每次輸出一個元素給subscriber。

String[] urls = { "url1", "url2", "url3" };
Observable.from(urls).subscribe(url -> System.out.println(url));
(6)Just 操作符

將對象或者對象集合轉換為一個會發(fā)射這些對象的Observable,與from的區(qū)別:支持多個類型作為入?yún)?,集合類型不是每次輸出一個元素,而是整個集合輸出。

String[] urls = { "url1", "url2", "url3" };
Observable.just("字符串", 123, true, urls).subscribe(item -> System.out.println(item));//這里的urls是一次發(fā)射出去的,不是拆分成數(shù)組后發(fā)射出去。
Observable.just("url1", "url2", "url3").subscribe(item -> System.out.println(item));
變換操作符 - 這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進行變換。map、Buffer、FlatMap是比較典型的變換操作符,在這里介紹下這三個操作符。

Map — 映射,通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實質是對序列中的每一項執(zhí)行一個函數(shù),函數(shù)的參數(shù)就是這個數(shù)據(jù)項
Buffer — 緩存,可以簡單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個。
FlatMap — 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數(shù)據(jù)結構展開的過程。
ConcatMap — cancatMap操作符與flatMap操作符類似,都是把Observable產(chǎn)生的結果轉換成多個Observable,然后把這多個。Observable“扁平化”成一個Observable,并依次提交產(chǎn)生的結果給訂閱者。與flatMap操作符不同的是,concatMap操作符在處理產(chǎn)生的。Observable時,采用的是“連接(concat)”的方式,而不是“合并(merge)”的方式,這就能保證產(chǎn)生結果的順序性,也就是說提交給訂閱者的結果是按照順序提交的,不會存在交叉的情況。
GroupBy — 分組,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個Observable發(fā)射一組不同的數(shù)據(jù)。
Scan — 掃描,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),然后按順序依次發(fā)射這些值。
Window — 窗口,定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項。類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集。

(7)Map — 映射

通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實質是對序列中的每一項執(zhí)行一個函數(shù),函數(shù)的參數(shù)就是這個數(shù)據(jù)項。

關鍵詞:將Observable發(fā)射出的數(shù)據(jù)轉換成另外一個Observable。

Observable.just("http://www.baidu.com/", "http://www.google.com/", "https://www.bing.com/")
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        try {
                            URL url = new URL(s);
                            String host = url.getHost();
                            return InetAddress.getByName(host).toString();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                }).subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("subscribe : " + s);
                    }
                });
(8)Buffer — 緩存

可以簡單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個。

關鍵詞:將Observable攢著一塊發(fā)出去。

final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"}; //定義郵件內(nèi)容
        //每隔0.8秒就隨機發(fā)布一封郵件
        Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
            try {
                if (subscriber.isUnsubscribed()) return;
                Random random = new Random();
                while (true) {
                String mail = mails[random.nextInt(mails.length)];
                subscriber.onNext(mail);
                Thread.sleep(800);
                }
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
            }
        });
        //把上面產(chǎn)生的郵件內(nèi)容緩存到列表中,并每隔2秒通知訂閱者
        endlessMail.buffer(2, TimeUnit.SECONDS)
        .subscribe(new Action1<List<String>>() {
            @Override
            public void call(List<String> list) {
            System.out.println(String.format("You've got %d new messages!  Here they are!", list.size()));
            for (int i = 0; i < list.size(); i++)
                System.out.println("**" + list.get(i).toString());
            }
        }); 
(9)FlatMap — 扁平映射

將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)的放進一個單獨的Observable,可以認為是一個將嵌套的數(shù)據(jù)結構展開的過程。

關鍵詞:將Observable發(fā)射出的數(shù)據(jù)轉換成多個Observable。提供了一個Observable轉換成多個Observable的能力。

  • 在學習RxJava操作符的時候,Map和FlatMap比較難以理解。這里對FlatMap和Map的比較并做了針對性的介紹。
  • 這里有兩個基礎點可以仔細琢磨一下:1.Observable對象既可以是一個元素也可以是一個集合。2.Map操作符是提供一對一的轉換、FlatMap是提供一對多的轉換。
  • 下面是總結了Map和FlatMap的比較,可以參照分析出Map和FlatMap的操作符區(qū)別和作用。攻克了這兩個操作符也就可以說基本明白RxJava操作符是怎么回事了。
  • 【基礎知識】:Observable 對象 本身可以是一個對象也可以是一組對象
    一個對象如:Observable.just(1) 結果是經(jīng)過發(fā)射器之后只發(fā)射一條數(shù)據(jù)
    一組對象如:Observable.just(1,2,3) 或者 Observable.from({"222", "sss", "eee"})。結果是經(jīng)過發(fā)射器之后可以發(fā)射多條數(shù)據(jù)。
  • 【map和FlatMap的區(qū)別】:
  • map 和FlatMap都能把參數(shù)轉化為另一個對象
    map 是一對一的轉化 返回值是普通類型 , 如String、File 返回值是單個轉換后的事件
  • FlatMap 一對多的轉化 返回值是Observable,如Observable<File>、Observable<String> 返回值是Observable(轉換后的Observable集合)
    flatMap的返回類型是Observable,說明是可以返回多個內(nèi)容,而不是事件類型String、List、或者數(shù)組(這些最終都是一個事件)。

FlatMap() 的原理是這樣的

  1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;
  2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件; 【備注:這里的激活概念可以理解為這個操作Observable obs = Observable.from(file.listFiles());】
  3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調方法。
    這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
  • 使用場景: FlatMap() 解決嵌套的問題 一個入?yún)?,返回多個事件,再根據(jù)多個事件,返回多個事件,這種嵌套問題。 (一個返回多個的使用from就可以搞定)
    Observable.just(new File("E:\\aa\\"))
        .flatMap(new Func1<File, Observable<File>>() { 
            @Override
            public Observable<File> call(File file) {//1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象; --------> file就是傳入的事件對象
                return Observable.from(file.listFiles()); //2.中的并不發(fā)送這個 Observable是個集合 ---> 就是這個  Observable.from(file.listFiles())
            }//3.每一個創(chuàng)建出來的 Observable --->這個Observable就是  就是(2)中激活出來的file.listFiles()中的每一個File的Observable對象
             //3.都被匯入同一Observable  這個Observable 是 這個call方法的返回值 Observable<File>,這個Observable是個集合,也是返回的集合(也就是扁平化的為一個集合)
        })
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        .subscribe(new Action1<File>() {
            @Override
            public void call(File file) {
                System.out.println(file.getName());
            }
        });

FlatMap再來一個例子有助于理解

    /**
     * 【flatMap案例】
     * 首先假設這么一種需求:假設有一個數(shù)據(jù)結構『學生』,現(xiàn)在需要打印出一組學生的名字。實現(xiàn)方式很簡單:用map轉換一下就可以了,此實現(xiàn)忽略
     * 那么再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現(xiàn):將學生作為事件傳入for循環(huán)打印課程名稱
     * 如果不想使用for循環(huán)該怎么辦?   -------------------flatMap 代碼見下方
     * 用rx的方式再使用for循環(huán),就會覺得這個flatMap這個沒有什么意義了。就變的難理解,用以前能實現(xiàn)的東西,再學這個flatMap的抵觸。變成思維的轉變。。。
     */
    public static void flatMap2(){
        String[] courses1 = {"語文", "數(shù)學" , "英語", "數(shù)學1" , "英語1", "數(shù)學2" , "英語3", "數(shù)學5" , "英語6"};
        String[] courses2 = {"語文111", "數(shù)學111" , "英語111", "數(shù)學1131" , "英語1141", "數(shù)學1511" , "英語1161"};
        Student[] students = {new Student("xiaoli", courses1), new Student("zhangsan", courses2)};

        Observable.from(students)
        .flatMap(new Func1<Student, Observable<String>>() { 
            @Override
            public Observable<String> call(Student student) {
                return Observable.from(student.courses);
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String str) {
                System.out.println(str);
            }
        });
    }
    
    static class Student{
        public Student(String name, String[] courses){
            this.name = name;
            this.courses = courses;
        }
        public String name;
        public String[] courses;
    }

過濾操作符 - 這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進行選擇。這里介紹比較典型的filter、distinct、elementAt三個操作符。

  • Debounce — 只有在空閑了一段時間后才發(fā)射數(shù)據(jù),通俗的說,就是如果一段時間沒有操作,就執(zhí)行一次操作
  • Distinct — 去重,過濾掉重復數(shù)據(jù)項
  • ElementAt — 取值,取特定位置的數(shù)據(jù)項
  • Filter — 過濾,過濾掉沒有通過謂詞測試的數(shù)據(jù)項,只發(fā)射通過測試的
  • First — 首項,只發(fā)射滿足條件的第一條數(shù)據(jù)
  • IgnoreElements — 忽略所有的數(shù)據(jù),只保留終止通知(onError或onCompleted)
  • Last — 末項,只發(fā)射最后一條數(shù)據(jù)
  • Sample — 取樣,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣,有的實現(xiàn)里叫ThrottleFirst
  • Skip — 跳過前面的若干項數(shù)據(jù)
  • SkipLast — 跳過后面的若干項數(shù)據(jù)
  • Take — 只保留前面的若干項數(shù)據(jù)
  • TakeLast — 只保留后面的若干項數(shù)據(jù)
(10)filter操作符

是對源Observable產(chǎn)生的結果按照指定條件進行過濾,只有滿足條件的結果才會提交給訂閱者。

Observable.just(1, 2, 3, 4, 5)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer item) {
                return (item < 4);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
(11)distinct操作符

對源Observable產(chǎn)生的結果進行過濾,把重復的結果過濾掉,只輸出不重復的結果給訂閱者,非常類似于SQL里的distinct關鍵字。

        Observable.just(1, 2, 1, 1, 2, 3)
        .distinct().subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
(12)elementAt操作符

在源Observable產(chǎn)生的結果中,僅僅把指定索引的結果提交給訂閱者,索引是從0開始的。

        Observable.just(1, 2, 3, 4, 5, 6)
        .elementAt(5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("Next:" + integer);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Error:" + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("completed!");
            }
        });

組合操作符 -- 用于將多個Observable組合成一個單一的Observable,這里只是介紹個CombineLatest操作符,以對組合操作符了解即可。

  • And/Then/When — 通過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發(fā)射的數(shù)據(jù)集
  • CombineLatest — 當兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù)),然后發(fā)射這個函數(shù)的結果
  • Join — 無論何時,如果一個Observable發(fā)射了一個數(shù)據(jù)項,只要在另一個Observable發(fā)射的數(shù)據(jù)項定義的時間窗口內(nèi),就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射
  • Merge — 將兩個Observable發(fā)射的數(shù)據(jù)組合并成一個
  • StartWith — 在發(fā)射原來的Observable的數(shù)據(jù)序列之前,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項
  • Switch — 將一個發(fā)射Observable序列的Observable轉換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)
  • Zip — 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個函數(shù)的結果作為單項數(shù)據(jù)發(fā)射
(13)combineLatest操作符

把兩個Observable產(chǎn)生的結果進行合并,合并的結果組成一個新的Observable。這兩個Observable中任意一個Observable產(chǎn)生的結果,都和另一個Observable最后產(chǎn)生的結果,按照一定的規(guī)則進行合并。

關鍵詞:發(fā)射兩個Observable的合并結果

        Integer[] array2 = { 10, 20, 30, 40, 50 };
        Observable.combineLatest(Observable.just(4, 2,5), Observable.from(array2),
                new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) {
                        //System.out.println("a:" + a + ",b:");
                        return a + b;
                    }
                }).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.err.println("Error: " + e.getMessage());
                    }

                    @Override
                    public void onNext(Integer a) {
                        System.out.println("Next: " + a);
                    }
                });
異常處理操作符 -- 這些操作符用于從錯誤通知中恢復。這里主要介紹下retry、onErrorReturn、onErrorResumeNext三個操作符。
  • Catch — 捕獲,繼續(xù)序列操作,將錯誤替換為正常的數(shù)據(jù),從onError通知中恢復
  • Retry — 重試,如果Observable發(fā)射了一個錯誤通知,重新訂閱它,期待它正常終止
  • onErrorReturn - 方法 返回一個鏡像原有Observable行為的新Observable。會忽略前者的onError調用,不會將錯誤傳遞給觀察者,而是發(fā)射一個特殊的項并調用觀察者的onCompleted方法。
  • onErrorResumeNext - onErrorResumeNext方法與onErrorReturn()方法類似,都是攔截原Observable的onError通知,不同的是攔截后的處理方式,onErrorReturn創(chuàng)建并返回一個特殊項,而onErrorResumeNext創(chuàng)建并返回一個新的Observabl,觀察者會訂閱它,并接收其發(fā)射的數(shù)據(jù)。
  • onExceptionResumeNext - onExceptionResumeNext方法與onErrorResumeNext方法類似創(chuàng)建并返回一個擁有類似原Observable的新Observable,,也使用這個備用的Observable。不同的是,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。
  • retryWhen - retryWhen和retry類似,區(qū)別是,retryWhen將onError中的Throwable傳遞給一個函數(shù),這個函數(shù)產(chǎn)生另一個Observable,retryWhen觀察它的結果再決定是不是要重新訂閱原始的Observable。如果這個Observable發(fā)射了一項數(shù)據(jù),它就重新訂閱,如果這個Observable發(fā)射的是onError通知,它就將這個通知傳遞給觀察者然后終止。
(14)retry操作符

是當Observable發(fā)生錯誤或者異常時,重新嘗試執(zhí)行Observable的邏輯,如果經(jīng)過n次重新嘗試執(zhí)行后仍然出現(xiàn)錯誤或者異常,則最后回調執(zhí)行onError方法;當然如果源Observable沒有錯誤或者異常出現(xiàn),則按照正常流程執(zhí)行。

關鍵詞:observable.retry(1)

        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //循環(huán)輸出數(shù)字
                try {
                    for (int i = 0; i < 10; i++) {
                    if (i == 4) {
                        throw new Exception("this is number 4 error!");
                    }
                    subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Throwable e) {
                    subscriber.onError(e);
                }
                }
            });
            observable.retry(1).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                System.out.println("Sequence complete.");
                }

                @Override
                public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
                }

                @Override
                public void onNext(Integer value) {
                System.out.println("Next:" + value);
                }
            });
(15)onErrorReturn操作符

是在Observable發(fā)生錯誤或異常的時候(即將回調oError方法時),攔截錯誤并執(zhí)行指定的邏輯,返回一個跟源Observable相同類型的結果,最后回調訂閱者的onComplete方法

關鍵詞:攔截error,并轉換處理

            Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            if (subscriber.isUnsubscribed())
                                return;
                            // 循環(huán)輸出數(shù)字
                            try {
                                for (int i = 0; i < 10; i++) {
                                    if (i == 4) {
                                        throw new Exception("this is number 4 error!");
                                    }
                                    subscriber.onNext(i);
                                }
                                subscriber.onCompleted();
                            } catch (Exception e) {
                                subscriber.onError(e);
                            }
                        }
                    });

                    observable.onErrorReturn(new Func1<Throwable, Integer>() {
                        @Override
                        public Integer call(Throwable throwable) {
                            return 1004;
                        }
                    }).subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Sequence complete.");
                        }

                        @Override
                        public void onError(Throwable e) {
                            System.err.println("Error: " + e.getMessage());
                        }

                        @Override
                        public void onNext(Integer value) {
                            System.out.println("Next:" + value);
                        }
                    });     
(16)onErrorResumeNext操作符

跟onErrorReturn類似,只不過onErrorReturn只能在錯誤或異常發(fā)生時只返回一個和源Observable相同類型的結果,而onErrorResumeNext操作符是在錯誤或異常發(fā)生時返回一個Observable,也就是說可以返回多個和源Observable相同類型的結果。

關鍵詞:與onErrorReturn相比可以返回多個結果。

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed())
                    return;
                // 循環(huán)輸出數(shù)字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Throwable throwable) {
                return Observable.just(100, 101, 102, 103);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });
算術、聚合、連接操作符 -- 這里主要介紹count、reduce、public三個操作符。

算術聚合操作符

  • Average — 計算Observable發(fā)射的數(shù)據(jù)序列的平均值,然后發(fā)射這個結果
  • Concat — 不交錯的連接多個Observable的數(shù)據(jù)
  • Count — 計算Observable發(fā)射的數(shù)據(jù)個數(shù),然后發(fā)射這個結果
  • Max — 計算并發(fā)射數(shù)據(jù)序列的最大值
  • Min — 計算并發(fā)射數(shù)據(jù)序列的最小值
  • Reduce — 按順序對數(shù)據(jù)序列的每一個應用某個函數(shù),然后返回這個值
  • Sum — 計算并發(fā)射數(shù)據(jù)序列的和
  • 連接操作符
  • Connect — 指示一個可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者
  • Publish — 將一個普通的Observable轉換為可連接的
  • RefCount — 使一個可連接的Observable表現(xiàn)得像一個普通的Observable
  • Replay — 確保所有的觀察者收到同樣的數(shù)據(jù)序列,即使他們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱
(17)Count操作符

將一個Observable轉換成一個發(fā)射單個值的Observable,這個值表示原始Observable發(fā)射的數(shù)據(jù)的數(shù)量。如果原始Observable發(fā)生錯誤終止,Count不發(fā)射數(shù)據(jù)而是直接傳遞錯誤通知。如果原始Observable永遠不終止,Count既不會發(fā)射數(shù)據(jù)也不會終止。

關鍵詞:只發(fā)射數(shù)量,不關心內(nèi)容。

            Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
                @Override
                public void onNext(Integer s) {
                    System.out.println("onNext:" + s);
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted!");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError:" + e.getMessage());
                }
            };
            Observable.from(new String[] { "one", "two", "three" })
            .count()
            .subscribe(mySubscriber);
(18)Reduce操作符

對原始Observable發(fā)射數(shù)據(jù)的第一項應用一個函數(shù),然后再將這個函數(shù)的返回值與第二項數(shù)據(jù)一起傳遞給函數(shù),以此類推,持續(xù)這個過程直到原始Observable發(fā)射它的最后一項數(shù)據(jù)并終止,此時Reduce返回的Observable發(fā)射這個函數(shù)返回的最終值。 注意如果原始Observable沒有發(fā)射任何數(shù)據(jù),reduce拋出異常IllegalArgumentException。 在其它場景中,這種操作有時被稱為累積,聚集,壓縮,折疊,注射等。

關鍵字:疊加,返回疊加結果

            Observable.just(1,2,3,4)
            .reduce(new Func2<Integer, Integer, Integer>() {
                //integer為前面幾項只和,integer2為當前發(fā)射的數(shù)據(jù)
                @Override
                public Integer call(Integer integer, Integer integer2) {
                //System.out.println("integer:"+integer+"  integer2:"+integer2);
                return integer+integer2;
                }
            }).subscribe(integer -> System.out.println("reduce:"+integer));
(19)Publish 操作符

將普通的Observable轉換為可連接的Observable(ConnectableObservable),ConnectableObservable是Observable的子類。 可連接的Observable (connectable Observable)與普通的Observable差不多,不過它并不會在被訂閱時開始發(fā)射數(shù)據(jù),而是直到使用了Connect操作符時才會開始,這樣可以更靈活的控制發(fā)射數(shù)據(jù)的時機。 注意:如果一個ConnectableObservable已經(jīng)開始發(fā)射數(shù)據(jù),再對其進行訂閱只能接受之后發(fā)射的數(shù)據(jù),訂閱之前已經(jīng)發(fā)射過的數(shù)據(jù)就丟失了。

關鍵詞:發(fā)射可以控制,訂閱完不會立即發(fā)射。

        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(5);
        //使用publish操作符將普通Observable轉換為可連接的Observable
        ConnectableObservable<Long> connectableObservable = obs.publish();
        //第一個訂閱者訂閱,不會開始發(fā)射數(shù)據(jù)
        connectableObservable.subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("1.onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("1.onError");
            }
            @Override
            public void onNext(Long along) {
                System.out.println("1.onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });
        //開始發(fā)射數(shù)據(jù)
        System.out.println("start time:" + sdf.format(new Date()));
        connectableObservable.connect();
        //第二個訂閱者延遲2s訂閱,這將導致丟失前面2s內(nèi)發(fā)射的數(shù)據(jù)
        connectableObservable.delaySubscription(2, TimeUnit.SECONDS)
                .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("2.onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("2.onError");
            }
            @Override
            public void onNext(Long along) {
                System.out.println("2.onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });
條件和布爾操作 -- 這些操作符可用于單個或多個數(shù)據(jù)項,也可用于Observable。這里主要介紹all、amb、skipUtil這三個操作符。
  • All — 判斷Observable發(fā)射的所有的數(shù)據(jù)項是否都滿足某個條件
  • Amb — 給定多個Observable,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
  • Contains — 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項
  • DefaultIfEmpty — 發(fā)射來自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個默認數(shù)據(jù)
  • SequenceEqual — 判斷兩個Observable是否按相同的數(shù)據(jù)序列
  • SkipUntil — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)
  • SkipWhile — 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個特定的條件為假,然后發(fā)射原始Observable剩余的數(shù)據(jù)
  • TakeUntil — 發(fā)射來自原始Observable的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù)或一個通知
  • TakeWhile — 發(fā)射原始Observable的數(shù)據(jù),直到一個特定的條件為真,然后跳過剩余的數(shù)據(jù)
(20)All操作符

對Observable發(fā)送的所有數(shù)據(jù)根據(jù)某個條件進行判斷,當其發(fā)射出去的數(shù)據(jù)都滿足該條件時,則返回true,否則返回false。

關鍵點:都滿足該條件,才發(fā)送一個true,否則返回false。

            Observable.just(2, 2, 1, 4).all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    //System.out.println("integer:" + integer);
                    if (integer % 2 == 0) {
                        return true;
                    } else {
                        return false;
                    }
                }
            }).subscribe(flag -> System.out.println(flag));
(21)Amb操作符

是對2到9個Observable進行處理,這些Observable會形成一種競爭關系,當哪個Observable最先發(fā)射出數(shù)據(jù),則amb進行發(fā)射這個Observable里的數(shù)據(jù),而其它的Observable將被丟棄。
關鍵字:只發(fā)送其中一組,其他組被拋棄

            Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
                @Override
                public void onNext(Integer s) {
                    System.out.println("onNext:" + s);
                }

                @Override
                public void onCompleted() {
                    System.out.println("onCompleted!");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError:" + e.getMessage());
                }
            };
            Observable.amb(
                    createDelayObservable(4), 
                    createDelayObservable(3),
                    createDelayObservable(2), 
                    createDelayObservable(1))//發(fā)射出去的是這個,內(nèi)容是1,2,3
            .subscribe(mySubscriber);
(22)SkipUtil操作符

是根據(jù)一個目標Observable為基準,當目標Observable沒發(fā)射出去數(shù)據(jù)的時,原Observable發(fā)射出去的數(shù)據(jù)將會被忽略,當目標Observable發(fā)射數(shù)據(jù)時,則原Observable才開始發(fā)射數(shù)據(jù)。

關鍵詞:skipUntil(Observable.just(100))這個是目標,他發(fā)射數(shù)據(jù)之前,原來的Observable發(fā)射出去的數(shù)據(jù)將會被忽略

                Subscriber<Long> mySubscriber = new Subscriber<Long>() {
                        @Override
                        public void onNext(Long s) {
                            System.out.println("onNext:" + s);
                        }

                        @Override
                        public void onCompleted() {
                            System.out.println("onCompleted!");
                        }

                        @Override
                        public void onError(Throwable e) {
                            System.out.println("onError:" + e.getMessage());
                        }
                    };
Observable.interval(1, TimeUnit.SECONDS)  .skipUntil(Observable.just(100).delay(5, TimeUnit.SECONDS))//延遲5s發(fā)送,目標Observable
                    //.skipUntil(Observable.timer(5, TimeUnit.SECONDS))
                    .subscribe(mySubscriber);
輔助操作符 -- 輔助操作一組用于處理Observable的操作符.這里介紹下delay、do、SubscribeOn三個操作符。
  • Delay — 延遲一段時間發(fā)射結果數(shù)據(jù)
  • Do — 注冊一個動作占用一些Observable的生命周期事件,相當于Mock某個操作
  • Materialize/Dematerialize — 將發(fā)射的數(shù)據(jù)和通知都當做數(shù)據(jù)發(fā)射,或者反過來
  • ObserveOn — 指定觀察者觀察Observable的調度程序(工作線程)
  • Serialize — 強制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的
  • Subscribe — 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作
  • SubscribeOn — 指定Observable應該在哪個調度程序上執(zhí)行
  • TimeInterval — 將一個Observable轉換為發(fā)射兩個數(shù)據(jù)之間所耗費時間的Observable
  • Timeout — 添加超時機制,如果過了指定的一段時間沒有發(fā)射數(shù)據(jù),就發(fā)射一個錯誤通知
  • Timestamp — 給Observable發(fā)射的每個數(shù)據(jù)項添加一個時間戳
  • Using — 創(chuàng)建一個只在Observable的生命周期內(nèi)存在的一次性資源
(23)delay

讓原始Observable在發(fā)射每項數(shù)據(jù)之前都暫停一段指定的時間段,結果是Observable發(fā)射的數(shù)據(jù)項在時間上整體延后一段時間.注意:delay不會平移onError通知,它會立即將這個通知傳遞給訂閱者,同時丟棄任何待發(fā)射的onNext通知。但是它會平移一個onCompleted通知。

Observable<Integer> obs = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i = 0; i < 5; i++) {
                        if (i > 2) {
                            // subscriber.onError(new Throwable("VALUE TO MAX"));
                            // //delay不會平移onError通知
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.computation());

            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
            /*
             * Delay操作符讓原始Observable在發(fā)射每項數(shù)據(jù)之前都暫停一段指定的時間段。
             * 效果是Observable發(fā)射的數(shù)據(jù)項在時間上向前整體平移了一個增量
             *
             * 注意:delay不會平移onError通知,它會立即將這個通知傳遞給訂閱者,同時丟棄任何待發(fā)射的onNext通知。
             * 然而它會平移一個onCompleted通知。
             */
            System.out.println("delay start:" + sdf.format(new Date()));
            obs.delay(2, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("delay onCompleted" + sdf.format(new Date()));
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("delay onError" + e.getMessage());
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("delay onNext:" + sdf.format(new Date()) + "->" + integer);
                }
            });
(24)Do系列操作符就是為原始Observable的生命周期事件注冊一個回調,當Observable的某個事件發(fā)生時就會調用這些回調。
  • RxJava實現(xiàn)了很多doxxx操作符
  • doOnEach:為 Observable注冊這樣一個回調,當Observable沒發(fā)射一項數(shù)據(jù)就會調用它一次,包括onNext、onError和 onCompleted
  • doOnNext:只有執(zhí)行onNext的時候會被調用
  • doOnSubscribe: 當觀察者(Sunscriber)訂閱Observable時就會被調用
  • doOnUnsubscribe: 當觀察者取消訂閱Observable時就會被調用;Observable通過onError或者onCompleted結束時,會反訂閱所有的Subscriber
  • doOnCompleted:當Observable 正常終止調用onCompleted時會被調用。
  • doOnError: 當Observable 異常終止調用onError時會被調用。
  • doOnTerminate: 當Observable 終止之前會被調用,無論是正常還是異常終止
  • finallyDo: 當Observable 終止之后會被調用,無論是正常還是異常終止。
        Observable.just(1, 2, 3)
                // 只有onNext的時候才會被觸發(fā)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        System.out.println("-->doOneNext: " + item);
                    }
                }).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }
                });

        System.out.println("doOnEach,doOnError------------------------");
        Observable.just(1, 2, 3)
                // Observable每發(fā)射一個數(shù)據(jù)的時候就會觸發(fā)這個回調,不僅包括onNext還包括onError和onCompleted
                .doOnEach(new Action1<Notification<? super Integer>>() {
                    @Override
                    public void call(Notification<? super Integer> notification) {
                        System.out.println("-->doOnEach: " + notification.getKind() + ":" + notification.getValue());
                        if ((int) notification.getValue() > 1) {
                            throw new RuntimeException("Item exceeds maximum value");
                        }
                    }
                })
                // Observable異常終止調用onError時會被調用
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("-->doOnError: " + throwable.getMessage());
                    }
                }).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }
                });

        System.out.println("doxxx------------------------");
        Observable.just(1, 2, 3).doOnCompleted(new Action0() {
            @Override
            public void call() {
                System.out.println("-->doOnCompleted:正常完成onCompleted"); // 數(shù)據(jù)序列發(fā)送完畢回調
            }
        }).doOnSubscribe(() -> System.out.println("-->doOnSubscribe:被訂閱")) // 被訂閱時回調
                // 反訂閱(取消訂閱)時回調。當一個Observable通過OnError或者OnCompleted結束的時候,會反訂閱所有的Subscriber
                .doOnUnsubscribe(() -> System.out.println("-->doOnUnsubscribe:反訂閱"))
                // Observable終止之前會被調用,無論是正常還是異常終止
                .doOnTerminate(() -> System.out.println("-->doOnTerminate:終止之前"))
                // Observable終止之后會被調用,無論是正常還是異常終止
                .finallyDo(() -> System.out.println("-->finallyDo:終止之后")).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        System.out.println("Next: " + item);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("Error: " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }
                });
(25)線程切換SubscribeOn、observeOn:
  • 這兩個操作符對于Android開發(fā)來說非常適用,因為Android中只能在主線程中修改UI,耗時操作不能在主線程中執(zhí)行,
  • 所以我們經(jīng)常會創(chuàng)建新的Thread去執(zhí)行耗時操作,然后配合Handler修改UI,或者使用AsyncTask。
  • RxJava中使用這兩個操作符能夠讓我們非常方便的處理各種線程問題。
  • SubscribeOn:指定Observable自身在哪個調度器上執(zhí)行(即在那個線程上運行),如果Observable需要執(zhí)行耗時操作,
  • 一般我們可以讓其在新開的一個子線程上運行,好比AsyncTask的doInBackground方法。
  • observeOn:可以使用observeOn操作符指定Observable在哪個調度器上發(fā)送通知給觀察者(調用觀察者的onNext,onCompleted,onError方法)。
  • 一般我們可以指定在主線程中觀察,這樣就可以修改UI,相當于AsyncTask的onPreExecute 、onPrograssUpdate和onPostExecute 方法中執(zhí)行
  • Schedulers.computation(?) 用于計算任務,如事件循環(huán)或和回調處理,不要用于IO操作(IO操作請使用Schedulers.io());默認線程數(shù)等于處理器的數(shù)量
  • Schedulers.from(executor) 使用指定的Executor作為調度器
  • Schedulers.immediate(?) 在當前線程立即開始執(zhí)行任務
  • Schedulers.io(?) 用于IO密集型任務,如異步阻塞IO操作,這個調度器的線程池會根據(jù)需要增長;對于普通的計算任務,請使用Schedulers.computation();Schedulers.io(?)默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器
  • Schedulers.newThread(?) 為每個任務創(chuàng)建一個新線程
  • Schedulers.trampoline(?) 當其它排隊的任務完成后,在當前線程排隊開始執(zhí)行
            System.out.println("currentThread:" + Thread.currentThread().getName());
            Observable<Integer> obs = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    System.out.println("on subscrib:" + Thread.currentThread().getName());
                    subscriber.onNext(1);
                    subscriber.onCompleted();
                }
            });

            // 在新建子線程中執(zhí)行,在主線程中觀察    AndroidSchedulers.mainThread()
            obs.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
                    .subscribe(i -> System.out.println("mainThread-onNext:" + Thread.currentThread().getName()));
            System.out.println("-------------------");
            obs.delaySubscription(2, TimeUnit.SECONDS).subscribeOn(Schedulers.trampoline()) // 用于計算任務,如事件循環(huán)或和回調處理
                    .observeOn(Schedulers.immediate()) // 在當前線程立即開始執(zhí)行任務
                    .subscribe(i -> System.out.println("immediate-onNext:" + Thread.currentThread().getName()));
            try {
                Thread.sleep(15000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

三、操作符在Android中的應用實踐

1.線程切換注意事項

subscribeOn()和observeOn()這兩個操作符在使用的時候可能弄不清楚,下面操作符使用分類圖可以參考一下。
create() , just() , from() 等 -- 事件產(chǎn)生
map() , flapMap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消費
事件產(chǎn)生:默認運行在當前線程,可以由 subscribeOn() 自定義線程
事件加工:默認跟事件產(chǎn)生的線程保持一致, 可以由 observeOn() 自定義線程
事件消費:默認運行在當前線程,可以有observeOn() 自定義

2.線程的生命周期

在Android開發(fā)中生命周期可以使用rxlifecycle作為自動生命周期管理。
在app開發(fā)中異步線程會產(chǎn)生內(nèi)存泄漏的問題,是所有的異步線程都會產(chǎn)生,不只是RxJava才有。并且RxJava中
RxJava除了這種自動回收以外。還可以使用手動取消線程。RxJava有取消訂閱的方法可以使用。
注意一點在已經(jīng)進入非主線程的代碼執(zhí)行的時候,是取消不了發(fā)射的,但是可以取消訂閱就可以解決內(nèi)存泄漏的問題了。

3.線程池的使用

RxJava的線程處理是可以配置線程池的
ThreadPoolExecutor executor;
.subscribeOn(Schedulers.from(executor)) //發(fā)射端使用線程池
.observeOn(AndroidSchedulers.mainThread()) //接收端使用主線程
這組線程池的搭配可以替代AsyncTask使用了,也可以與AsyncTask共用一個線程池來使用。

全文完

參考資料

https://zhuanlan.zhihu.com/p/21714695
http://blog.jobbole.com/110593/
https://www.zhihu.com/question/28292740
http://blog.csdn.net/womendeaiwoming/article/details/46506017
http://blog.csdn.net/caiwanxia1/article/details/52980999
http://www.jdon.com/reactive.html
http://www.infoq.com/cn/news/2016/01/reactive-basics
http://blog.csdn.net/fly1183989782/article/details/62053973
http://ios.jobbole.com/86815/
http://blog.csdn.net/hjjdehao/article/details/53063879
http://blog.csdn.net/Job_Hesc/article/details/46495281
http://blog.csdn.net/dylanzhuang/article/details/52211313
http://blog.csdn.net/xmxkf/article/details/51658445

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容