什么是RxJava
在GitHub主頁上的介紹:
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫
本質(zhì)上來說就是解決了原來繁瑣的異步操作帶來的代碼不簡潔,可讀性非常低的問題,尤其在調(diào)度過程比較復雜的應用場景下。
RxJava 的原理簡析
簡單點來說 RxJava 的異步實現(xiàn) 就是觀察者模式的通用實現(xiàn),用過Android 事件處理的同學都知道 Android的事件處理模型就是基于觀察者實現(xiàn)的。
<h5> RxJava Android 實現(xiàn)三步驟</h5>
//創(chuàng)建一個觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "Error");
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
};
//使用Observable.create()創(chuàng)建被觀察者
Observable observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Wrold");
subscriber.onCompleted();
}
});
//訂閱
observable1.subscribe(observer);
Observable
create() 最基本的創(chuàng)造事件序列:
just(T...):
將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])
將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
just(T...) from(T[]) create(OnSubscribe) 創(chuàng)建出來的觀察者都是等價的。
Subscribe
onStart() 在 subscribe 剛開始調(diào)用,可以用于做一些準備工作,例如數(shù)據(jù)的清零或重置。
onNext() 事件回調(diào)時調(diào)用 (相當于 onClick() / onEvent())
onCompleted() 事件隊列完結(jié)觸發(fā)onCompleted()方法。
onError() 事件隊列異常。在事件處理過程中出異常時,onError()
在一個正確運行的事件序列中,onCompleted()和onError()有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted()
和onError()二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應該再調(diào)用另一個。
Action
Action 的出現(xiàn) 其實 是一種更簡單觀察者的實現(xiàn)方式 ,比如我們在處理業(yè)務邏輯時 只用了onNext,但是onError和onCompleted并沒有用到,那其實另外兩個是可以省略掉的。
Observable.just("One", "Two").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
Scheduler
RxJava在不指定線程的情況下,發(fā)起時間和消費時間默認使用當前線程,所有就油了Scheduler(線程控制器),可以指定每一段代碼在什么樣的線程中執(zhí)行。
Observable.just("Hello", "Word")
.subscribeOn(Schedulers.newThread())//指定 subscribe() 發(fā)生在新的線程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
Schedulers.immediate()
直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。
Schedulers.newThread()
總是啟用新線程,并在新線程執(zhí)行操作。
Schedulers.io()
I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
Schedulers.computation()
計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread()
它指定的操作將在 Android 主線程運行。
Observable(map、flatMap)
Map

有點類似于Spring 里面的攔截器,我們先假設(shè)一個需求,需要打印多個球員的比賽數(shù)據(jù)。
Action1<List<Match>> action = new Action1<List<Match>>() {
@Override
public void call(List<Match> matches) {
for (int i = 0; i < matches.size(); i++){
Log.i(TAG, matches.get(i).getName());
}
}
};
Observable.from(personList)
.map(new Func1<Person, List<Match>>() {
@Override
public List<Match> call(Person person) {
return person.getMatchesList();
}
}).subscribe(action1);
flatMap
一般用于輸出一個Observable,而其隨后的subscribe中的參數(shù)也跟Observable中的參數(shù)一樣,注意不是Observable,一般用于對原始數(shù)據(jù)返回一個Observable,這個Observable中數(shù)據(jù)類型可以是原來的,也可以是其他的
List<Person> persons = new ArrayList<Person>();
Action1<List<Match>> action1 = new Action1<List<Match>>() {
@Override
public void call(List<Match> matches) {
for (int i = 0; i < matches.size(); i++){
Log.i(TAG, matches.get(i).getName());
}
}
};
Observable.from(persons)
.map(new Func1<Person, List<Match>>() {
@Override
public List<Course> call(Person person) {
//返回coursesList
return person.getMatchesList();
}
}).subscribe(action1);

遞歸轉(zhuǎn)換
其它處理方法
filter 就是對集合進行過濾
each就是遍歷集合
take取出集合中的前幾個
skip跳過前幾個元素
unique相當于按照數(shù)學上的集合處理,去重