RxJava——操作符篇

一、RxJava操作符概述

RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易懂。

操作符實(shí)質(zhì)上就是RxJava函數(shù)式編程模式的體現(xiàn),在上篇文章中,我詳細(xì)闡述了RxJava中觀察者模式、Iterator模式、函數(shù)式編程模式概念的解讀,詳情請戳→文章傳送門。

Rxjava操作符的類型眾多,在本文中,我詳細(xì)解釋如圖1.1所示的9種操作符。

圖1.1 RxJava操作符的分類

本項(xiàng)目案例代碼已上傳Github,詳情戳→GitHub案例代碼

圖1.2 RxJava操作符案例圖

二、RxJava操作符詳解

1、創(chuàng)建操作符

創(chuàng)建操作符的分類如下圖所示,關(guān)于create操作符的詳細(xì)操作可在我的上篇文章中查看,在本文中不加以贅述,文章鏈接戳→文章傳送門。在本文中從from操作符開始介紹。

圖2.1? 創(chuàng)建操作符的分類

:在這里,我將請求的不完整回調(diào)在父類中進(jìn)行了封裝,具體代碼可查看GitHub的代碼鏈接。

①from操作符

在這里以發(fā)送數(shù)組為例,from操作符的使用代碼如下所示:

//from操作符,創(chuàng)建以數(shù)組內(nèi)容發(fā)送事件的ObservableString[] observableArr =newString[]{"Alex","Payne"};//onNextAction、onErrorAction提取到父類中,具體代碼可查看GitHub的代碼鏈接Observable.from(observableArr).subscribe(onNextAction, onErrorAction);

首先,我們查看如下所示from操作符的結(jié)構(gòu)圖,可以看到它有多種實(shí)現(xiàn)方式,但是有一個(gè)共同點(diǎn),都會(huì)返回一個(gè)Observable對象。

圖2.1.1 from操作符操作結(jié)構(gòu)圖

實(shí)質(zhì)上,Observable將數(shù)組中的元素逐個(gè)進(jìn)行發(fā)送,在發(fā)送過程中轉(zhuǎn)換為Observable對象。

進(jìn)一步查看源碼,可得知from操作符的作用:將一個(gè)Iterable、一個(gè)Future、 或者一個(gè)數(shù)組,內(nèi)部通過代理的方式轉(zhuǎn)換成一個(gè)Observable。

Future轉(zhuǎn)換為OnSubscribe是通過OnSubscribeToObservableFuture進(jìn)行的,Iterable轉(zhuǎn)換通過OnSubscribeFromIterable進(jìn)行。數(shù)組通過OnSubscribeFromArray轉(zhuǎn)換。

②just操作符

使用代碼如下所示:

//just操作符,創(chuàng)建將逐個(gè)內(nèi)容進(jìn)行發(fā)送的Observable,其內(nèi)部發(fā)送內(nèi)容在內(nèi)部以from的操作符的方式進(jìn)行轉(zhuǎn)換Observable.just("Alex","Payne").subscribe(onNextAction);

圖2.1.2 just操作符結(jié)構(gòu)圖

查看just操作符的結(jié)構(gòu)圖,結(jié)合源碼得知,just操作符將單個(gè)參數(shù)發(fā)送的內(nèi)容通過ScalarSynchronousObservable轉(zhuǎn)換為一個(gè)新的Observable對象,而將多個(gè)參數(shù)發(fā)送的內(nèi)容轉(zhuǎn)換為一個(gè)數(shù)組,然后將數(shù)組通過from操作符進(jìn)行發(fā)送。

③interval操作符

interval操作符使用代碼如下所示:

//interval操作符,創(chuàng)建以1秒為事件間隔發(fā)送整數(shù)序列的ObservableObservable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);

圖2.1.3 interval操作符結(jié)構(gòu)圖

查看interval的結(jié)構(gòu)圖,其只能發(fā)送Long類型的數(shù),實(shí)質(zhì)上其作用為:創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,這個(gè)序列為一個(gè)無限遞增的整數(shù)序列。

需要注意的是:interval默認(rèn)在computation調(diào)度器上執(zhí)行。你也可以傳遞一個(gè)可選的Scheduler參數(shù)來指定調(diào)度器。

④range操作符

range操作符使用的代碼如下所示:

//range操作符,創(chuàng)建以發(fā)送范圍內(nèi)的整數(shù)序列的ObservableObservable.range(0,3).subscribe(onNextAction);

range操作符發(fā)射一個(gè)范圍內(nèi)的有序整數(shù)序列,并且我們可以指定范圍的起始和長度

