??樓主最近在找實習(xí)工作,由于簡歷上說了解RxJava,所以在面試的時候應(yīng)該會問到RxJava的知識,于是樓主結(jié)合RxJava的源碼,對RxJava的工作原理進(jìn)行初步的了解。也只敢說是初步了解,因為自己也是第一次看RxJava的源碼,理解的程度肯定不是很深。還是那樣,如果有錯誤之處,希望各位指正!
??本文參考:
??1.除非特殊說明,源碼來自:2.2.0版本
??2.RxJava從源碼到應(yīng)用 移動端開發(fā)效率秒提速
1.概述
??樓主打算將RxJava的源碼分析寫成一個系列文章,所以這個是這個系列的第一篇文章,在概述里面還是對RxJava是什么簡單的介紹一下,本系列文章不會對RxJava的基本用法進(jìn)行展開,如果有老哥對RxJava的基本使用掌握的不是很好的話,推薦這個系列的文章:給初學(xué)者的RxJava2.0教程(一)。
??簡單的說一下RxJava,RxJava是基于觀察者模式的一個框架,在RxJava中有兩個角色,一個Observable,通常被稱為被觀察者,一個是Observer,通常被稱為觀察者??傮w的架構(gòu)是,由Observable來處理任務(wù)或者發(fā)送事件,然后在Observer里面來接受到Observable發(fā)送過來的信息。
??RxJava有很多的優(yōu)勢,比如線程調(diào)度,在Android里面,耗時操作必須放在子線程中,但是同時還需要主線程來更細(xì)UI,所以線程調(diào)度就顯得尤為重要。當(dāng)然RxJava還有很多重要的操作符,使得我們的開發(fā)變得非常的方便。本系列文章不會對每個操作符的基本使用展開,而是對一些比較常用的操作源碼分析,所說的常用,也是指樓主用到的??!畢竟是菜雞,肯定有很多的東西都不太懂。
2.基本元素
??想要對RxJava的基本原理有一個更好的了解,必須對它的基本有一個大概的了解。我們先通過一個簡單的案例,來對RxJava的基本元素進(jìn)行提取。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
??在這個簡單的案例當(dāng)中,我們可以提取的元素有:Observable, ObservableOnSubscribe, ObservableEmitter,Observer。
??元素還是挺少的,我們現(xiàn)在對每個元素的類結(jié)構(gòu)來進(jìn)行簡單的分析一下。
(1).Observable
public abstract class Observable<T> implements ObservableSource<T> {
}
??我們發(fā)現(xiàn)Observable本身是一個抽象類,并且實現(xiàn)了ObservableSource接口,在來看看ObservableSource接口里面有什么。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
??ObservableSource接口里面只有一個subscribe方法,也就是說,RxJava將注冊觀察者這部分的功能提取成一個接口,從而可以看出來,面向接口編程是多么的重要????。。。
??再分別來看看我們上面案例中使用的兩個方法--create和subscribe。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 先省略代碼部分,待會詳細(xì)的分析。
}
??啊,嚇我一跳,我以為create方法的參數(shù)又是一個接口類型,還好是ObservableOnSubscribe類型,也是上面提取出來的元素其中之一,關(guān)于這個類,待會會詳細(xì)的分析。
public final void subscribe(Observer<? super T> observer) {
//...
}
??這個方法就更加的簡單了,就是傳遞了一個Observer接口的對象。不過需要注意的是這個方法有很多的重載,其中以Consumer類型的操作最為多,不過這個也沒什么,最后還是Consumer轉(zhuǎn)換成為了Observer,這個就涉及到Observer接口的一個實現(xiàn)類--LambdaObserver。不要害怕,待會都會一一的講解的。
(2).Observer
??說了被觀察者,我們先來看看觀察者--Observer。
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
??哎呀呀,更加的簡單了, Observer只是簡單的接口,不過我們需要注意的是這個接口定義的4個方法,這里不講解四個方法的作用,畢竟我們這里將Observable的基本原理????。
(3).ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
??一如既往的接口,subscribe方法里面就是具體做事情的地方,這個相信大佬們應(yīng)該都知道,我這里就班門弄斧的提醒一下????。
(4).ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
??ObservableEmitter也是一個接口,同時繼承了Emitter接口,我們來看看Emitter接口的定義
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
??作為一個發(fā)射器,Emitter里面定義了很多關(guān)于發(fā)送消息給Observer的方法,Emitter的onNext對應(yīng)著Observer的onNext方法,其他的方法也是類似的。
3.Observable的工作原理
(1).create方法
??我們對相關(guān)部分的基本元素有了一個基本的了解,現(xiàn)在我們來對整個流程的工作原理進(jìn)行分析。首先我們create方法入手
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
??create方法沒有我們想象中的那么難,就只有兩行代碼,還有一行用來check的????。對于ObservableCreate類這里先不進(jìn)行分析,我們來看看 RxJavaPlugins的onAssembly方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
??這里提醒一下,onAssembly方法的參數(shù)類型是Observable類型,也就是說ObservableCreate本身就是一個Observable。好了,扯了題外話,來看看onAssembly方法具體是干嘛的。
??整個方法的執(zhí)行過程比較簡單,如果onObservableAssembly為null,直接就返回了source,也就是說返回了ObservableCreate本身。而我們在整個Observable的源碼中發(fā)現(xiàn),onObservableAssembly初始值本身為null。
public static void reset() {
//······
setOnObservableAssembly(null);
//······
}
??為什么需要這樣子繞圈子的做呢?這里就是做了鉤子,以便于以后的擴(kuò)展。
??所以Observable的create方法就是返回了一個ObservableCreate對象,不過需要注意的是ObservableCreate包裹了一個ObservableOnSubscribe對象,也就是我們在create方法里面new的那個ObservableOnSubscribe對象。
??我們先來不急著去理解ObservableCreate是什么,還是來看看subscribe方法為我們做了什么。
(2). subscribe方法
??當(dāng)我們通過Observable的create方法來獲取一個Observable對象時,通常還會調(diào)用Observable的subscribe方法來注冊一個觀察者?,F(xiàn)在我們來看看subscribe方法的實現(xiàn)。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
??整個過程也不是想象中的那么神秘,除去check相關(guān)的方法不看,歸根結(jié)底就是兩行代碼,先是通過RxJavaPlugins的onSubscribe方法來獲取Observer對象,具體操作這里就不說了,肯定跟RxJavaPlugins的onAssembly方法差不多,最后返回的是observer本身,最后調(diào)用了subscribeActual方法。這個subscribeActual方法是干嘛的?
protected abstract void subscribeActual(Observer<? super T> observer);
??臥了個槽?抽象方法!那我怎么知道調(diào)用的是哪個類的subscribeActual方法?不急哈,記得我們之前在create方法返回的Observable對象是哪個類的對象嗎?想起來了吧,是ObservableCreate
(3). ObservableCreate
??先來看看ObservableCreate類結(jié)構(gòu)。
public final class ObservableCreate<T> extends Observable<T> {
}
??我們發(fā)現(xiàn),ObservableCreate繼承了Observable,其實在分析create方法時,我也說過喲。
??在ObservableCreate類中,只有一個ObservableOnSubscribe類型的成員變量,這個成員變量就是我們在create方法里面new的ObservableOnSubscribe對象
??我們再來看看ObservableCreate對subscribeActual方法的實現(xiàn)
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
??在subscribeActual方法里面,先是對Observer對象進(jìn)行一次包裝,將它包裝在CreateEmitter類中。然后我們會發(fā)現(xiàn)兩個比較眼熟的方法onSubscribe方法和subscribe方法。其中onSubscribe方法在Observer里面看到過,而這里恰好是通過Observer對象來調(diào)用的,沒錯,這個的observer就是在subscribe方法里面new的對象??墒俏覀冇浀?code>onSubscribe方法的參數(shù)類型是Disposable,而這里是一個CreateEmitter。我們來看看CreateEmitter的類結(jié)構(gòu):
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//······
}
??沒錯,CreateEmitter實現(xiàn)了Disposable接口,所以CreateEmitter本身可以充當(dāng)Disposable的角色。
??調(diào)用了Observer的onSubscribe方法之后,然后就會調(diào)用ObservableOnSubscribe的subscribe方法。
??到這里,我們應(yīng)該徹底的明白了整個Observable的工作流程。我們通過create方法創(chuàng)建一個ObservableCreate方法,然后調(diào)用了subscribe方法來注冊了一個觀察者,在subscribe方法里面又調(diào)用了subscribeActual方法,在subscribeActual方法里面先是調(diào)用了Observer的onSubscribe方法,然后調(diào)用了
ObservableOnSubscribe的subscribe方法,在ObservableOnSubscribe的subscribe方法當(dāng)中,具體的做的事有兩件:1.做我們自己的事情,比如從服務(wù)器上獲取數(shù)據(jù)之類;2.將發(fā)送信息到Observer去。
??理解了整個流程的工作原理,我們現(xiàn)在來看看CreateEmitter是怎么信息發(fā)給Observer的。
4. CreateEmitter的工作原理
??我們知道,我們在ObservableOnSubscribe的subscribe方法里面使用ObservableEmitter來發(fā)射信息到Observer?,F(xiàn)在我們來看看整個CreateEmitter的工作原理,不過,我們還是先來看看這個類的結(jié)構(gòu),雖然上面已經(jīng)看了,但是擔(dān)心大佬們忘了:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//······
}
??在上面已經(jīng)說了CreateEmitter實現(xiàn)了Disposable接口,可以作為Disposable對象來操作,在接下來,我們將重點介紹Disposable是怎么控制Observer對信息的接收,同時還會介紹CreateEmitter作為ObservableEmitter接口的那部分功能。
??之前在分析基本元素時,已經(jīng)說了ObservableEmitter這個接口,它實現(xiàn)了Emitter接口。在Emitter接口里面有三個方法用來發(fā)送信息給Observer,分別是:onNext,onError,onComplete。而CreateEmitter類則是具體的實現(xiàn)了這三個方法,我們來看看。
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
??代碼是非常的簡單,直接調(diào)用了Observer的onNext方法,也沒用什么高逼格的東西????。其余兩個方法也是如此。只不過是,在調(diào)用onNext方法時做了一個isDisposed的判斷。
??所以感覺Disposable才是這個類的核心。我們來看看isDisposed方法:
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
??在isDisposed方法里面調(diào)用了DisposableHelper的isDisposed方法。不過這里需要注意的是這里傳遞過去的是get方法的返回值,這個返回值什么意思?
??回到CreateEmitter的類結(jié)構(gòu),發(fā)現(xiàn)它繼承了AtomicReference類,所以get方法返回的是一個Disposable對象。
??同時,我們發(fā)現(xiàn)CreateEmitter的dispose方法也是通過DisposableHelper類進(jìn)行進(jìn)行操作的,看看要理解Disposable的功能,必須了解DisposableHelper是怎么操作的。
5.DisposableHelper
??從感官上來說,一個發(fā)射器是否dispose,直接設(shè)置一個boolean類型的flag就OK了,為什么搞得這么復(fù)雜,又是AtomicReference,又是DisposableHelper。這一切,我們從DisposableHelper來尋找答案。
??首先我們還是來看看DisposableHelper的結(jié)構(gòu):
public enum DisposableHelper implements Disposable {
DISPOSED
;
}
??DisposableHelper本身是一個enum類型,同時實現(xiàn)了Disposable接口。這里使用enum主要是為了做一個DISPOSED的單例。然后在通過isDisposed方法來判斷是否dispose,可以直接與DISPOSED比較。
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
??既然判斷是否dispose是直接與DISPOSED比較,那么如果dispose的話,應(yīng)該是將AtomicReference里面的值設(shè)置為DISPOSED吧?我們來看一下dispose方法:
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
??果然,跟我們猜測一樣的,AtomicReference里面的值設(shè)置為DISPOSED。只是,這里為了線程安全,做了很多的判斷操作。
??從這里我們可以得到,為什么需要設(shè)置DisposableHelper來控制dispose的狀態(tài),那是因為線程安全,如果直接設(shè)置一個flag,在有些情況下,可能存在線程不安全的風(fēng)險。同時為了代碼的優(yōu)雅,如果這部分的邏輯寫在CreateEmitter里面,會不會顯得冗雜呢?
6.總結(jié)
??寫到這里,我感覺也差不多了。這里對著部分的知識做一個總結(jié)。
??1.在整個流程中,基本有Observable,ObservableOnSubscribe,ObservableEmitter,Observer,如果想要對整個過程有一個大概的理解,必須對這幾個元素有基本的認(rèn)識。
??2.Observer的onNext之類方法的觸發(fā)時機(jī),實際上是Observable的subscribe方法,因為subscribe方法調(diào)用了Observable的subscribeActual方法,而在subscribeActual方法里面做了兩部分的操作:1.直接調(diào)用了Observer的onSubscribe方法;2.使用ObservableEmitter將Observer包裹起來,所以我們在ObservableOnSubscribe的subscribe方法用ObservableEmitter來發(fā)射信息,相當(dāng)于調(diào)用了Observer的相關(guān)方法。
??3.在ObservableEmitter的onNext之類方法里面,存在一種類似AOP的代碼,因為在調(diào)用Observer的相關(guān)方法,做了一些其他的操作。