作為一個Android開發(fā)從業(yè)者,當(dāng)你處理異步任務(wù)時,如果還在使用著Handler+Thread,那么你可能需要了解下RxJava這個優(yōu)秀的開源框架;當(dāng)然如果你正在跳槽面試,RxJava也是經(jīng)常被問到的框架。
關(guān)于介紹RxJava的文章也非常多,但是很多文章基于的版本還是1.0.X,而本博文就基于2.0版本對RxJava進行一個簡單的介紹和分析,也算是拋磚引玉吧。
本博文基于RxJava 2.0.0版本進行分析講解。
參考:拋物線大神《給 Android 開發(fā)者的 RxJava 詳解》
RxJava是什么?
簡單的歸納為兩個字:異步。
歸納畢竟是歸納,不能完全表明RxJava的概念,那么我們來看GitHub上給出的解釋:
a library for composing asynchronous and event-based programs by using observable sequences.
我用我蹩腳的CET-6水平給大家翻譯下,大概就是這個意思:
一個使用可觀測序列來組成異步的、基于事件的程序的庫。
這對于剛接觸的童鞋們可能不太容易理解,RxJava的核心還是異步,其他的定語都是基于其之上,有了這個思維和認識,再去學(xué)習(xí)RxJava也能更容易接受和理解其設(shè)計。
為什么要使用RxJava?
我寫溜溜的[AsyncTask / Handler / Thread/ ... ],干嘛要使用這個奇怪的RxJava???
還能為什么?簡潔唄。
異步操作的很重要的一點就是保持程序和代碼的簡潔性,Android內(nèi)部提供的AsyncTask以及Handlder+Thread都是為了解決異步代碼編寫繁瑣問題,從而使編寫異步代碼更加簡潔。在保持代碼和程序簡潔這個目的上,RxJava倒是更加的努力和方便,它的優(yōu)點是隨著程序邏輯變得越來越復(fù)雜,它仍然可以保持簡潔、優(yōu)雅。
口說無憑,我們來分析下面這樣一個例子。
圖片展示可能是我們每個Android開發(fā)者都要面對的問題,假設(shè)在我們的Activity上存在一個ListView,并且我們提供了一個addImage方法來任意添加待顯示的圖片?,F(xiàn)在需要將某個目錄下所有的png圖片都加載并顯示在ListView中,由于讀取和解析圖片是一個耗時過程,因此我們需要將這個過程放在后臺執(zhí)行;而圖片的顯示則必須放在主線程(UI線程)中。
那么在沒有使用RxJava時,我們怎么編寫這段代碼呢?
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
((MainActivity) context).runOnUiThread(new Runnable() {
@Override
public void run() {
imageList.add(bitmap);
imageListAdatper.notifyDataSetChanged();
}
});
}
}
}
}
}.start();
沒有對比,就沒有傷害,如果我們使用RxJava的話,是如何實現(xiàn)的呢?
Observable.fromArray(folders)
.flatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(File file) throws Exception {
return Observable.fromArray(file.listFiles());
}
})
.filter(new Predicate<File>() {
@Override
public boolean test(File file) throws Exception {
return file.getName().endsWith(".png");
}
})
.map(new Function<File, Bitmap>() {
@Override
public Bitmap apply(File file) throws Exception {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
imageList.add(bitmap);
imageListAdatper.notifyDataSetChanged();
}
});
這代碼變簡潔了嗎?這代碼量也沒減少啊,而且這一大堆代碼都是什么意思?。客耆床欢 ?/p>
各位看官,你先消消氣,我們講的簡潔是:邏輯上的簡潔,并不是單純的代碼減少(說實話,我們其實更關(guān)注這個)。
仔細看下這段代碼,之前的if..else呢?之前的那么多循環(huán)呢?好像都不見了,完全是從上到下的一條鏈?zhǔn)秸{(diào)用,而且沒有嵌套(你是不是也討厭好多層的嵌套,反正我是),現(xiàn)在看起來是不是邏輯更加清楚了呢。
此時RxJava的優(yōu)勢還不能完全體現(xiàn)出來,而且看到這么多陌生的函數(shù),你也一定有點不知其解,那么我們就帶著疑惑接著往下看。
API
雖然我知道你有很強的理解和學(xué)習(xí)能力,但是我還是決定要對RxJava的一些常用的API進行介紹和說明,以便你能更順暢的閱讀全文。
1.觀察者模式
RxJava的異步實現(xiàn),是通過一種擴展的觀察者模式來實現(xiàn)的。
我們來看下什么是觀察者模式?
觀察者模式(有時又被稱為發(fā)布(publish )-訂閱(Subscribe)模式、模型-視圖(View)模式、源-收聽者(Listener)模式或從屬者模式)是軟件設(shè)計模式的一種。在此種模式中,一個目標(biāo)物件管理所有相依于它的觀察者物件,并且在它本身的狀態(tài)改變時主動發(fā)出通知。這通常透過呼叫各觀察者所提供的方法來實現(xiàn)。此種模式通常被用來實現(xiàn)事件處理系統(tǒng)。
這是百度給出的解釋,我們在日常編碼中使用的點擊事件的處理就采用了觀察者模式。
clkBtn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Toast.makeText(MainActivity.this, "The button was clicked", Toast.LENGTH_LONG).show();
}
});
在典型的Click事件處理中,Button就是被觀察者,而我們設(shè)置的OnClickListener就是觀察者,在我們點擊Button時,OnClickListener的onClick方法就會被回調(diào)。
2. RxJava的觀察者模式
2.1 幾個對象
我們先來了解下RxJava給我們提供的幾個常用的對象。
- FLowable與Observable
在2.0版本中被觀察者新的實現(xiàn)叫做Flowable, 同時舊的Observable也保留了。因為在 RxJava1.x 中,有很多事件不被能正確的背壓,從而拋出MissingBackpressureException。
舉個簡單的例子,在 RxJava1.x 中的 observeOn, 因為是切換了消費者的線程,因此內(nèi)部實現(xiàn)用隊列存儲事件。在 Android 中默認的 buffersize 大小是16,因此當(dāng)消費比生產(chǎn)慢時, 隊列中的數(shù)目積累到超過16個,就會拋出MissingBackpressureException, 初學(xué)者很難明白為什么會這樣,使得學(xué)習(xí)曲線異常得陡峭。
而在 2.0 中,Observable 不再支持背壓,而Flowable 支持非阻塞式的背壓。并且規(guī)范要求,所有的操作符強制支持背壓。
幸運的是,F(xiàn)lowable 中的操作符大多與舊有的 Observable 類似。
- Observer與Subscriber
Observer就是我們前面提到的觀察者,與Observable組合使用。
Subscriber也被成為訂閱者,一般與Flowable組合使用。
因為Observable不再支持背壓,因此如果我們使用RxJava2.0版本,F(xiàn)lowable可能是你的不二人選。
基于以上的分析,本文以下的示例將采用Flowable進行說明和講解。
2.2 回調(diào)
為什么稱RxJava采用了擴展的觀察者模式呢?我們知道傳統(tǒng)的觀察者回調(diào)接口中只有一個update方法,那么RxJava呢?它可不止一個,讓我們來看下Subscriber的定義。
public interface Subscriber<T> {
/**
* Invoked after calling {@link Publisher#subscribe(Subscriber)}.
* <p>
* No data will start flowing until {@link Subscription#request(long)} is invoked.
* <p>
* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
* <p>
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
* @param s
* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);
/**
* Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
*
* @param t the element signaled
*/
public void onNext(T t);
/**
* Failed terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*
* @param t the throwable signaled
*/
public void onError(Throwable t);
/**
* Successful terminal state.
* <p>
* No further events will be sent even if {@link Subscription#request(long)} is invoked again.
*/
public void onComplete();
}
RxJava的觀察者接口中提供了onSubscribe、onNext、onError、onComplete四個回調(diào)方法,而傳統(tǒng)的觀察者模式中只有update一個回調(diào)方法,這也是稱之為擴展的觀察者模式的一部分原因。
下面我們來分析下Subscriber接口中幾個方法:
onSubscribe
這個方法是2.0之后才有的方法,主要是給觀察者提供了一個終止事件接收的機會(當(dāng)然我們也可以做一些預(yù)處理),它也會首先被調(diào)用。
要終止接收事件,可以調(diào)用Subscription的cancel方法。onNext
我們可以將其理解為傳統(tǒng)觀察者模式回調(diào)接口中的update方法,它可能會被調(diào)用多次。它的調(diào)用順序在onSubscribe之后。onError
在事件處理過程中出異常時,onError會被觸發(fā),同時事件隊列自動終止,不會再有事件發(fā)出。onComplete
在事件隊列傳遞完畢后,該方法會被調(diào)用。
在一個正確運行的事件序列中, onComplete()和onError()有且只有一個,并且是事件序列中的最后一個。
需要注意的是,onComplete() 和 onError() 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。
在一個正確的事件序列中,onError與onComplete互斥且唯一。
相比于傳統(tǒng)的觀察者模式,RxJava使用的擴展觀察者模式好像變得復(fù)雜了,但是從另一方面來講它也更加的豐富了,把更多的主動權(quán)和機會交給了使用者。
3. 實戰(zhàn)
看了那么多的概念,是不是覺得有點枯燥和乏味呢,那我們就開始動手使用RxJava來體驗一下吧。
3.1 引用
怎么在我們的項目中使用RxJava和RxAndroid呢?
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
3.2 實例
- 1.0 方式
//定義被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onNext("World");
e.onNext("!");
//注意在此調(diào)用onComplete方法結(jié)束事件的處理
e.onComplete();
}
});
// 定義觀察者
Observer<String> observer = new Observer<String>() {
// 該方法會在onNext方法之前調(diào)用
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe->11111");
// d.dispose();
}
@Override
public void onNext(String value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete->222222");
}
};
// 訂閱
observable.subscribe(observer);
- 2.0方式
//創(chuàng)建Flowable對象
Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(@NonNull FlowableEmitter e) throws Exception {
e.onNext("Hello");
e.onNext("World");
e.onNext("!");
//注意在此調(diào)用onComplete方法結(jié)束事件的處理
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
// 定義觀察者
Subscriber subsrciber= new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe->11111");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete->222222");
}
};
// 訂閱
flowable.subscribe(subsrciber);
- 訂閱
訂閱這句代碼看起來好奇怪,主要是subscribe()這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系。這讓人讀起來有點別扭,不過如果把 API 設(shè)計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設(shè)計就造成影響了,比較起來明顯是得不償失的。
- 運行結(jié)果
分別運行上面的兩段代碼,運行效果相同,如下所示:
onSubscribe->11111
Hello
World
!
onComplete->222222
這可能是最簡單的RxJava使用示例了。
3.3 創(chuàng)建被觀察者
在上面的示例中,我們采用了Observable.create方法來創(chuàng)建被觀察者,并且在subscribe方法中完成了事件的傳遞。
RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,我們一起來看一下。
- just(T...)
將傳遞的參數(shù),依次發(fā)送出去。
Flowable.just("Hello", "World", "!")
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();
這句代碼的效果與上面示例中的效果相同。
- from(T[]) / from(Iterable<? extends T>)
將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
String[] values = new String[]{"Hello", "Wrold", "!"};
Flowable observable = Flowable.fromArray(values);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create() 的例子是等價的。
3.4 靈活的事件回調(diào)定義
RxJava支持定義不完整的事件回調(diào)定義,就是我們可以拋棄Subscriber的定義,而只選擇定義其中的一部分回調(diào)。
看下代碼可能會更明了。
String[] values = new String[]{"Hello", "Wrold", "!"};
Consumer onNext = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext:" + s);
}
};
Consumer<? super Throwable> onError = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
};
Action onComplete = new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
};
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
Flowable.fromArray(values)
.subscribe(onNext);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
Flowable.fromArray(values)
.subscribe(onNext, onError);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
Flowable.fromArray(values)
.subscribe(onNext, onError, onComplete);
是不是很靈活?嗯,是的。
3.5 Schedulers
在 RxJava的默認規(guī)則中,事件的發(fā)出和消費都是在同一個線程的,在哪個線程調(diào)用subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。
也就是說,如果只用上面的方法,實現(xiàn)出來的只是一個同步的觀察者模式。
觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制,因此異步對于RxJava 是至關(guān)重要的。
而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念: Schedulers(調(diào)度器) 。
- API
在RxJava中,Scheduler相當(dāng)于線程控制器,RxJava通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。
RxJava已經(jīng)內(nèi)置了一些調(diào)度器,主要有以下幾個:
- Schedulers.immediate(): 直接在當(dāng)前線程運行,相當(dāng)于不指定線程,這是默認的Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運行,屬于Android專用的調(diào)度器。
有了這幾個Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。
subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即Flowable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
-
observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。
Flowble.just(1, 2, 3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("The receive num is :" + integer); } }
上面這段代碼中,由于subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3 將會在 IO線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber數(shù)字的打印將發(fā)生在主線程。
事實上,這種在 subscribe() 之前寫上兩句subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略。
4.變換
RxJava提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。
在開發(fā)中我們經(jīng)常碰到這樣的場景:從本地讀取并加載圖片。也就是說我們通常的入?yún)⑹且粋€文件路徑,而我們想要得到的是一個BitMap對象,那么如果使用RxJava我們該如何優(yōu)雅的實現(xiàn)呢?
final String filePath = "/images/logo.png";
Flowble.just(filePath)
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
return getBitmapFromFile(new File(filePath));
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
showBitmap(bitmap);
}
});
就問你優(yōu)雅不優(yōu)雅?牛逼不牛逼?
可以看到,map()方法將參數(shù)中的String對象轉(zhuǎn)換成一個Bitmap對象后返回,而在經(jīng)過map()方法后,事件的參數(shù)類型也由 String轉(zhuǎn)為了Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。
那么常用的事件變換有那些呢?
1. map
事件對象的直接變換,具體功能上面已經(jīng)介紹過,它是RxJava 最常用的變換。
在上面的例子中我們可以看到,map方法將參數(shù)中的 String對象變換為一個 Bitmap對象后返回,而在經(jīng)過 map方法后,事件的參數(shù)類型也由String變?yōu)榱?Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。
2. flatMap
flatMap和map有共同點,都是將一個對象轉(zhuǎn)換為另一個對象,不同的是map只是一對一的轉(zhuǎn)換,而flatMap可以是一對多的轉(zhuǎn)換,并且是轉(zhuǎn)換為另外一個Flowable對象!
示例如下:
ArrayList<String[]> list = new ArrayList<>();
String[] words1 = {"Hello,", "I am", "China!"};
String[] words2 = {"Hello,", "I am", "Beijing!"};
list.add(words1);
list.add(words2);
Flowable.fromIterable(list)
.flatMap(new Function<String[], Publisher<String>>() {
@Override
public Publisher<String> apply(@NonNull String[] strings) throws Exception {
return Flowable.fromArray(strings);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("Consumer->accept:"+s);
}
});
運行結(jié)果如下所示:
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:China!
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:Beijing!
flatMap的轉(zhuǎn)換可以分解為三個過程:
- 根據(jù)傳入的事件生成一個Publisher對象(其實也可以理解為Flowable)。
- 激活該Flowable對象發(fā)送事件,而不是直接發(fā)送該Flowable對象。
- 同一個Flowable對象發(fā)送的事件都會匯總到Flowable后,F(xiàn)lowable負責(zé)將事件統(tǒng)一傳遞給subsrciber。
3. lift
我們可以將該方法視為map與flatMap的底層調(diào)用實現(xiàn),其目的就是定義我們自己的Operator來完成變換。
lift方法接收一個FlowableOperator的參數(shù),這個FlowableOperator就是定義我們自己的轉(zhuǎn)換操作。
這樣解釋起來可能有些不太明了,下面我們舉兩個簡單的例子來看下怎么使用lift實現(xiàn)map和flatMap的效果。
- map的lift寫法
Flowable.just(filePath)
.lift(new FlowableOperator<Bitmap, String>() {
@Override
public Subscriber<? super String> apply(@NonNull final Subscriber<? super Bitmap> observer) throws Exception {
return new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
observer.onSubscribe(s);
}
@Override
public void onNext(String s) {
observer.onNext(getBitmapFromFile(new File(s)));
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
})
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
showBitmap(bitmap);
}
});
- flatMap的lift寫法
Flowable.fromIterable(list)
.lift(new FlowableOperator<String, String[]>() {
@Override
public Subscriber<? super String[]> apply(@NonNull final Subscriber<? super String> observer) throws Exception {
return new Subscriber<String[]>() {
@Override
public void onSubscribe(Subscription s) {
observer.onSubscribe(s);
}
@Override
public void onNext(String[] strings) {
Flowable.fromArray(strings)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
observer.onNext(s);
}
});
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept ->"+s);
}
});
4. range
該方法比較簡單,用于產(chǎn)生int和long型數(shù)字。
Flowable.range(1,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
})
輸出1 2 3 4 5五個數(shù)字。
5. merge
主要用戶合并對象,示例如下:
ArrayList<String> list1 = new ArrayList<>();
list1.add("1");
list1.add("2");
list1.add("3");
ArrayList<String> list2 = new ArrayList<>();
list2.add("4");
list2.add("5");
list2.add("6");
Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
輸出1 2 3 4 5 6。
6. compose
調(diào)解轉(zhuǎn)換的作用,示例如下:
Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
.compose(new FlowableTransformer<String, Integer>() {
@Override
public Publisher<Integer> apply(@NonNull Flowable<String> upstream) {
return upstream.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return Integer.parseInt(s);
}
});
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
System.out.println(s);
}
});
輸出1 2 3 4 5 6 六個數(shù)字。
7. compose與lift的區(qū)別
兩者都實現(xiàn)了變換的功能,但是變換的內(nèi)容和對象卻不相同。
- lift實現(xiàn)的是對事件和事件序列的變換。
- compose實現(xiàn)的是Flowable本身的變換。
5 總結(jié)
至此,我們對RxJava的使用分析告一段落,作為一個牛逼的異步框架,如果能正確的引入到我們的項目中來一定能提高我們效率,降低后期我們的維護成本。
祝各位工作愉快。