圖2.1.4 range操作符參數(shù)釋義

⑤repeat操作符

repeat操作符使用的代碼如下所示:

//repeat操作符,創(chuàng)建一個(gè)以N次重復(fù)發(fā)送數(shù)據(jù)的ObservableObservable.range(0,3).repeat(2).subscribe(onNextAction);

在這里需要強(qiáng)調(diào)一下,它不是創(chuàng)建一個(gè)Observable,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個(gè)序列或者是無限的,或者通過repeat(n)指定重復(fù)次數(shù)。

2、變換操作符

在這里我介紹如下圖所示7種變換操作符,變換操作符的作用是將源Observable發(fā)送的數(shù)據(jù)進(jìn)行變換。

圖2.1 變換操作符的分類

①map操作符

map操作符使用的代碼如下所示:

//map操作符,通過指定一個(gè)Func,將Observable轉(zhuǎn)換為另一個(gè)Observable對象并發(fā)送Observable.just("Alex_Payne")? ? ? ? ? ? ? ? ? .map(newFunc1() {? ? ? ? ? ? ? ? ? ? ? @Override? ? ? ? ? ? ? ? ? ? ? publicStringcall(Strings) {return"My Name is"+ s;? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? }).subscribe(onNextAction);

map操作符將源Observable發(fā)送的數(shù)據(jù)轉(zhuǎn)換為一個(gè)新的Observable對象。

在這里,F(xiàn)unc1和Action的區(qū)別在于,F(xiàn)unc1包裝的是有返回值的方法。另外,和ActionX 一樣,F(xiàn)uncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。

FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

②flatMap操作符

flatMap操作符使用的代碼如下所示:

//flatMap操作符,將Observable發(fā)送的數(shù)據(jù)集合轉(zhuǎn)換為Observable集合//flatMap的合并運(yùn)行允許交叉,允許交錯(cuò)的發(fā)送事件String[] observableArr = {"Alex","Max","Bruce","Frank","Tom"};? ? ? ? Observable.from(observableArr).flatMap(newFunc1>() {? ? ? ? ? ? ? @Override? ? ? ? ? ? ? public Observable call(Strings) {returnObservable.just("My Name is:"+ s);? ? ? ? ? ? ? }? ? ? ? }).subscribe(onNextAction);

源Observable通過flatMap操作符轉(zhuǎn)換為包含源Observable發(fā)送的所有子條目的Observable集合,可見下圖的示意圖,然后從Observable集合中逐個(gè)取出轉(zhuǎn)化為單個(gè)Observable對象進(jìn)行發(fā)送。不同于map操作符的一點(diǎn)就是一對多的轉(zhuǎn)化。

圖2.2.2 flatMap轉(zhuǎn)換示意圖

它的應(yīng)用場景可以體現(xiàn)在源Observable發(fā)送的內(nèi)容為一個(gè)復(fù)雜的數(shù)據(jù)集,例如一個(gè)Bean對象,而該外層Bean對象中一個(gè)成員變量為另一個(gè)內(nèi)層Bean對象,我們想要拆解外層Bean對象獲取內(nèi)層Bean對象,就可以用flatMap操作符。

注意:FlatMap對這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。

③concatMap操作符

concatMap操作符使用的代碼如下所示:

//concatMap操作符,將Observable發(fā)送的數(shù)據(jù)集合轉(zhuǎn)換為Observable集合//解決了flatMap的交叉問題,將發(fā)送的數(shù)據(jù)連接發(fā)送String[] observableArr = {"Alex","Max","Bruce","Frank","Tom"};? ? ? ? Observable.from(observableArr).concatMap(newFunc1>() {? ? ? ? ? ? ? @Override? ? ? ? ? ? ? public Observable call(Strings) {returnObservable.just("My Name is:"+ s);? ? ? ? ? ? ? }? ? ? ? }).subscribe(onNextAction);

concatMap操作符類似于flatMap操作符,不同的一點(diǎn)是它按次序連接。

④cast操作符

cast操作符使用的代碼如下所示:

//cast操作符,將類對象進(jìn)行轉(zhuǎn)換Object[] objectsArr = {"1","2","3"};? ? ? ? Observable.from(objectsArr).cast(String.class).subscribe(onNextAction);

cast操作符將源Observable發(fā)送的數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型,然后再發(fā)射數(shù)據(jù)。

需強(qiáng)調(diào)的一點(diǎn)是只能由父類對象轉(zhuǎn)換為子類對象,否則會(huì)報(bào)錯(cuò)。

⑤flatMapIterable操作符

flatMapIterable操作符使用的代碼如下所示:

