基本用法
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(123);
emitter.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
LogUtils.d("onSubscribe");
}
@Override
public void onNext(@NonNull Integer o) {
System.out.println(o);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
LogUtils.d("onComplete");
}
};
observable.subscribe(observer);
我們以這段簡單的代碼為基礎(chǔ),講解下貫穿整個ReactiveX設(shè)計的四個概念:觀察者,被觀察者,事件,訂閱。
-
觀察者
對事件進行響應(yīng)的對象,也可以稱作消費者,在上述的代碼中,subscirbe方法有很多個重載,其參數(shù)要么是一個Consumer對象,要么是一個Observer對象,Consumer對象后續(xù)也會被包裝成一個LambdaObserver對象。因此可以說subscribe方法的參數(shù)即為Observer(觀察者)。 -
被觀察者
產(chǎn)生事件的對象,也可以稱作生產(chǎn)者,在上述代碼中,Observable.create(...)返回的是一個Observable對象,即為這段程序的被觀察者(生產(chǎn)者)。 -
事件
RxJava中存在四種事件流:onSubscribe(訂閱事件),onNext(正常事件),onError(異常事件),onComplete(完成事件)。在上述代碼中,是將一個整型元素作為onNext事件中的數(shù)據(jù)進行發(fā)送。 -
訂閱
創(chuàng)建觀察者與被觀察者之間觀察關(guān)系,對應(yīng)著上述代碼中的subscribe()方法。RxJava的事件驅(qū)動模型是一種“拉模型”,在觀察者沒有進行事件訂閱之前是不會有事件產(chǎn)生的,只有觀察者進行訂閱后,才會觸發(fā)被觀察者生產(chǎn)事件。
集大成者Observable
在整個數(shù)據(jù)處理的過程中,Observable可以說是最重要的一個對象??蛻舳耍ㄏ⒌纳a(chǎn)者或者消費者)只和Observable進行交互,觀察者和被觀察者之間關(guān)系的創(chuàng)建也是由Observable去實現(xiàn),而不用我們顯示的編碼實現(xiàn),這大大降低了我們使用觀察者模式的成本。
從圖中我們可以看出:
Observable實現(xiàn)了ObservableSource接口,該接口中只有一個方法:
subscribe()。從字面意思就可以理解,這是一個提供觀察能力的接口,所以O(shè)bservable的一大能力是供觀察者進行事件訂閱,而進行事件訂閱的方法實現(xiàn)就是調(diào)用Observable的subscribe()方法Observable是一個抽象類,它提供了
subscribeActual模板方法供子類實現(xiàn),從源碼中可以看出,Observable的subscribe()方法最終會委托子類的subscribeActual()方法實現(xiàn),這個方法會建立生產(chǎn)者與消費者之間的關(guān)聯(lián)關(guān)系。除此之外,Observable還是一個工廠類,它提供了靜態(tài)方法
fromArray()、create()等用來創(chuàng)建具體的可觀察對象,同時還提供了flatMap()、concatMap()等操作方法對可觀察對象進行包裝。
Observable的存在讓生產(chǎn)者和消費者完全的解耦了,生產(chǎn)者只需關(guān)注自己生成何種Observable對象,而消費者也只需關(guān)注自己觀察的是哪種Observable。
在實際的應(yīng)用中,Rxjava已經(jīng)提供了各種各樣的操作符供我們使用,生產(chǎn)者只需要調(diào)用Observable中相應(yīng)的方法即可以生成所需的可觀察對象,供消費者進行事件訂閱。消費者只需調(diào)用可觀察對象的subscribe()方法即可與生產(chǎn)者建立觀察關(guān)系,極其方便。
真實的觀察
觀察者模式是RxJava設(shè)計的核心思想,在觀察者模式中總是存在觀察的對象和被觀察的對象,從上文的解析中也可以看出Observable更多的是一個控制器的作用,而并非真正的事件的來源。那么在RxJava中,什么才是真正的生產(chǎn)者,什么才是真正的消費者呢。
我們來分析下以下三種常見的Observable:
Observable arrayObservable = Observable.fromArray(1,2,3,4,5);
Observable createObservable = Observable.create(emmit->emmit.onNext(1));
Observable justObservable = Observable.just(1);
先簡單介紹下這幾個Observable的作用,fromArray的作用是將數(shù)組中的元素作為onNext事件發(fā)送,create的作用是發(fā)送自定義事件,just的作用是發(fā)送單個事件。
上一小節(jié)有講到實際的訂閱行為是由各個Observable類中subscribeActual()方法實現(xiàn)的,我們來看下這三個類的subscribeActual()方法。
除去細枝末節(jié),這三個方法都可以分成以下三步
1.創(chuàng)建被觀察者對象,并傳入觀察者observer,建立兩者的關(guān)聯(lián)關(guān)系;
2.觸發(fā)onSubscribe事件,觀察者響應(yīng)該事件;
3.進行事件的拉取,我們可以進入到d.run(),source.subscribe(parent),sd.run()這些方法的內(nèi)部看一些,可以看到這些方法就是在發(fā)送onNext(),onError(),onComplete()等事件。
下圖是整個流程中的相關(guān)類圖。實際事件的發(fā)送者是FromArrayDisposable等對象,而實際的觀察者,則是一個實現(xiàn)了Observer接口的實體類。如果我們在subscribe時傳入的是一個lambda表達式,之后會被包裝成一個默認(rèn)的LambdaObserver對象,進行事件消費。
包裝
RxJava中提供了豐富的操作符,比如flatMap,concatMap等可以對事件轉(zhuǎn)換,subscribeOn,observableOn等可以對生產(chǎn)和消費的線程進行控制。這些操作符實際上調(diào)用了Observable中的包裝方法對原有的可觀察對象進行包裝,返回了一個增強了的可觀察對象。
操作符種類繁多,在這就不一一舉例,我們以flatMap為例,分析一下這些操作符是如何工作的。
首先,flatMap操作會返回一個ObservableFlatMap對象,在創(chuàng)建這個對象時,會將原始的Observable對象作為構(gòu)造函數(shù)的參數(shù)傳入。
查看其核心方法subscribeActual,
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
可以看到這一類對象的subscribeActual方法和上一節(jié)中的方法不太一樣,這里面并沒有去實際的創(chuàng)建觀察關(guān)系,而是做了兩件事:
1.對觀察者進行增強,將其包裝成為MergeObserver對象
2.再調(diào)用source的subscribe方法,這里source就是前面構(gòu)造函數(shù)中傳入的Observable對象,由其再進行觀察關(guān)系的建立。
下圖是RxJava中裝飾器模式的相關(guān)類圖:所有的包裝類都繼承了AbstractObservableWithUpstream類,該抽象類有一個類型為ObservableSource的成員函數(shù),用來持有被裝飾的對象。
Observable是支持鏈?zhǔn)讲僮鞯?,就和Java 8中的Stream一樣,我們來考慮這樣一行代碼。
Observable.fromArray(1,2,3,4,5).flatMap(num->Observable.just(num)).observeOn(Schedulers.newThread()).subscribe(num-> System.out.println(num));
我們在分析上面這串代碼時,一定會凌亂非常,在看源碼時也會看到前面忘掉后面,但是如果我們對RxJava的包裝流程足夠了解的話,就可以很輕松的對上述代碼進行分析。
流程淺析
我們以一段常見的代碼分析事件產(chǎn)生和消費的流程
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
System.out.println(integer);
}
@Override
public void onError(@io.reactivex.annotations.NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
在分析之前,我們先定義幾個概念,source代表著被觀察者;downStream代表著觀察者;它們在下面的代碼將會重復(fù)出現(xiàn)。
Observable.create(ObservableOnSubscribe source)
Observable.create(ObservableOnSubscribe source)創(chuàng)建了一個ObservableCreate(ObservableOnSubscribe<T> source)對象,其內(nèi)部持有了真實的被觀察者ObservableOnSubscribe,我們將這個ObservableOnSubscribe稱為原始被觀察者,它是一個接口,內(nèi)部定義了subscribe()方法:
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
小結(jié):
ObservableCreate持有的source為ObservableOnSubscribe
ObservableCreate.subscribeOn(Schedulers.IO)
接著我們調(diào)用了ObservableCreate對象的subscribeOn,并傳入了IO線程調(diào)度器,subscribeOn()方法的實現(xiàn)在Observable類,ObservableCreate繼承了Observable類
#Observable
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
其實就是創(chuàng)建了ObservableSubscribeOn對象,并傳入了ObservableCreate對象和IO線程調(diào)度器。查看ObservableSubscribeOn的構(gòu)造函數(shù):
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
可以看到ObservableSubscribeOn把source傳給了父類,
ObservableSubscribeOn繼承了AbstractObservableWithUpstream,因此AbstractObservableWithUpstream持有了ObservableCreate對象,但我們可以理解為ObservableSubscribeOn持有了ObservableCreate對象。
小結(jié):
ObservableSubscribeOn持有了source為ObservableCreate
ObservableSubscribeOn.observerOn(AndroidSchedulers.mainThread())
observerOn()方法的實現(xiàn)同樣是在Observable類中:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
可以看到也只是創(chuàng)建了一個ObservableObserveOn對象,并傳入了ObservableObserveOn對象和主線程調(diào)度器,這里的delayError和bufferSize傳入的是默認(rèn)值,暫且不表,查看ObservableObserveOn的構(gòu)造函數(shù):
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
這里同樣的是把source傳給了父類,ObservableObserveOn同樣也是繼承了AbstractObservableWithUpstream。
小結(jié):
ObservableObserveOn持有了source為ObservableSubscribeOn
ObservableObserveOn<T>.subscribe(observer)
接著我們調(diào)用了ObservableObserveOn對象的subscribe()方法,并傳入了observer,該observer是最外層的Observer<T>的實現(xiàn)類:
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
subscribe(observer)方法的實現(xiàn)是在Observable類中,其內(nèi)部調(diào)用了subscribeActual(observer),subscribeActual(observer)是抽象方法,具體實現(xiàn)是在子類中,查看ObservableObserveOn類的subscribeActual(observer):
# ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
1.創(chuàng)建一個任務(wù),scheduler為AndroidSchedulers(主線程);
createWorker()的實現(xiàn)是在HandlerScheduler中,該方法創(chuàng)建了一個HandlerWorker對象,它ji'c持有一個主線程關(guān)聯(lián)的handler(new Handler(Looper.getMainLooper()));
2.創(chuàng)建一個Observer觀察者 => ObserveOnObserver,持有一個傳進來的observer和worker,傳進來的observer是最開始最外面的Observer<T> 接口實現(xiàn)類,woker是HandlerWorker對象。
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
3.為被觀察者和觀察者之間建立訂閱關(guān)系
這里的
source為ObservableSubscribeOn
小結(jié):
ObserveOnObserver持有了observer為interface Observer<T>
ObservableSubscribeOn.subscribe(observer)
接著查看ObservableSubscribeOn的subscribeActual(observer)方法,源碼如下:
# ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1.創(chuàng)建一個Observer觀察者 =>SubscribeOnObserver<T>
SubscribeOnObserver<T>持有一個傳進來的observer,這個傳進來的observer是我們上一步創(chuàng)建的ObserveOnObserver對象
2.調(diào)用ObserveOnObserver.onSubscribe(parent)
查看源碼:
# ObserveOnObserver.java
@Override
public void onSubscribe(Disposable d) {
// ...
downstream.onSubscribe(this);
}
}
這里的downstream下游即我們傳進來的觀察者observer,也就是
ObserveOnObserver所持有的interface Observer<T>的實現(xiàn)了,也就是最外層的代碼:
@Override
public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
System.out.println("onSubscribe");
}
-
scheduler.scheduleDirect(new SubscribeTask(parent))其實就是執(zhí)行線程,scheduler就是我們傳進來的線程調(diào)度器Schedulers.IO,
SubscribeTask實現(xiàn)了Runnable,查看它的run方法:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它的run方法只有一句代碼:
source.subscribe(parent);parent是SubscribeOnObserver對象,source是傳遞給ObservableSubscribeOn的source,也就是最開始的ObservableCreate對象。因此實際上是調(diào)用了ObservableCreate的subscribe()方法,并傳入了SubscribeOnObserver對象;
小結(jié)
SubscribeOnObserver持有了observer為ObserveOnObserver
onSubscribe方法是最先執(zhí)行的方法,并且它的線程和當(dāng)前線程一樣,因為此時還沒有執(zhí)行線程切換。
ObservableCreate.subscribeActual(observer)方法是運行在子線程的。
ObservableCreate.subscribe(observer)
查看ObservableCreate的subscribeActual方法源碼如下:
# ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
- 創(chuàng)建了一個事件發(fā)射器
CreateEmitter,并傳入了observer;
這里的
observer是SubscribeOnObserver對象
2.SubscribeOnObserver的onSubscribe()方法只是為了設(shè)置Dispoable的值,不影響流程;
3.為被觀察者和觀察者之間建立訂閱關(guān)系
這里的
source是ObservableOnSubscribe
這里開始執(zhí)行,外層頁面代碼的subscribe部分,開始發(fā)射數(shù)據(jù)
@Override
public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
小結(jié)
emitter 發(fā)射器發(fā)送數(shù)據(jù)是在子線程。
CreateEmitter.onNext()
上一步的emitter是CreateEmitter<T>,createEmitter的onNext()源碼如下:
# CreateEmitter.java
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
這里的observer是SubscribeOnObserver,查看SubscribeOnObserver的onNext()
# SubscribeOnObserver.java
final Observer<? super T> downstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
這里的downStream在SubscribeOnObserver的構(gòu)造函數(shù)傳進來,SubscribeOnObserver是在ObservableSubscribeOn.subscribeActual創(chuàng)建的,downStream就是ObserveOnObserver對象,查看ObserveOnObserver的onNext:
#ObserveOnObserver.java
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
最終是調(diào)用了worker執(zhí)行任務(wù),ObserveOnObserver本身實現(xiàn)了Runnable,它的run()方法中調(diào)用了ObserveOnObserver所持有的observer的onNext方法,ObservableObserveOn所持有的正是最外層的Observer<T> 接口實現(xiàn)類,因此最后執(zhí)行了
@Override
public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
System.out.println(integer);
}
小結(jié)
worker.schedule(this);這里又切換了線程到主線程,worker正是之前AndroidThread創(chuàng)建的HandlerWorker,其內(nèi)部持有主線程關(guān)聯(lián)的handler,因此最后的onNext是在主線程執(zhí)行。

只需要理解,每次 observerOn 和 subscribeOn 的時候,內(nèi)部都會創(chuàng)建一個新的 observable 和 observer。
新創(chuàng)建的 observable 會引用前面的 observable,就是代碼中我們分析的 source 變量。
新創(chuàng)建的 observer 會引用前面的 observer,就是代碼中我們分析的 observer 變量。
最后我們 subscribe 的時候,是調(diào)用的最后創(chuàng)建的 observable 的方法。而每個 observable 內(nèi)部又調(diào)用了 source 的 subscribe 方法,這樣就形成了一層一層往前傳遞的調(diào)用鏈。當(dāng)調(diào)用到最前面的一個 observable 的時候,就是我們自己創(chuàng)建的 observable,在這里我們需要手動觸發(fā)與該 observable 對應(yīng)的 observer 對象的 onNext 方法。而 observer 的 onNext 方法的內(nèi)部又調(diào)用了 downstream 的 onNext 方法,這樣就形成了一層一層往后傳遞的調(diào)用鏈。