在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發(fā)射的數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)。這種模式可以極大地簡化并發(fā)操作,因為它創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,在未來某個時刻響應(yīng)Observable的通知,不需要阻塞等待Observable發(fā)射數(shù)據(jù)。

- 這是Observable的時間軸,從左到右。
- 這些是Obserable發(fā)出的事件
- 此垂直線表示Observable已成功發(fā)送處事件,事件發(fā)送完畢
- these dotted lines and this box indicate that a transformation is being applied to the Observable The text inside the box shows the nature of the transformation
這些虛線和此框表示正在對Observable數(shù)據(jù)轉(zhuǎn)換。
框內(nèi)的文本顯示轉(zhuǎn)換的性質(zhì) - this Observable is the result of the transformation
對Observable轉(zhuǎn)換的結(jié)果 - is for some reason the Observable terinates abnormally,with an error the vertical line is replaced by an x
由于某種原因,Observable異常中斷,垂直線x代替錯誤。
創(chuàng)建Observables
-
Create— create an Observable from scratch by calling observer methods programmatically

創(chuàng)建一個默認(rèn)的Observable
-
Defer— do not create the Observable until the observer subscribes, and create a fresh Observable for each observer

直到有觀察者訂閱時才創(chuàng)建Observable,并且為每個觀察者創(chuàng)建一個新的Observable
Defer操作符會一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個Observable。它對每個觀察者都這樣做,因此盡管每個訂閱者都以為自己訂閱的是同一個Observable,事實上每個訂閱者獲取的是它們自己的單獨的數(shù)據(jù)序列。
-
Empty/Never/Throw— create Observables that have very precise and limited behavior
Empty 創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
never 創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable
Throw 創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable
-
From— convert some other object or data structure into an Observable
image.png
一對多的關(guān)系
將一個Iterable, 一個Future, 或者一個數(shù)組轉(zhuǎn)換成一個Observable
產(chǎn)生的Observable會發(fā)射Iterable或數(shù)組的每一項數(shù)據(jù)。
-
Interval— create an Observable that emits a sequence of integers spaced by a particular time interval

創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable
-
Just— convert an object or a set of objects into an Observable that emits that or those objects

創(chuàng)建一個發(fā)射指定值的Observable
Just類似于From,但是From會將數(shù)組或Iterable的數(shù)據(jù)取出然后逐個發(fā)射,而Just只是簡單的原樣發(fā)射,將數(shù)組或Iterable當(dāng)做單個數(shù)據(jù)。
注意:如果你傳遞null給Just,它會返回一個發(fā)射null值的Observable。不要誤認(rèn)為它會返回一個空Observable(完全不發(fā)射任何數(shù)據(jù)的Observable),如果需要空Observable你應(yīng)該使用Empty操作符。
-
Range— create an Observable that emits a range of sequential integers
創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable
Range操作符發(fā)射一個范圍內(nèi)的有序整數(shù)序列,你可以指定范圍的起始和長度。
RxJava將這個操作符實現(xiàn)為range函數(shù),它接受兩個參數(shù),一個是范圍的起始值,一個是范圍的數(shù)據(jù)的數(shù)目。如果你將第二個參數(shù)設(shè)為0,將導(dǎo)致Observable不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù),會拋異常)。
-
Repeat— create an Observable that emits a particular item or sequence of items repeatedly
創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
RxJava將這個操作符實現(xiàn)為repeat方法。它不是創(chuàng)建一個Observable,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個序列或者是無限的,或者通過repeat(n)指定重復(fù)次數(shù)。
-
Start— create an Observable that emits the return value of a function

創(chuàng)建一個Observable,它發(fā)出類似函數(shù)的指令的返回值
-
Timer— create an Observable that emits a single item after a given delay
創(chuàng)建一個Observable,它在一個給定的延遲后發(fā)射一個特殊的值。

最基本使用
- 創(chuàng)建 被觀察者
- 創(chuàng)建 觀察者
- 訂閱事件 連接 被觀察者 和觀察者
//創(chuàng)建 被觀察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
Log.d(TAG, "subscribe: 1");
emitter.onNext("2");
Log.d(TAG, "subscribe: 2");
emitter.onNext("3");
Log.d(TAG, "subscribe: 3");
emitter.onNext("4");
Log.d(TAG, "subscribe: 4");
emitter.onComplete();
}
}).subscribe(//訂閱
new Observer<String>() {//觀察者
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d.toString());
this.disposable = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
Log.d(TAG, "onNext: disposed 前" + disposable.isDisposed());
if (s.equals("2")) {
disposable.dispose();
}
Log.d(TAG, "onNext: disposed 后" + disposable.isDisposed());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
在 Observer觀察者 中,多了一個回調(diào)方法:onSubscribe,傳遞參數(shù)為Disposable,Disposable 相當(dāng)于 RxJava 1.x 中的 Subscription, 用于解除訂閱。當(dāng)s等于2時,解除訂閱,觀察者不會在接收到被觀察者發(fā)出的消息,也包裹 onComplete事件。
在請求的過程中Activity已經(jīng)退出了, 這個時候如果回到主線程去更新UI, 那么APP肯定就崩潰了, 怎么辦呢, 上一節(jié)我們說到了Disposable , 說它是個開關(guān), 調(diào)用它的dispose()方法時就會切斷水管, 使得下游收不到事件, 既然收不到事件, 那么也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable 保存起來, 當(dāng)Activity退出時, 切斷它即可.
那如果有多個Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個容器CompositeDisposable, 每當(dāng)我們得到一個Disposable時就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時候, 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.
上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext.
當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
上游可以不發(fā)送onComplete或onError.
最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然
下面是log信息。
onNext: 1
onNext: disposed 前false
onNext: disposed 后false
subscribe: 1
onNext: 2
onNext: disposed 前false
onNext: disposed 后true
subscribe: 2
subscribe: 3
subscribe: 4
form just 輸出一樣
Observable<Integer> form = Observable.fromArray(items);
Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> range = Observable.range(1, 10);
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
onComplete:
subscribe 觀察者
public final Disposable subscribe() {}
// 表示觀察者不對被觀察者發(fā)送的事件作出任何響應(yīng)(但被觀察者還是可以繼續(xù)發(fā)送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件 & Error事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件、Error事件 & Complete事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應(yīng)
public final void subscribe(Observer<? super T> observer) {}
// 表示觀察者對被觀察者發(fā)送的任何事件都作出響應(yīng)
調(diào)度器 Scheduler
Schedulers.computation(?)
用于計算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量Schedulers.from(executor)
使用指定的Executor作為調(diào)度器Schedulers.immediate(?)
在當(dāng)前線程立即開始執(zhí)行任務(wù)Schedulers.io(?)
用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計算任務(wù),請使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器Schedulers.newThread(?)
為每個任務(wù)創(chuàng)建一個新線程Schedulers.trampoline(?)
當(dāng)其它排隊的任務(wù)完成后,在當(dāng)前線程排隊開始執(zhí)行
簡單的來說, subscribeOn() 指定的是上游發(fā)送事件的線程, observeOn() 指定的是下游接收事件的線程.
多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn() , 下游的線程就會切換一次.