//將數(shù)據(jù)集合轉(zhuǎn)換為Iterable,在Iterable中對數(shù)據(jù)進(jìn)行處理Observable.just(1,2,3).flatMapIterable(newFunc1>() {@OverridepublicIterablecall(Integer number){? ? ? ? ? ? ? ? ? ArrayList mList =newArrayList<>();? ? ? ? ? ? ? ? ? mList.add(1000+ number);returnmList;? ? ? ? ? ? ? }? ? ? ? ? }).subscribe(onNextAction);

flatMapIterable相當(dāng)于是flatMap的變體,直接在內(nèi)部以Iterable接口將集合數(shù)據(jù)進(jìn)行接收,示意圖如下所示:

圖2.2.5 flatMapIterable示意圖

⑥buffer操作符

buffer操作符使用的代碼如下所示:

//buffer操作符,將原有Observable轉(zhuǎn)換為一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)送一組值,而不是一個(gè)個(gè)進(jìn)行發(fā)送Observable.just(1,2,3,4,5,6)? ? ? ? ? ? ? .buffer(3).subscribe(newAction1>() {@Overridepublicvoidcall(List mList){for(Integer i : mList) {? ? ? ? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(),"new Number i is:"+ i, Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(),"Another request is called", Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? }? ? ? ? ? ? });

buffer操作符將原有Observable轉(zhuǎn)換為一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)送一組值,而不是一個(gè)個(gè)進(jìn)行發(fā)送,我們可以定義這個(gè)新的Observable存放幾個(gè)原有的Observable對象。

圖2.2.6 buffer操作符示意圖

⑦groupBy操作符

groupBy操作符使用的代碼如下所示:

//groupBy操作符,可以做分組操作Observable.range(0,10).groupBy(newFunc1() {@OverridepublicIntegercall(Integer num){returnnum %3;? ? ? ? ? ? ? ? ? }? ? ? ? ? ? }).subscribe(newAction1>() {@Overridepublicvoidcall(finalGroupedObservable groupedObservable){? ? ? ? ? ? ? ? ? groupedObservable.subscribe(newAction1() {@Overridepublicvoidcall(Integer num){? ? ? ? ? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(),"當(dāng)前的組別是:"+ groupedObservable.getKey() +"組別內(nèi)的數(shù)字是:"+ num, Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? });? ? ? ? ? ? ? }? ? ? ? ? });

groupBy操作符,將原有的Observable對象轉(zhuǎn)換為發(fā)送一組Observable集合的GroupedObservable對象,可以做分組操作,GroupedObservable將分組完畢的Observable對象可以繼續(xù)發(fā)送。

注意:groupBy將原始Observable分解為一個(gè)發(fā)射多個(gè)GroupedObservable的Observable,一旦有訂閱,每個(gè)GroupedObservable就開始緩存數(shù)據(jù)。因此,如果你忽略這些GroupedObservable中的任何一個(gè),這個(gè)緩存可能形成一個(gè)潛在的內(nèi)存泄露。因此,如果你不想觀察,也不要忽略GroupedObservable。你應(yīng)該使用像take(0)這樣會(huì)丟棄自己的緩存的操作符。

3、過濾操作符

過濾操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇,在這里介紹如下圖所示的8種。

圖3.1 過濾操作符的分類

①filter操作符

filter操作符使用的代碼如下所示:

//filter過濾操作符,對Observable發(fā)送的內(nèi)容根據(jù)自定義的規(guī)則進(jìn)行過濾Observable.range(0,5).filter(newFunc1() {@OverridepublicBooleancall(Integer num){returnnum >2;//自定義的條件,只有符合條件的結(jié)果才會(huì)提交給觀察者}? ? ? ? ? }).subscribe(onNextAction);

filter默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

②elementAt操作符

elementAt操作符使用的代碼如下所示:

//elementAt操作符,用于返回指定位置后一位的數(shù)據(jù),即腳標(biāo)+1的數(shù)據(jù)//在這里發(fā)送0、1、2、3、4,腳標(biāo)為3的數(shù)據(jù)為2,發(fā)送其后一位數(shù)據(jù)3Observable.range(0,5).elementAt(3).subscribe(onNextAction);

elementAt操作符獲取原始Observable發(fā)射的數(shù)據(jù)序列指定索引位置的數(shù)據(jù)項(xiàng),然后當(dāng)做自己的唯一數(shù)據(jù)發(fā)射。對應(yīng)示意圖如下:

圖2.3.2 elementAt操作符示意圖

③distinct操作符

distinct操作符使用的代碼如下所示:

