RxJava的門檻相對應(yīng)于其他技術(shù)是要高一點,想想自己以前第一次看完很大一篇文章的時候,也是三個字_很難受!但經(jīng)過一段時間的使用現(xiàn)在也比較熟練了。我相信初學者通過不斷的努力也是可以的。本文主要是對RxJava的一個復(fù)習,對API整理解釋。

RxJava 是什么?
RxJava is a Java VM implementation of Reactive Extensions:
a library for composing asynchronous and event-based programs by using observable sequences.
(RxJava是Reactive Extensions的Java VM實現(xiàn):一個通過使用可觀察序列來編寫異步和基于事件的程序的 庫。)
It extends the observer pattern to support sequences of data/events and adds operators that allow you
to compose sequences together declaratively while abstracting away concerns about things like low-level
threading, synchronization, thread-safety and concurrent data structures.
(它擴展了觀察者模式以支持數(shù)據(jù)/事件序列,并添加了運算符,使您可以聲明性地組合序列,同時抽象出對低級線程,同步,
線程安全性和并發(fā)數(shù)據(jù)結(jié)構(gòu)等問題的關(guān)注。)
總結(jié)來說它是一個異步的響應(yīng)式編程庫,可以讓你的代碼邏輯更加簡潔清晰,擴展性也更強,并且在線程間的調(diào)度,事件的處理上都具有十分的優(yōu)雅性!
Rx如何使用?
在這里我就直接開門見山去除那么多的文字描述用最直接的代碼來告訴大家當然這里是快速入門,如果大家想要更加精細的學習本文后面會給大家推薦連接,大家可以去深入學習!好了,話已至此,我們開始吧!
下面用一張圖表示RxJava最簡單的運作

