準(zhǔn)備工作
資料
https://www.gitbook.com/book/mcxiaoke/rxdocs/details
https://zhuanlan.zhihu.com/p/20687178
http://gank.io/post/560e15be2dca930e00da1083
RxJava 到底是什么?
其實(shí) RxJava 不是什么。
實(shí)際上,Rx 是 Reactive Extensions(反應(yīng)式擴(kuò)展框架) 的縮寫。
RxJava 不過(guò)是其中支持Java 語(yǔ)言的 Reactive Extensions而已。
也就是說(shuō) ,Rx 是一個(gè)反應(yīng)式擴(kuò)展框架?;蛘叻Q為響應(yīng)式擴(kuò)展框架。
Rx就是一個(gè)典型的函數(shù)式編程。
RxJava 怎么火起來(lái)了
RxJava 從13年發(fā)布,15年大家都在討論,RxJava 逐漸的火起來(lái)了。
RxJava能火起來(lái)其實(shí)主要就是異步。對(duì)于多線程的操作,我相信沒(méi)有幾個(gè)人你能真正掌握的,特別是線程的同步,絕對(duì)折磨人,而RXJava 能解決的就是這些問(wèn)題,使得你只需關(guān)心業(yè)務(wù)而不去關(guān)心這些細(xì)節(jié)的東西。
另一個(gè)是簡(jiǎn)潔,無(wú)論多么復(fù)雜的業(yè)務(wù),Rx都能做到很簡(jiǎn)潔。
舉個(gè)例子:
碰到過(guò)這樣的場(chǎng)景,Android端需要展示的數(shù)據(jù)在二個(gè)不同的服務(wù)器上,我需要從二個(gè)服務(wù)器拿到數(shù)據(jù)后,合并展示。
對(duì)于這樣的需求,如果采用傳統(tǒng)的方法,我相信沒(méi)有幾個(gè)同學(xué)能搞定的,線程間的相互等待,同步,沒(méi)有幾個(gè)人搞的清楚,那么有了RxJava 呢?
看看下面的偽代碼:
Observable<String> work1 = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> t) {
String value1 = network1();
t.onNext(value1);
}
})
.subscribeOn(Schedulers.io());
Observable<String> work2 = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> t) {
String value2 = network2();
t.onNext(value2);
}
})
.subscribeOn(Schedulers.io());
Observable.zip(work1, work2, new Func2<String, String, String>() {
@Override
public String call(String t1, String t2) {
return t1+t2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String t) {
//更新數(shù)據(jù)到ui的相關(guān)代碼
}
});
怎么樣相當(dāng)?shù)暮?jiǎn)潔吧,幾行代碼解決了問(wèn)題。我不在去考慮線程的等待同步問(wèn)題了。
Rx能解決什么問(wèn)題,為什么要使用它
從上面的描述和例子可以看出,Rx 勝在異步。有了它,你不必花費(fèi)太多的心思在線程上。線程的同步,安全等這些問(wèn)題都交給它吧,它會(huì)為你干好的。
實(shí)際上,RxJava 在github上是這么介紹的:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
英語(yǔ)好的同學(xué)自己翻譯,我就不翻譯了。
開始學(xué)習(xí)RxJava
在學(xué)習(xí)RXJava 之前,首先要深刻的理解 觀察者設(shè)計(jì)模式,如果連觀察者設(shè)計(jì)模式都搞不清,那么還是停下來(lái)去看看觀察者設(shè)計(jì)模式是什么樣子的。
Rx 的使用分為三個(gè)步驟:
- 創(chuàng)建 被觀察者 Observer。
- 創(chuàng)建觀察者 Observable
- 被觀察者注冊(cè)觀察者。
例子如下:
Observable<String> observable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> t) {
// TODO Auto-generated method stub
t.onNext("測(cè)試");
t.onCompleted();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String t) {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
};
observable.subscribe(observer);
需要注意的是,在觀察者模式中,是觀察者去訂閱 被觀察者對(duì)象的,而RxJava中,是被觀察的對(duì)象去訂閱觀察者,這一點(diǎn)非常的重要。
RxJava 從簡(jiǎn)單的角度講就是那些,其它的操作符都是在上面進(jìn)行了擴(kuò)展,本質(zhì)上講 就是那三個(gè)步驟。
這三個(gè)步驟中,會(huì)涉及到三個(gè)對(duì)象:
- Observer
- Observable
- 在創(chuàng)建Observer的時(shí)候 涉及到的一個(gè)對(duì)象OnSubscribe。
對(duì)于OnSubscribe對(duì)象,其實(shí)相當(dāng)于在觀察者模式中被觀察者 notifyDateChange方法的功能。
操作符號(hào)
RxJava 的操作符號(hào)比較多。全部記住這些操作符號(hào),沒(méi)有必要,更多的是在實(shí)際應(yīng)用中去查找那個(gè)操作符適合目前的業(yè)務(wù)場(chǎng)景,用多了自然記住了,總的來(lái)說(shuō),這些操作符號(hào)分為下面幾種。
看懂圖
對(duì)于操作符號(hào),真的記不住那么多,用的時(shí)候去拿,關(guān)鍵是要看懂圖