//distinct操作符,用于Observable發(fā)送的元素的去重Observable.just(1,1,2,2,2,3).distinct().subscribe(onNextAction);

在這里需要強(qiáng)調(diào)一點(diǎn):distinct操作符只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過。

④skip操作符

skip操作符使用的代碼如下所示:

//skip操作符,用于Observable發(fā)送的元素前N項(xiàng)去除掉Observable.range(0,5).skip(2).subscribe(onNextAction);

skip操作符抑制Observable發(fā)射的前N項(xiàng)數(shù)據(jù),只發(fā)送后N項(xiàng)數(shù)據(jù)

圖2.3.4 skip操作符示意圖

⑤take操作符

//take操作符,用于Observable發(fā)送的元素只取前N項(xiàng)Observable.range(0,5).take(2).subscribe(onNextAction);

圖2.3.5 take操作符示意圖

⑥ignoreElements操作符

//ignoreElements操作符,忽略掉源Observable發(fā)送的結(jié)果,只把Observable的onCompleted或onError發(fā)送Observable.range(0,5).ignoreElements().subscribe(onNextAction, onErrorAction, onCompletedAction);

IgnoreElements操作符抑制原始Observable發(fā)射的所有數(shù)據(jù),只允許它的終止通知(onError或onCompleted)進(jìn)行發(fā)送。

⑦throttleFirst操作符

//throttleFirst操作符,會(huì)定期發(fā)送這個(gè)時(shí)間段里源Observable發(fā)送的第一個(gè)數(shù)據(jù)//throttleFirst操作符默認(rèn)在computaioin調(diào)度器上執(zhí)行,其他的數(shù)據(jù)都會(huì)被過濾掉Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <10; i++) {? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);//線程休眠100毫秒try{? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(100);? ? ? ? ? ? ? ? ? ? ? ? ? ? }catch(InterruptedException e) {? ? ? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? }? ? ? ? ? })? ? ? ? ? .throttleFirst(200, TimeUnit.MILLISECONDS)? ? ? ? ? .subscribe(onNextAction);

throttleFirst操作符會(huì)按照固定的時(shí)間間隔將信息進(jìn)行發(fā)送。在這里我設(shè)置的事件間隔為200毫秒,其中每發(fā)送一個(gè)數(shù)據(jù)線程休眠100毫秒,所以最后會(huì)顯示的數(shù)據(jù)為0,示意圖如下:

圖2..3.7 throttleFirst操作符示意圖

注:throttleFirst操作符默認(rèn)在computation調(diào)度器上執(zhí)行,但是你可以使用第三個(gè)參數(shù)指定其它的調(diào)度器。

⑧throttleWithTimeOut操作符

//throttleWithTimeout操作符//源發(fā)射數(shù)據(jù)時(shí),如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時(shí)間,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒有新數(shù)據(jù)發(fā)射時(shí)才進(jìn)行發(fā)射? ? ? ? ? Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(1);try{? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(500);? ? ? ? ? ? ? ? ? ? ? ? ? }catch(InterruptedException e) {throwExceptions.propagate(e);? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(2);try{? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(500);? ? ? ? ? ? ? ? ? ? ? ? ? }catch(InterruptedException e) {throwExceptions.propagate(e);? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(3);try{? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);? ? ? ? ? ? ? ? ? ? ? ? ? }catch(InterruptedException e) {throwExceptions.propagate(e);? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(4);? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(5);? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onCompleted();? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? })? ? ? ? ? ? ? ? .throttleWithTimeout(800, TimeUnit.MILLISECONDS)? ? ? ? ? ? ? ? .subscribeOn(Schedulers.newThread())? ? ? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? ? ? .subscribe(onNextAction);

在這里,我設(shè)置的時(shí)間間隔指定為800毫秒,所以最后顯示的數(shù)據(jù)是有3、4、5。

4、組合操作符

組合操作符用于將多個(gè)Observable組合成一個(gè)單一的Observable,在這里我介紹如下圖所示5種操作符:

圖4.1 組合操作符分類

①startWith組合操作符

startWith組合操作符使用的代碼如下所示:

//startWith操作符,會(huì)在發(fā)送的數(shù)據(jù)之前插入數(shù)據(jù)Observable.range(3,5).startWith(0,10086).subscribe(onNextAction);

很簡單,會(huì)在發(fā)送的數(shù)據(jù)序列前插入數(shù)據(jù)序列,并且會(huì)發(fā)送插入的數(shù)據(jù)序列。

②merge組合操作符

merge組合操作符使用的代碼如下所示:

