前言
如果你對(duì)RxJava1.x還不是了解,可以參考下面文章。
1. RxJava使用介紹 【視頻教程】
2. RxJava操作符
? Creating Observables(Observable的創(chuàng)建操作符) 【視頻教程】
? Transforming Observables(Observable的轉(zhuǎn)換操作符) 【視頻教程】
? Filtering Observables(Observable的過濾操作符) 【視頻教程】
? Combining Observables(Observable的組合操作符) 【視頻教程】
? Error Handling Operators(Observable的錯(cuò)誤處理操作符) 【視頻教程】
? Observable Utility Operators(Observable的輔助性操作符) 【視頻教程】
? Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視頻教程】
? Mathematical and Aggregate Operators(Observable數(shù)學(xué)運(yùn)算及聚合操作符) 【視頻教程】
? 其他如observable.toList()、observable.connect()、observable.publish()等等; 【視頻教程】
3. RxJava Observer與Subcriber的關(guān)系 【視頻教程】
4. RxJava線程控制(Scheduler) 【視頻教程】
5. RxJava 并發(fā)之?dāng)?shù)據(jù)流發(fā)射太快如何辦(背壓(Backpressure)) 【視頻教程】
開始
Rxjava 已經(jīng)于2016年11月12日正式發(fā)布了2.0.1版本。
RxJava 2.0 已經(jīng)按照Reactive-Streams specification規(guī)范完全的重寫了。RxJava2.0 已經(jīng)獨(dú)立于RxJava 1.x而存在。
RxJava2.0相比RxJava1.x,它的改動(dòng)還是很大的,下面我將帶大家了解這些改動(dòng)。
RxJava2.0與1.x的區(qū)別
Maven地址
為了讓 RxJava 1.x 和 RxJava 2.x 相互獨(dú)立,我們把RxJava 2.x 被放在了maven io.reactivex.rxjava2:rxjava:2.x.y 下,類放在了 io.reactivex 包下用戶從 1.x 切換到 2.x 時(shí)需要導(dǎo)入的相應(yīng)的包,但注意不要把1.x和2.x混淆了。
接口變化
RxJava2.0 是遵循 Reactive Streams Specification 的規(guī)范完成的,新的特性依賴其提供的4個(gè)基礎(chǔ)接口。分別是:
- Publisher
- Subscriber
- Subscription
- Processor
在后邊的介紹中我們會(huì)涉及到。
Javadoc文檔
官方2.0的 Java 文檔 http://reactivex.io/RxJava/2.x/javadoc/
添加依賴
Android端使用RxJava需要依賴新的包名:
//RxJava的依賴包
compile 'io.reactivex.rxjava2:rxjava:2.0.3'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Nulls
RxJava1.x中,支持 null 值,如下代碼所示:
Observable.just(null);
Single.just(null);
RxJava 2.0不再支持 null 值,如果傳入一個(gè)null會(huì)拋出 NullPointerException
Observable.fromCallable(() -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable and Flowable
在本節(jié)開始之前,我們先了解下RxJava背壓(Backpressure)機(jī)制的問題。
什么是背壓(Backpressure)
在RxJava中,可以通過對(duì)Observable連續(xù)調(diào)用多個(gè)Operator組成一個(gè)調(diào)用鏈,其中數(shù)據(jù)從上游向下游傳遞。當(dāng)上游發(fā)送數(shù)據(jù)的速度大于下游處理數(shù)據(jù)的速度時(shí),就需要進(jìn)行Flow Control了。如果不進(jìn)行Flow Control,就會(huì)拋出MissingBackpressureException異常。
這就像小學(xué)做的那道數(shù)學(xué)題:一個(gè)水池,有一個(gè)進(jìn)水管和一個(gè)出水管。如果進(jìn)水管水流更大,過一段時(shí)間水池就會(huì)滿(溢出)。這就是沒有Flow Control導(dǎo)致的結(jié)果。
再舉個(gè)例子,在 RxJava1.x 中的 observeOn, 因?yàn)槭乔袚Q了消費(fèi)者的線程,因此內(nèi)部實(shí)現(xiàn)用隊(duì)列存儲(chǔ)事件。在 Android 中默認(rèn)的 buffersize 大小是16,因此當(dāng)消費(fèi)比生產(chǎn)慢時(shí), 隊(duì)列中的數(shù)目積累到超過16個(gè),就會(huì)拋出MissingBackpressureException。
如果你想了解更多關(guān)于背壓的知識(shí),請(qǐng)參考:
http://blog.csdn.net/jdsjlzx/article/details/52717636
http://www.itdecent.cn/p/2c4799fa91a4
下面我們通過一段代碼來“感受”一下背壓。
Observable.interval(1, TimeUnit.MILLISECONDS)
//將觀察者的工作放在新線程環(huán)境中
.observeOn(Schedulers.newThread())
//觀察者處理每1000ms才處理一個(gè)事件
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError :"+ e);
}
@Override
public void onNext(Long value) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("onNext value :"+ value);
}
});
Flow Control有哪些思路呢?大概是有四種:
- 背壓(Backpressure);
- 節(jié)流(Throttling);
- 打包處理;
- 調(diào)用棧阻塞(Callstack blocking)。
這里限于篇幅的問題,我們就不再一一介紹了,請(qǐng)移步:https://gold.xitu.io/post/58535b5161ff4b0063aa6b10
如何讓Observable支持Backpressure?
在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通過一些operator來轉(zhuǎn)化成支持Backpressure的Observable。這些operator包括:
- onBackpressureBuffer
- onBackpressureDrop
- onBackpressureLatest
- onBackpressureBlock(已過期)
它們轉(zhuǎn)化成的Observable分別具有不同的Backpressure策略。
而在RxJava2.0 中,Observable 不再支持背壓,而是改用Flowable 支持非阻塞式的背壓。Flowable是RxJava2.0中專門用于應(yīng)對(duì)背壓(Backpressure)問題而新增的(抽象)類。其中,F(xiàn)lowable默認(rèn)隊(duì)列大小為128。并且規(guī)范要求,所有的操作符強(qiáng)制支持背壓。幸運(yùn)的是, Flowable 中的操作符大多與舊有的 Observable 類似。
上面提到的四種operator的前三種分別對(duì)應(yīng)Flowable的三種Backpressure策略:
- BackpressureStrategy.BUFFER
- BackpressureStrategy.DROP
- BackpressureStrategy.LATEST
onBackpressureBuffer是不丟棄數(shù)據(jù)的處理方式。把上游收到的全部緩存下來,等下游來請(qǐng)求再發(fā)給下游。相當(dāng)于一個(gè)水庫(kù)。但上游太快,水庫(kù)(buffer)就會(huì)溢出。