- 首先你需要創(chuàng)建一個上游對象Observable(被觀察者)
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
}
});
- 然后你創(chuàng)建一個下游對象 Observer(觀察者)
Observer<Object> observer = new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
3.建立上下游關(guān)系
沒錯只需要下面這一行代碼就可以
observable.subscribe(observer);
實戰(zhàn)
你已經(jīng)學會使用最簡單的Rxjava代碼代碼并且建立他們的關(guān)系,下面用同樣的方式來寫一段RxJava引以為傲的鏈式調(diào)用:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
// onComplete(); onError(); 如果下游觸發(fā)那么下游將收不到事件了
e.onNext(4);
}
})
.subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d; //用于切斷上下游的通道
}
@Override
public void onNext(Integer o) {
if (o == 3) {
mDisposable.dispose();//切斷通道 就收不到上游發(fā)出的事件了
}
Log.d(TAG, "onNext: "+o);
}
// 當事件處理中出現(xiàn)任何錯誤回調(diào)此方法后下游將收不到事件了
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
// 當事件處理中回調(diào)此方法后下游將收不到事件了,可以自己手動調(diào)取次方法
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
線程調(diào)度:
上面的代碼是最基礎(chǔ)的上游收發(fā)事件下游處理事件,大家可以仿照寫一下自己,然后調(diào)試一下加深記憶,接下來要開始RxJava里面的線程調(diào)度了.
在RxJava中, 已經(jīng)內(nèi)置了很多線程選項供我們選擇, 例如有:
Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
Schedulers.newThread() 代表一個常規(guī)的新線程
AndroidSchedulers.mainThread() 代表Android的主線程
我們可以根據(jù)我執(zhí)行代碼的具體情況去指定所在的線程,我下面寫一段指定線程的示列代碼:
observable.subscribeOn(Schedulers.newThread()) //指定obserable開啟新線程執(zhí)行的代碼,第一次指定后以后在指定無效
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io()) //指定observer 在IO線程執(zhí)行的代碼,最后一次指定的線程才是observer運行的有效線程
.subscribe(observer);
操作符:
上面的代碼里面有一個.map 這個就是RxJava中的操作符,就是將上游發(fā)出的事件進行轉(zhuǎn)換,可以轉(zhuǎn)換成我們需要的東西在發(fā)送到下游去
常用的操作符有那些呢?
創(chuàng)建類操作符:
create 是RxJava最基本的創(chuàng)建操作符了,直接使用即可.
just 將對象轉(zhuǎn)化為Observable對象,并且將其發(fā)射出去,可以使一個數(shù)字、一個字符串、數(shù)組、Iterate對象等,是一種非??旖莸膭?chuàng)建Observable對象的方法
from 操作符用來將某個對象轉(zhuǎn)化為Observable對象,并且依次將其內(nèi)容發(fā)射出去,from的接收值可以是集合或者數(shù)組,這個類似于just,但是just會將這個對象整個發(fā)射出去。比如說一個含有3個元素的集合,from會將集合分成3次發(fā)射,而使用just會發(fā)射一次來將整個的數(shù)組發(fā)射出去~
defer 操作符只有當有Subscriber來訂閱的時候才會創(chuàng)建一個新的Observable對象,也就是說每次訂閱都會得到一個剛創(chuàng)建的最新的Observable對象,這可以確保Observable對象里的數(shù)據(jù)是最新的,而just則沒有創(chuàng)建新的Observable對象,這樣說可能并不利于大家消化,看下邊與just對比示例~
range 操作符根據(jù)輸入的初始值【initial】和數(shù)量【number】發(fā)射number次、大于等于initial的值~
Interval 所創(chuàng)建的Observable對象會從0開始,每隔固定的時間發(fā)射一個數(shù)字,需要注意的是這個對象是運行在computation Scheduler,所以要更新UI需要在主線程中進行訂閱~
Timer 會在指定時間后發(fā)射一個數(shù)字0,注意其也是運行在computation Scheduler~
empty 創(chuàng)建一個Observable不發(fā)射任何數(shù)據(jù)、而是立即調(diào)用onCompleted方法終止~
never 創(chuàng)建一個Observable不發(fā)射任何數(shù)據(jù)、也不給訂閱ta的Observer發(fā)出任何通知~
error 返回一個Observable,當有Observer訂閱ta時直接調(diào)用Observer的onError方法終止
Repeat 會將一個Observable對象重復(fù)發(fā)射,接收值是發(fā)射的次數(shù),依然訂閱在 computation Scheduler~
delay 功能與timer操作符一樣,但是delay用于在事件中,可以延遲發(fā)送事件中的某一次發(fā)送~
轉(zhuǎn)換類操作符:
Buffer 可以簡單的理解為緩存,它可以批量或者按周期性從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個數(shù)據(jù)~
FlatMap 扁平映射,作用是將一個原始Observable發(fā)射的數(shù)據(jù)進行變化,輸出一個或多個Observable,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable(參數(shù)一般是Func1)~
Map 映射,一般用于對原始的數(shù)據(jù)進行加工處理,返回一個加工過后的數(shù)據(jù)~
GroupBy 用于對對象進行分組
Sacn sacn操作符是遍歷源Observable產(chǎn)生的結(jié)果,通過自定義轉(zhuǎn)換規(guī)則,依次輸出結(jié)果給訂閱者,
Window 窗口,它可以批量或者按周期性從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個數(shù)據(jù),類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable~
過濾類操作符:
Debounce debounce操作符在源Observable產(chǎn)品一個結(jié)果時開始計時,如果在規(guī)定的間隔時間內(nèi)沒有別的結(jié)果產(chǎn)生或者在此期間調(diào)用了onCompleted,則發(fā)射數(shù)據(jù),否則忽略發(fā)射。
Distinct 去重,過濾掉重復(fù)數(shù)據(jù)項~
ElementAt 取值,取特定位置的數(shù)據(jù)項,索引是從0開始的~
Filter 對發(fā)射的數(shù)據(jù)進行過濾,只發(fā)射符合條件的數(shù)據(jù)~
First 首項,只發(fā)射首項或滿足條件的首項數(shù)據(jù)~
Last 末項,只發(fā)射末項或滿足條件的末項數(shù)據(jù)~
IgnoreElements 忽略所有數(shù)據(jù),只保留終止通知(onError或onCompleted)~
Sample 取樣,定期掃描源Observable產(chǎn)生的數(shù)據(jù),發(fā)射最新的數(shù)據(jù)~
Skip 跳過前面的n項數(shù)據(jù)不進行處理~
SkipLast 跳過后面的n項數(shù)據(jù)不進行處理~
Take 與skip用法相反,保留前面的n項數(shù)據(jù)進行發(fā)射,而忽略后面的結(jié)果~
TakeLast 與skipLast用法相反,只保留后面的n項數(shù)據(jù)進行發(fā)射,而忽略前面的結(jié)果~
RxJava常用的操作符就介紹到這了,最終還是要大家多多練習才能掌握。