//merge操作符,會(huì)將多個(gè)Observable對象合并到一個(gè)Observable對象中進(jìn)行發(fā)送? ? ? ? ? ? ? ? ObservablefirstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io());? ? ? ? ? ? ? ? ObservablesecondObservable = Observable.just(3, 4, 5);? ? ? ? ? ? ? ? Observable.merge(firstObservable, secondObservable).subscribe(onNextAction, onErrorAction);

如下圖所示,merge操作符會(huì)將多個(gè)Observable對象進(jìn)行合并。

圖2.4.2 merge操作符示意圖

需要注意的是:merge可能會(huì)讓合并的Observables發(fā)射的數(shù)據(jù)交錯(cuò)。

在這里我將firstObservable指定在IO線程中進(jìn)行發(fā)送,secondObservable沒有指定線程,兩者合并然后發(fā)送數(shù)據(jù)時(shí)便會(huì)產(chǎn)生數(shù)據(jù)交錯(cuò)的現(xiàn)象。

③concat組合操作符

concat組合操作符使用的代碼如下所示:

//concat操作符,會(huì)將多個(gè)Observable對象合并到一個(gè)Observable對象中進(jìn)行發(fā)送,嚴(yán)格按照順序進(jìn)行發(fā)送? ? ? ? ? ? ? ? ObservablefirstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io());? ? ? ? ? ? ? ? ObservablesecondObservable = Observable.just(3, 4, 5);? ? ? ? ? ? ? ? Observable.concat(firstObservable, secondObservable)? ? ? ? ? ? ? ? ? ? ? ? .subscribeOn(Schedulers.io())? ? ? ? ? ? ? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? ? ? ? ? ? ? .subscribe(onNextAction);

concat操作符不同于merge操作符的區(qū)別就是:會(huì)將多個(gè)Observable對象合并到一個(gè)Observable對象中進(jìn)行發(fā)送,嚴(yán)格按照順序進(jìn)行發(fā)送。如下圖所示,直到第一個(gè)Observable發(fā)送完畢數(shù)據(jù)后,第二個(gè)Observable才會(huì)進(jìn)行數(shù)據(jù)的發(fā)送。

圖2.4.3 concat組合操作符

④zip組合操作符

zip組合操作符使用的代碼如下所示:

//zip操作符,會(huì)將多個(gè)Observable對象轉(zhuǎn)換成一個(gè)Observable對象然后進(jìn)行發(fā)送,轉(zhuǎn)換關(guān)系可根據(jù)需求自定義Observable integerObservable = Observable.range(0,4);? ? ? ? ? ? ? ? Observable stringObservable = Observable.just("a","b","c","d");? ? ? ? ? ? ? ? Observable.zip(integerObservable, stringObservable,newFunc2() {? ? ? ? ? ? ? ? ? ? @Override? ? ? ? ? ? ? ? ? ? publicStringcall(Integer num,Stringinfo) {//在這里的轉(zhuǎn)換關(guān)系為將數(shù)字與字串內(nèi)容進(jìn)行拼接return"數(shù)字為:"+ num +"……字符為:"+ info;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction);

image.png

zip操作符返回一個(gè)Obversable,它使用這個(gè)函數(shù)按順序結(jié)合兩個(gè)或多個(gè)Observables發(fā)射的數(shù)據(jù)項(xiàng),然后它發(fā)射這個(gè)函數(shù)返回的結(jié)果。

它按照嚴(yán)格的順序進(jìn)行數(shù)據(jù)發(fā)送。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。

⑤combineLastest組合操作符

combineLastest組合操作符使用的代碼如下所示:

//combineLastest操作符,會(huì)將多個(gè)Observable對象轉(zhuǎn)換一個(gè)Observable對象然后進(jìn)行發(fā)送,轉(zhuǎn)換關(guān)系可以根據(jù)需求自定義//不同于zip操作符的是,會(huì)將最新發(fā)送的數(shù)據(jù)組合到一起integerObservable = Observable.just(1,2,3);? ? ? ? ? ? ? ? stringObservable = Observable.just("a","b","c");? ? ? ? ? ? ? ? Observable.combineLatest(integerObservable, stringObservable,newFunc2() {? ? ? ? ? ? ? ? ? ? @Override? ? ? ? ? ? ? ? ? ? publicStringcall(Integer num,Stringinfo) {//在這里的轉(zhuǎn)換關(guān)系為將數(shù)字與字串內(nèi)容進(jìn)行拼接return"數(shù)字為:"+ num +"……字符為:"+ info;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction);

當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)??稍诒景咐a中進(jìn)行驗(yàn)證。

5、輔助操作符

輔助操作符就是處理Observable的幫助動(dòng)作,在這里介紹如下5種輔助操作符。