如下這張圖:
這張圖 是 flatMap操作符的。我們需要明白下面這些東西

每個(gè)Observer 通過(guò) map規(guī)則轉(zhuǎn)其它的Observer,其轉(zhuǎn)換規(guī)則是,將 圓轉(zhuǎn)成成一個(gè)菱形加上一個(gè)正方形。
把上面的圖看懂,基本上可以熟練的使用 RxJava了,請(qǐng)記住。無(wú)論是什么操作符都是針對(duì)Observer 的。
一些常用操作符號(hào)
創(chuàng)建操作符
創(chuàng)建操作符,用來(lái)創(chuàng)建 Observer 對(duì)象的,常見的創(chuàng)建操作符有
- Create.
- Defer
- From
- just
- Range
- Repeat
- Interval
轉(zhuǎn)換操作符號(hào)
轉(zhuǎn)換操作符的作用,是將一個(gè)ObServer 轉(zhuǎn)換成另一個(gè)Observer。
如 將Observer<String> 轉(zhuǎn)成成另一個(gè) Observer<Long> 可以是如下代碼:
Observable.just("1")
.map(new Func1<String, Long>() {
@Override
public Long call(String t) {
// TODO Auto-generated method stub
return Long.valueOf(t);
}
})
.subscribe(new Action1<Long>() {
@Override
public void call(Long t) {
// TODO Auto-generated method stub
}
});
轉(zhuǎn)換符號(hào)中,用到的最多的是 map 和 flatMap。
轉(zhuǎn)換符有下面這些:
- Buffer
- FlatMap
- Map
- Scan
- Window
過(guò)濾符號(hào)
過(guò)濾符號(hào)用于過(guò)濾那些條件不滿足的Observer。
- Take
- Filter
- First
- ....
組合操作符
組合操作符用于將組合多個(gè)Observer 。
startWith() — 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)
merge() — 將多個(gè)Observable合并為一個(gè)
mergeDelayError() 合并多個(gè)Observables,讓沒(méi)有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知
zip() — 使用一個(gè)函數(shù)組合多個(gè)Observable發(fā)射的數(shù)據(jù)集合,然后再發(fā)射這個(gè)結(jié)果
and(), then(), and when() — (rxjava-joins) 通過(guò)模式和計(jì)劃組合多個(gè)Observables發(fā)射的數(shù)據(jù)集合
combineLatest() — 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果
join() and groupJoin() — 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射
switchOnNext() — 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
錯(cuò)誤處理
- Catch
- retry
其它相關(guān)的操作符號(hào)
https://www.gitbook.com/book/mcxiaoke/rxdocs/details
后記
RxJava2 發(fā)布了,RxJava也要成為歷史了。