onBackpressureDrop和onBackpressureLatest比較類似,都會(huì)丟棄數(shù)據(jù)。這兩種策略相當(dāng)于一種令牌機(jī)制(或者配額機(jī)制),下游通過request請(qǐng)求產(chǎn)生令牌(配額)給上游,上游接到多少令牌,就給下游發(fā)送多少數(shù)據(jù)。當(dāng)令牌數(shù)消耗到0的時(shí)候,上游開始丟棄數(shù)據(jù)。但這兩種策略在令牌數(shù)為0的時(shí)候有一點(diǎn)微妙的區(qū)別:onBackpressureDrop直接丟棄數(shù)據(jù),不緩存任何數(shù)據(jù);而onBackpressureLatest則緩存最新的一條數(shù)據(jù),這樣當(dāng)上游接到新令牌的時(shí)候,它就先把緩存的上一條“最新”數(shù)據(jù)發(fā)送給下游??梢越Y(jié)合下面兩幅圖來理解。


onBackpressureBlock是看下游有沒有需求,有需求就發(fā)給下游,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己并不緩存。這種策略已經(jīng)廢棄不用。
注意:在RxJava2.0中,舊的Observable也保留了,你還可以像以前那樣使用,同時(shí)要注意接口的變化。
需要說明的是,RxJava2.0中,F(xiàn)lowable是對(duì)Observable的補(bǔ)充(而不是替代),也可以這么說,F(xiàn)lowable是能夠支持Backpressure的Observable。
何時(shí)用Observable
- 當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量不大(以1000為界限)的時(shí)候優(yōu)先選擇使用Observable;
- 在處理GUI相關(guān)的事件,比如鼠標(biāo)移動(dòng)或觸摸事件,這種情況下很少會(huì)出現(xiàn)backpressured的問題,用Observable就足以滿足需求;
- 獲取數(shù)據(jù)操作是同步的,但你的平臺(tái)不支持Java流或者相關(guān)特性。使用Observable的開銷低于Flowable。
何時(shí)用Flowable
- 當(dāng)上游在一段時(shí)間發(fā)送的數(shù)據(jù)量過大的時(shí)候(這個(gè)量我們往往無法預(yù)計(jì)),此時(shí)就要使用Flowable以限制它所產(chǎn)生的量的元素10K +處理。
- 當(dāng)你從本地磁盤某個(gè)文件或者數(shù)據(jù)庫(kù)讀取數(shù)據(jù)時(shí)(這個(gè)數(shù)據(jù)量往往也很大),應(yīng)當(dāng)使用Flowable,這樣下游可以根據(jù)需求自己控制一次讀取多少數(shù)據(jù);
- 以讀取數(shù)據(jù)為主且有阻塞線程的可能時(shí)用Flowable,下游可以根據(jù)某種條件自己主動(dòng)讀取數(shù)據(jù)。
Single、Completable
Single 與 Completable 都基于新的 Reactive Streams 的思想重新設(shè)計(jì)了接口,主要是消費(fèi)者的接口, 現(xiàn)在他們是這樣的:
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
Subscriber
對(duì)比一下 Subscriber :
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
我們會(huì)發(fā)現(xiàn)和以前不一樣的是多了一個(gè) onSubscribe 的方法, Subscription 如下:
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
熟悉 RxJava 1.x 的朋友能發(fā)現(xiàn), 新的 Subscription 更像是綜合了舊的 Producer 與 Subscription 的綜合體。他既可以向上游請(qǐng)求(request)數(shù)據(jù),又可以打斷并釋放(cancel)資源。而舊的 Subscription 在這里因?yàn)槊直徽迹恢匦旅闪?Disposable。
注意:Subscription 不再有訂閱subcribe和unSubcribe的概念。
Disposable
public interface Disposable {
void dispose();
boolean isDisposed();
}
這里最大的不同就是這個(gè) onSubscribe ,根據(jù) Specification, 這個(gè)函數(shù)一定是第一個(gè)被調(diào)用的, 然后就會(huì)傳給調(diào)用方一個(gè) Subscription ,通過這種方式組織新的背壓關(guān)系。當(dāng)我們消費(fèi)數(shù)據(jù)時(shí),可以通過 Subscription 對(duì)象,自己決定請(qǐng)求數(shù)據(jù)。
這里就可以解釋上面的非阻塞的背壓。舊的阻塞式的背壓,就是根據(jù)下游的消費(fèi)速度,中游可以選擇阻塞住等待下游的消費(fèi),隨后向上游請(qǐng)求數(shù)據(jù)。而新的非阻塞就不在有中間阻塞的過程,由下游自己決定取多少,還有背壓策略,如拋棄最新、拋棄最舊、緩存、拋異常等。
而新的接口帶來的新的調(diào)用方式與舊的也不太一樣, subscribe 后不再會(huì)有 Subscription 也就是如今的 Disposable,為了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回當(dāng)前的 Subscriber 對(duì)象, 并且同時(shí)提供了 DefaultSubscriber , ResourceSubscriber , DisposableSubscriber ,讓他們提供了 Disposable 接口,并且可以從外面取消 dispose()。 現(xiàn)在也可以完成和以前類似的代碼:
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
};
Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);
subscriber.dispose();
注意,由于Reactive-Streams的兼容性,方法onCompleted被重命名為onComplete。另外注意dispose()方法,這個(gè)方法允許你釋放資源。
RxJava2.x中提供了幾個(gè)Subcriber對(duì)象,如下所示:
- DefaultSubscriber:通過實(shí)現(xiàn)Subscriber接口,可以通過調(diào)用request(long n)方法請(qǐng)求或者cancel()方法取消訂閱(同步請(qǐng)求)
public abstract class DefaultSubscriber<T> implements Subscriber<T>
- DisposableSubscriber:通過實(shí)現(xiàn)Desposable異步刪除。
public abstract class DisposableSubscriber<T> implements Subscriber<T>, Disposable
- ResourceSubscriber:允許異步取消其訂閱相關(guān)資源,節(jié)省內(nèi)存而且是線程安全。
public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable
- SafeSubscriber:包裝另一個(gè)訂閱者,并確保所有onXXX方法遵守協(xié)議(序列化要求訪問除外)。
public final class SafeSubscriber<T> implements Subscriber<T>, Subscription
- SerializedSubscriber:序列化訪問另一個(gè)訂閱者的onNext,onError和onComplete方法。
public final class SerializedSubscriber<T> implements Subscriber<T>, Subscription
在onSubscribe/onStart中調(diào)用request
注意,在Subscriber.onSubscribe或ResourceSubscriber.onStart中調(diào)用request(n)將會(huì)立即調(diào)用onNext,實(shí)例代碼如下:
Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("OnSubscribe start");
s.request(Long.MAX_VALUE);
System.out.println("OnSubscribe end");
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
});
輸出結(jié)果如下:
OnSubscribe start
1
2
3
Done
OnSubscribe end
當(dāng)你在onSubscribe/onStart中做了一些初始化的工作,而這些工作是在request后面時(shí),會(huì)出現(xiàn)一些問題,在onNext執(zhí)行時(shí),你的初始化工作的那部分代碼還沒有執(zhí)行。為了避免這種情況,請(qǐng)確保你調(diào)用request時(shí),已經(jīng)把所有初始化工作做完了。
這個(gè)行為不同于1.x中的 request要經(jīng)過延遲的邏輯直到上游的Producer到達(dá)時(shí)。在2.0中,總是Subscription先傳遞下來,90%的情況下沒有延遲請(qǐng)求的必要。
Subscription
在RxJava 1.x中,接口rx.Subscription負(fù)責(zé)流和資源的生命周期管理,即退訂和釋放資源,例如scheduled tasks。Reactive-Streams規(guī)范用這個(gè)名稱指定source和consumer之間的關(guān)系: org.reactivestreams.Subscription 允許從上游請(qǐng)求一個(gè)正數(shù),并支持取消。
為了避免名字沖突,1.x的rx.Subscription被改成了 io.reactivex.Disposable。
因?yàn)镽eactive-Streams的基礎(chǔ)接口org.reactivestreams.Publisher 定義subscribe()為無返回值,F(xiàn)lowable.subscribe(Subscriber)不再返回任何Subscription。其他的基礎(chǔ)類型也遵循這種規(guī)律。
在2.x中其他的subscribe的重載方法返回Disposable。
原始的Subscription容器類型已經(jīng)被重命名和修改。
- CompositeSubscription 改成 CompositeDisposable,
- SerialSubscription 和MultipleAssignmentSubscription 被合并到了 SerialDisposable。 set() 方法取消了舊值,而replace()方法沒有。
- RefCountSubscription 已被刪除。
收回 create 方法權(quán)限
在RxJava 1.x 最明顯的問題就是由于 create 的太過開放,導(dǎo)致其被開發(fā)者濫用,而不是學(xué)習(xí)使用提供的操作符。并且用戶對(duì) RxJava 不夠了解,導(dǎo)致各種各樣的問題,如背壓、異常處理等。
由于規(guī)范要求所有的操作符強(qiáng)制支持背壓,因此新的 create 采用了保守的設(shè)計(jì),讓用戶實(shí)現(xiàn) FlowableOnSubscribe 接口,并選取背壓策略,然后在內(nèi)部實(shí)現(xiàn)封裝支持背壓,簡(jiǎn)單的例子如下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
Functions可以拋出異常
不同于RxJava1.x,RxJava2.x中沒有了一系列的Action/Func接口,取而代之的是與Java8命名類似的函數(shù)式接口,如下圖:
而Consumer即消費(fèi)者,用于接收單個(gè)值,BiConsumer則是接收兩個(gè)值,F(xiàn)unction用于變換對(duì)象,Predicate用于判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應(yīng)該都知道意思,這里也就不再贅述了。
public interface Consumer<T> {
void accept(T t) throws Exception;
}
新的ActionX、FunctionX的方法聲明都增加了一個(gè)throws Exception,這帶來了顯而易見的好處,現(xiàn)在我們可以這樣寫:
Flowable.just("qq.txt")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String value) throws Exception {
File file = new File(value);
file.createNewFile();
return 99;
}
});
而createNewFile方法顯式的拋出了一個(gè)IOException,而在以前是不可以這樣寫的。
Schedulers
在2.0的API中仍然支持主要的默認(rèn)scheduler: computation, io, newThread 和 trampoline,可以通過io.reactivex.schedulers.Schedulers這個(gè)實(shí)用的工具類來調(diào)度。
2.0中不存在immediate 調(diào)度器。 它被頻繁的誤用,并沒有正常的實(shí)現(xiàn) Scheduler 規(guī)范;它包含用于延遲動(dòng)作的阻塞睡眠,并且不支持遞歸調(diào)度。你可以使用Schedulers.trampoline()來代替它。
Schedulers.test()已經(jīng)被移除,這樣避免了默認(rèn)調(diào)度器休息的概念差異。那些返回一個(gè)”global”的調(diào)度器實(shí)例是鑒于test()總是返回一個(gè)新的TestScheduler實(shí)例?,F(xiàn)在我們鼓勵(lì)測(cè)試人員使用這樣簡(jiǎn)單的代碼new TestScheduler()。
io.reactivex.Scheduler抽象類現(xiàn)在支持直接調(diào)度任務(wù),不需要先創(chuàng)建然后通過Worker調(diào)度。
操作符的差別
2.0中大部分操作符仍然被保留,實(shí)際上大部分行為和1.x一樣。
關(guān)于操作符,引用JakeWharton的總結(jié)就是:
All the same operators(you konw and love or hate and despise) are still there.
Transformer
RxJava 1.x 中Transformer實(shí)際上就是Func1<Observable,Observable>,換句話說就是提供給他一個(gè)Observable它會(huì)返回給你另一個(gè)Observable,這和內(nèi)聯(lián)一系列操作符有著同等功效。
相關(guān)API如下:
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}
public interface Func1<T, R> extends Function {
R call(T t);
}
實(shí)際操作下,寫個(gè)方法,創(chuàng)建一個(gè)Transformer調(diào)度器:
//子線程運(yùn)行,主線程回調(diào)
public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
Observable<T> observable = (Observable<T>) tObservable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
DialogHelper.showProgressDlg(context, mMessage);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));
return observable;
}
};
}
上面這個(gè)方法出自本人的Community框架,用法和源碼詳見:RxHelper.java
在實(shí)際應(yīng)用中,Transformer 經(jīng)常和 Observable.compose() 一起使用。本人的Community框架也有使用,這里就不多介紹了。
在RxJava2.0中,Transformer劃分的更加細(xì)致了,每一種“Observable”都對(duì)應(yīng)的有自己的Transformer,相關(guān)API如下所示:
public interface ObservableTransformer<Upstream, Downstream> {
ObservableSource<Downstream> apply(Observable<Upstream> upstream);
}
public interface CompletableTransformer {
CompletableSource apply(Completable upstream);
}
public interface FlowableTransformer<Upstream, Downstream> {
Publisher<Downstream> apply(Flowable<Upstream> upstream);
}
public interface MaybeTransformer<Upstream, Downstream> {
MaybeSource<Downstream> apply(Maybe<Upstream> upstream);
}
public interface SingleTransformer<Upstream, Downstream> {
SingleSource<Downstream> apply(Single<Upstream> upstream);
}
這里以FlowableTransformer為例,創(chuàng)建一個(gè)Transformer調(diào)度器:
//子線程運(yùn)行,主線程回調(diào)
public FlowableTransformer<T, T> io_main(final RxAppCompatActivity context) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) throws Exception {
DialogHelper.showProgressDlg(context, mMessage);
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(context.lifecycle(), ActivityEvent.DESTROY));
}
};
}
上面這個(gè)方法出自本人的CommunityRxJava2框架,用法和源碼詳見:RxHelper.java
其他改變
doOnCancel/doOnDispose/unsubscribeOn
在1.x中,doOnUnsubscribe總是執(zhí)行終端事件,因?yàn)镾afeSubscriber調(diào)用了unsubscribe。這實(shí)際上是沒有必要的。Reactive-Streams規(guī)范中,一個(gè)終端事件到達(dá)Subscriber,上游的Subscription會(huì)取消,因此調(diào)用 cancel()是一個(gè)空操作。
由于同樣的原因unsubscribeOn也沒被在終端路徑上調(diào)用,但只有實(shí)際在鏈上調(diào)用cancel時(shí),才會(huì)調(diào)用unsubscribeOn。
因此,下面的序列不會(huì)被調(diào)用
doOnCancel
Flowable.just(1,2,3)
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, " doOnCancel " );
}
})
.subscribe(new DisposableSubscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, " onNext : " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
}
});
輸出結(jié)果如下:
onNext : 1
onNext : 2
onNext : 3
onComplete isDisposed() = false
然而,下面將會(huì)調(diào)用take操作符在傳送過程中取消onNext
Flowable.just(1,2,3)
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, " doOnCancel " );
}
})
.take(2)
.subscribe(new DisposableSubscriber<Integer>() {
@Override
public void onNext(Integer integer) {
Log.e(TAG, " onNext : " + integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
}
});
輸出結(jié)果如下:
onNext : 1
onNext : 2
doOnCancel
onComplete isDisposed() = false
使用take操作符,調(diào)用了cancel方法,我們看一下take操作符的源碼:
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> take(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(new FlowableTake<T>(this, count));
}
關(guān)鍵點(diǎn)就是這個(gè)FlowableTake類,這里限于篇幅的原因就不看源碼了,大家可以自己看一下,然后找找是什么地方調(diào)用了cancel。
同樣的,如果你需要在終端或者取消時(shí)執(zhí)行清理,考慮使用using操作符代替。
以上就是RxJava2.0中的改動(dòng),下面我們重點(diǎn)介紹下RxJava2.0中的觀察者模式。
RxJava2.0中的觀察者模式
RxJava始終以觀察者模式為骨架,在2.0中依然如此。
在RxJava2.0中,有五種觀察者模式:
Observable/ObserverFlowable/SubscriberSingle/SingleObserverCompletable/CompletableObserverMaybe/MaybeObserver
后面三種觀察者模式差不多,Maybe/MaybeObserver可以說是Single/SingleObserver和Completable/CompletableObserver的復(fù)合體。
下面列出這五個(gè)觀察者模式相關(guān)的接口。
Observable/Observer
public abstract class Observable<T> implements ObservableSource<T>{...}
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
Completable/CompletableObserver
//代表一個(gè)延遲計(jì)算沒有任何價(jià)值,但只顯示完成或異常。類似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
public abstract class Completable implements CompletableSource{...}
//沒有子類繼承Completable
public interface CompletableSource {
void subscribe(CompletableObserver cs);
}
public interface CompletableObserver {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable e);
}
Flowable/Subscriber
public abstract class Flowable<T> implements Publisher<T>{...}
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Maybe/MaybeObserver
//Maybe類似Completable,它的主要消費(fèi)類型是MaybeObserver順序的方式,遵循這個(gè)協(xié)議:onSubscribe(onSuccess | onError | onComplete)
public abstract class Maybe<T> implements MaybeSource<T>{...}
public interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
public interface MaybeObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
void onComplete();
}
Single/SingleObserver
//Single功能類似于Observable,除了它只能發(fā)出一個(gè)成功的值,或者一個(gè)錯(cuò)誤(沒有“onComplete”事件),這個(gè)特性是由SingleSource接口決定的。
public abstract class Single<T> implements SingleSource<T>{...}
public interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
public interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
}
其實(shí)從API中我們可以看到,每一種觀察者都繼承自各自的接口(都有一個(gè)共同的方法subscrib()),但是參數(shù)不一樣),正是各自接口的不同,決定了他們功能不同,各自獨(dú)立(特別是Observable和Flowable),同時(shí)保證了他們各自的拓展或者配套的操作符不會(huì)相互影響。
下面我們重點(diǎn)說說在實(shí)際開發(fā)中經(jīng)常會(huì)用到的兩個(gè)模式:Observable/Observer和Flowable/Subscriber。
Observable/Observer
Observable正常用法:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
需要注意的是,這類觀察模式不支持背壓,下面我們具體分析下。
當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過大只會(huì)OOM。
在測(cè)試的時(shí)候,快速發(fā)送了100000個(gè)整形數(shù)據(jù),下游延遲接收,結(jié)果被觀察者的數(shù)據(jù)全部發(fā)送出去了,內(nèi)存確實(shí)明顯增加了,遺憾的是沒有OOM。
所以,當(dāng)我們使用Observable/Observer的時(shí)候,我們需要考慮的是,數(shù)據(jù)量是不是很大(官方給出以1000個(gè)事件為分界線,供各位參考)。
Flowable/Subscriber
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
//當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
//傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onsubscribe start");
subscription = s;
subscription.request(1);
Log.d(TAG, "onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.d(TAG, "onNext--->" + o);
subscription.request(3);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
輸出結(jié)果如下:
onsubscribe start
onNext--->0
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onNext--->6
onNext--->7
onNext--->8
onNext--->9
onComplete
onsubscribe end
Flowable是支持背壓的,也就是說,一般而言,上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求,下游調(diào)用request(n)來告訴上游發(fā)送多少個(gè)數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平。
當(dāng)然,F(xiàn)lowable也可以通過create()來創(chuàng)建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);//指定背壓策略
Flowable雖然可以通過create()來創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的(這個(gè)在1.0的時(shí)候就很難保證,可以說RxJava2.0收緊了create()的權(quán)限)。
根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時(shí)候,不等onSubscribe()中后面的代碼執(zhí)行,就會(huì)立刻執(zhí)行onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時(shí),應(yīng)當(dāng)盡量在subscription.request(n)這個(gè)方法調(diào)用之前做好初始化的工作;
當(dāng)然,這也不是絕對(duì)的,我在測(cè)試的時(shí)候發(fā)現(xiàn),通過create()自定義Flowable的時(shí)候,即使調(diào)用了subscription.request(n)方法,也會(huì)等onSubscribe()方法中后面的代碼都執(zhí)行完之后,才開始調(diào)用onNext。
平滑升級(jí)
RxJava1.x 如何平滑升級(jí)到RxJava2.0呢?
由于RxJava2.0變化較大無法直接升級(jí),幸運(yùn)的是,官方提供了RxJava2Interop這個(gè)庫(kù),可以方便地將RxJava1.x升級(jí)到RxJava2.0,或者將RxJava2.0轉(zhuǎn)回RxJava1.x。
地址:https://github.com/akarnokd/RxJava2Interop
總結(jié)
可以明顯的看到,RxJava2.0最大的改動(dòng)就是對(duì)于backpressure的處理,為此將原來的Observable拆分成了新的Observable和Flowable,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分。
除此之外,就是我們最熟悉和喜愛的RxJava。