圖2.5 輔助操作符分類

①delay操作符

delay操作符使用的代碼如下所示:

//delay操作符可以讓源Observable對象發(fā)送數(shù)據(jù)之前暫停一段制定的時(shí)間Observable.just(1,2,3)? ? ? ? ? ? ? ? ? ? ? ? ? .delay(2, TimeUnit.SECONDS)? ? ? ? ? ? ? ? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? ? ? ? ? ? ? ? .subscribe(onNextAction);

在這里我將延時(shí)時(shí)間設(shè)置為2秒,延遲指定的時(shí)間后發(fā)射源Observable中的數(shù)據(jù)。

②do操作符

do操作符使用的代碼如下所示:

//doOnNext是do操作符中的一種? ? ? ? ? ? ? ? Observable.range(0, 3).doOnNext(onNextAction).subscribe(onNextAction);

do操作符,其下細(xì)分有很多內(nèi)容,以doOnNext為例,其作用就是為源Observable對象發(fā)送數(shù)據(jù)后,當(dāng)Subscriber接收到數(shù)據(jù)時(shí),即當(dāng)Subscriber的onNext方法被調(diào)用時(shí),提供回調(diào)相應(yīng)數(shù)據(jù)。

③subscribeOn輔助操作符

④observeOn輔助操作符

subscribeOn、observeOn操作符使用的代碼如下所示:

Observable.just("當(dāng)前的線程ID為" +Thread.currentThread().getName()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(onNextAction);

subscribeOn操作符,指定subscribe()所發(fā)生的線程,即Observable.OnSubscribe被激活時(shí)所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。

observeOn操作符,指定Subscriber所運(yùn)行的線程?;蛘呓凶鍪录M(fèi)的線程。

上篇文章中我提到Schedulers可以使得RxJava實(shí)現(xiàn)線程切換,實(shí)質(zhì)上就是借助于lift變換方法進(jìn)行轉(zhuǎn)換,subscribeOn發(fā)生在下圖的通知過程,observeOn發(fā)生在下圖中的發(fā)送過程。

圖2.5.4 Observable與Subscriber的轉(zhuǎn)換關(guān)系圖

⑤timeout輔助操作符

timeout操作符使用的代碼如下所示:

//timeout操作符,如果源Observable對象過了一段時(shí)間沒有發(fā)送數(shù)據(jù),timeout會(huì)以onError通知終止這個(gè)ObservableObservable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <5; i++) {try{? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(i *100);? ? ? ? ? ? ? ? ? ? ? ? ? ? }catch(InterruptedException e) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).timeout(200, TimeUnit.MILLISECONDS, Observable.just(100,200))? ? ? ? ? ? ? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? ? ? ? ? ? ? .subscribe(onNextAction);

需要強(qiáng)調(diào)的一點(diǎn)是,在這里timeout(long timeout, TimeUnit timeUnit, Observable other)是timeout其中的一種,它在超時(shí)的時(shí)候會(huì)將源Observable轉(zhuǎn)換為備用的Observable對象進(jìn)行發(fā)送。

6、錯(cuò)誤操作符

圖2.6 錯(cuò)誤操作符的分類

①catch操作符

實(shí)質(zhì)上在這里catch操作符細(xì)分有三種實(shí)現(xiàn)方案:onErrorReturn、onErrorResumeNext、onExceptionResumeNext。

首先分析onErrorReturn的代碼:

Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <5; i++) {if(i >3) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onError(newThrowable("User Alex Defined Error"));? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).onErrorReturn(newFunc1() {@OverridepublicIntegercall(Throwable throwable){return404;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction, onErrorAction, onCompletedAction);

onErrorReturn操作符,會(huì)在遇到錯(cuò)誤時(shí),停止源Observable的,并調(diào)用用戶自定義的返回請求,實(shí)質(zhì)上就是調(diào)用一次OnNext方法進(jìn)行內(nèi)容發(fā)送后,停止消息發(fā)送。

然后分析onErrorResumeNext的代碼:

Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <5; i++) {if(i >3) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onError(newThrowable("User Alex Defined Error"));? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).onErrorResumeNext(newFunc1>() {@OverridepublicObservable call(Throwable throwable) {returnObservable.just(100,101,102);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction,onErrorAction,onCompletedAction);

onErrorResumeNext操作符,會(huì)在源Observable遇到錯(cuò)誤時(shí),立即停止源Observable的數(shù)據(jù)發(fā)送,并取用新的Observable對象進(jìn)行新的數(shù)據(jù)發(fā)送。

最后,分析onExceptionResumeNext的代碼:

Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <5; i++) {if(i >3) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onError(newThrowable("User Alex Defined Error"));? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).onExceptionResumeNext(Observable.just(100,101,102)).subscribe(onNextAction,onErrorAction,onCompletedAction);

onExceptionResumeNext,會(huì)將錯(cuò)誤發(fā)給Observer,而不會(huì)調(diào)用備用的Observable

②retry操作符

retry操作符實(shí)現(xiàn)的代碼如下所示:

//retry操作符,當(dāng)遇到exception時(shí)會(huì)進(jìn)行重試,重試次數(shù)可以由用戶進(jìn)行定義Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){for(inti =0; i <5; i++) {if(i >1) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onError(newThrowable("User Alex Defined Error"));? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? ? ? subscriber.onNext(i);? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).retry(2).subscribe(onNextAction,onErrorAction,onCompletedAction);

retry操作符不會(huì)將原始Observable的onError通知傳遞給觀察者,它會(huì)重新訂閱這個(gè)Observable。

圖2.6.2 retry操作符示意圖

7、布爾操作符

布爾操作符根據(jù)給定規(guī)則進(jìn)行判斷,是否符合規(guī)則然后返回布爾值。布爾操作符意義簡單操作簡便在這里介紹如下5種:

圖2.7 布爾操作符分類

①all操作符

all操作符實(shí)現(xiàn)的代碼如下所示:

Observable.just(1,2,3,4).all(newFunc1() {@OverridepublicBooleancall(Integer num){returnnum >3;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction, onErrorAction, onCompletedAction);

all操作符,對源Observable發(fā)送的每一個(gè)數(shù)據(jù)根據(jù)給定的條件進(jìn)行判斷。如果全部符合條件,返回true,否則返回false。

②contains操作符

contains操作符實(shí)現(xiàn)的代碼如下所示:

Observable.just(1, 2, 3, 4).contains(2).subscribe(onNextAction,onErrorAction,onCompletedAction);

contains操作符,對源Observable發(fā)送的數(shù)據(jù)是否包含定義的選項(xiàng)進(jìn)行判斷。如果包含返回true,否則返回false。

③isEmpty操作符

isEmpty操作符實(shí)現(xiàn)的代碼如下所示:

Observable.just(1, 2, 3, 4).isEmpty().subscribe(onNextAction,onErrorAction,onCompletedAction);

isEmpty操作符,對源Observable發(fā)送的數(shù)據(jù)是否為空進(jìn)行判斷。如果源Observable發(fā)送的數(shù)據(jù)為空返回true,否則返回false。

④exists操作符

exists操作符實(shí)現(xiàn)的代碼如下所示:

Observable.just(1,2,3,4).exists(newFunc1() {@OverridepublicBooleancall(Integer num){returnnum >3;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).subscribe(onNextAction, onErrorAction, onCompletedAction);

exists操作符,對源Observable發(fā)送的單獨(dú)一個(gè)數(shù)據(jù)根據(jù)給定的條件進(jìn)行判斷。如果有一個(gè)數(shù)據(jù)符合條件,返回true,否則返回false。

⑤sequenceEqual操作符

sequenceEqual操作符實(shí)現(xiàn)的代碼如下所示:

Observable.sequenceEqual(Observable.just(1, 2, 3, 4),Observable.just(1)).subscribe(onNextAction,onErrorAction,onCompletedAction);

sequenceEqual操作符,對兩個(gè)Observable進(jìn)行判斷,兩個(gè)Observable相同時(shí)返回true,否則返回false。這里包含兩個(gè)Observable的數(shù)據(jù),發(fā)射順序,終止?fàn)顟B(tài)是否相同。

8、條件操作符

圖2.8 條件操作符

①amb操作符

amb操作符實(shí)現(xiàn)的代碼如下所示:

//給定多個(gè)Observable,只讓第一個(gè)發(fā)送數(shù)據(jù)的Observable發(fā)送數(shù)據(jù)Observable? ? ? ? ? ? ? ? ? ? ? ? .amb(Observable.range(0,3).delay(2000, TimeUnit.MILLISECONDS),Observable.range(100,3))? ? ? ? ? ? ? ? ? ? ? ? .subscribe(onNextAction);

如下圖所示,首先發(fā)送通知給Amb的那個(gè),不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onError或onCompleted通知。Amb將忽略和丟棄其它所有Observables的發(fā)射物。

圖2.8.1 amb操作符示意圖

②defaultIfEmpty操作符

amb操作符實(shí)現(xiàn)的代碼如下所示:

//如果源Observable沒有發(fā)送數(shù)據(jù),則發(fā)送一個(gè)默認(rèn)數(shù)據(jù)Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? ? ? ? ? ? ? ? ? subscriber.onCompleted();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }).defaultIfEmpty(404).subscribe(onNextAction,onErrorAction,onCompletedAction);

9、轉(zhuǎn)換操作符

轉(zhuǎn)換操作符可以將Observable轉(zhuǎn)換為其它的對象或數(shù)據(jù)結(jié)構(gòu)。在這里介紹如下所示三種轉(zhuǎn)換操作符:

圖2.9 轉(zhuǎn)換操作符的分類

①toList操作符

toList操作符實(shí)現(xiàn)的代碼如下所示:

//toList操作符,將源Observable發(fā)送的數(shù)據(jù)組合為一個(gè)List集合//然后再次在onNext方法中將轉(zhuǎn)換完的List集合進(jìn)行傳遞Observable.just(1,2,3).toList().subscribe(newAction1>() {@Overridepublicvoidcall(List numList){for(Integer i : numList) {? ? ? ? ? ? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(),"i:"+ i, Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });

通常,發(fā)射多項(xiàng)數(shù)據(jù)的Observable會(huì)為每一項(xiàng)數(shù)據(jù)調(diào)用onNext方法。你可以用toList操作符改變這個(gè)行為,讓Observable將多項(xiàng)數(shù)據(jù)組合成一個(gè)List,然后調(diào)用一次onNext方法傳遞整個(gè)列表。

②toSortedList操作符

toSortedList操作符實(shí)現(xiàn)的代碼如下所示:

//toSortedList操作符,會(huì)將源Observable發(fā)送的數(shù)據(jù)組合為一個(gè)List集合,并會(huì)按照升序的方式進(jìn)行排序//然后再次在onNext方法中將轉(zhuǎn)換完的List集合進(jìn)行傳遞Observable.just(40,10,80,30).toSortedList().subscribe(newAction1>() {@Overridepublicvoidcall(List numList){for(Integer i : numList) {? ? ? ? ? ? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(),"i:"+ i, Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });

不同于toList操作符的是,它會(huì)對產(chǎn)生的列表排序,默認(rèn)是自然升序。

③toMap操作符

toMap操作符實(shí)現(xiàn)的代碼如下所示:

//toMap操作符,將源Observable發(fā)送的數(shù)據(jù)作為Map集合中的值,需要值進(jìn)行鍵的定義//將轉(zhuǎn)換完畢的Map集合在onNext方法中進(jìn)行發(fā)送Observable.just("Alex","Payne").toMap(newFunc1() {@OverridepublicIntegercall(String s){returns.equals("Alex")?0:1;? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? }).subscribe(newAction1>() {@Overridepublicvoidcall(Map convertMap){for(inti =0; i < convertMap.size(); i++) {? ? ? ? ? ? ? ? ? ? ? ? ? Toast.makeText(getActivity(), convertMap.get(i), Toast.LENGTH_SHORT).show();? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? });

源Observable發(fā)送的數(shù)據(jù)作為鍵值對中的值,我們可以提供一個(gè)用于生成Map的Key的函數(shù),然后不同的鍵存儲源Observable發(fā)送的不同的值。

toMap默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

圖2.9.3 toMap操作符示意圖

三、總結(jié)

1、在本文中,我結(jié)合項(xiàng)目代碼詳細(xì)介紹了部分RxJava的操作符,局部參照:RxJava中文翻譯文檔。

2、本文中的案例代碼已上傳Github,歡迎大家star、fork。詳情戳→GitHub案例代碼。

3、操作符實(shí)質(zhì)上就是RxJava函數(shù)式編程模式的體現(xiàn),Lambda表達(dá)式并且可以進(jìn)一步優(yōu)化RxJava。

4、在下篇文章中我會(huì)對于RxJava進(jìn)行深層次的剖析,還有RxJava結(jié)合例如Retrofit、RxBus等開源框架的內(nèi)容,希望本文對你在學(xué)習(xí)RxJava的路上有所啟發(fā)。

小禮物走一走,來簡書關(guān)注我

作者:Alex_Payne

鏈接:http://www.itdecent.cn/p/d997805b37d4

來源:簡書

簡書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 683評論 0 1
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    王帥Alex閱讀 19,471評論 21 99
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對于擴(kuò)展包,由于使用率較低,如有需求,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,187評論 8 93
  • 一、Retrofit詳解 ·Retrofit的官網(wǎng)地址為 : http://square.github.io/re...
    余生_d630閱讀 2,081評論 0 5
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評論 2 8

友情鏈接更多精彩內(nèi)容