前言
我們經(jīng)常看RxJava的文章,很多都是API性的介紹.今天我們就用一段來(lái)理解它吧,了解它的內(nèi)幕
本文編譯需要:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
public static void debug() {
1. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
2. @Override
3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
4. emitter.onNext(1);
5. }
6. });
7. Consumer<Integer> consumer = new Consumer<Integer>() {
8. @Override
9. public void accept(Integer integer) throws Exception {
10. Log.d(TAG, "accept: " + integer);
11. }
12. };
13. observable.subscribe(consumer);
}
這篇文章的目的就是分析這段代碼,去挖掘RxJava到底做了什么
讓我們開(kāi)始吧
我們都知道RxJava最重要的東西就是Obervable(被觀察者)和Observer(觀察者), 或者針對(duì)事件來(lái)說(shuō)就是上游和下游, 上游發(fā)送事件下游處理事件. 為了便于分析,我把代碼標(biāo)了行號(hào),便于引用.
粗略的分析上面這段代碼就是一個(gè)Observable訂閱了一個(gè)Consumer
弄清Obervable的來(lái)源
來(lái)看Observable.create() 方法的原型:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
它的參數(shù)是ObservableOnSubscribe類型,它只有一個(gè)方法待實(shí)現(xiàn):
void subscribe(ObservableEmitter<T> e) throws Exception;
為什么需要ObservableEmitter形參呢.我們不用管,因?yàn)楸籲ew進(jìn)ObservableCreate構(gòu)造函數(shù)里去了,肯定是回調(diào)作用,這在我們平時(shí)寫(xiě)代碼時(shí)見(jiàn)多了.實(shí)際上我們點(diǎn)進(jìn)ObservableCreate里看到ObservableEmitter是一個(gè)內(nèi)部類, 那問(wèn)題是RxJavaPlugins.onAssembly()又是干嘛的,
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看到把onObservableAssembly賦給f, onObservableAssembly是什么 , 其實(shí)就是一個(gè)屬性而已, 含有g(shù)etter和setter.而我們也沒(méi)看到調(diào)用set函數(shù),所以該函數(shù)就直接返回了source,所以我們得到最終的Observable就是ObservableCreate對(duì)象.為什么用這個(gè)代替Observabel呢,看下類聲明
public final class ObservableCreate<T> extends Observable<T> {
...
}
原來(lái)就是Observable的子類.
到了這里,終于清楚了Observable的來(lái)龍去脈.
弄清Observer的來(lái)源
我們看到這片代碼用的是Consumer,它跟Observer有什么關(guān)系呢. 看下它的聲明
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
這不就是一個(gè)普普通通的接口嘛,實(shí)現(xiàn)accept的方法就ok了,是的,consumer就是這么簡(jiǎn)單
是時(shí)候讓Observable和Consumer發(fā)生關(guān)聯(lián)了
只剩下observable.subscribe(consumer); 這句了, 顧名思義,被觀察者訂閱了消費(fèi)者(觀察者),我們跟進(jìn)去看看代碼怎么寫(xiě)的
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
它調(diào)用了subscribe方法,用了我們傳進(jìn)去的consumer對(duì)象即onNext,而后面三個(gè)參數(shù)是系統(tǒng)提供默認(rèn)的實(shí)現(xiàn),這也說(shuō)明,肯定還能傳進(jìn)我們自己想要的參數(shù),先不管.繼續(xù)跟:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
這片代碼先是進(jìn)行了一些非空判斷,然后把我們的onNext轉(zhuǎn)換成LambdaObserver對(duì)象了,LambdaObserver是什么呢:
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
}
它實(shí)現(xiàn)了Observer, 到了這里,終于明白了,原來(lái)我們傳進(jìn)來(lái)的Consumer被處理成LambdaObserver對(duì)象了,繼續(xù)跟進(jìn)subscribe(ls);
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
記住,我們手里現(xiàn)在握著的Observable對(duì)象是什么,Observer是什么, 當(dāng)然是ObservableCreate和LambdaObserver的對(duì)象.
我們又看到了RxJavaPlugins的調(diào)用,跟我們上一步講的onAssembly是類似的,直接返回了observer.往下就是一個(gè)非空的判斷了.接著是subscribeActual方法的調(diào)用.
看下改方法的聲明:
protected abstract void subscribeActual(Observer<? super T> observer);
它是Observable類的一個(gè)抽象方法,問(wèn)題是誰(shuí)來(lái)實(shí)現(xiàn)它,就是ObservableCreate了.因?yàn)槭撬麑?shí)現(xiàn)了Observable.
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這是ObservableCreate的代碼片段.看到它的構(gòu)造函數(shù)沒(méi),參數(shù)source不就是我們開(kāi)始傳進(jìn)來(lái)的嘛.
繼續(xù)看subscribeActual方法, 第一行用我們的LambdaObserver對(duì)象為參,構(gòu)造了CreateEmitter對(duì)象,繼續(xù)跟進(jìn). 因?yàn)閛bserver被LambdaObserver實(shí)現(xiàn)了,所以這里當(dāng)然跳進(jìn)它的onSubscribe方法
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
}
}
}
這里先是用一個(gè)幫助類判斷了一下,下一步的onSubscribe是什么呢,我們看到它的構(gòu)造函數(shù),就是我們之前講的系統(tǒng)給我們提供的默認(rèn)實(shí)現(xiàn),默認(rèn)實(shí)現(xiàn)是空實(shí)現(xiàn).所以這個(gè)方法我們認(rèn)為什么都沒(méi)做.但是,如果onSubscribe不是系統(tǒng)提供的呢,那么它的執(zhí)行時(shí)間點(diǎn)是比onNext早的.
我們繼續(xù)跟進(jìn)subscribeActual方法.現(xiàn)在要處理try塊了
source.subscribe(parent);
注意這個(gè)source是ObservableOnSubscribe對(duì)象.這個(gè)對(duì)象又是啥呢?這就不是我們一開(kāi)始new出來(lái)的嘛
... new ObservableOnSubscribe<Integer>() {
2. @Override
3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
4. emitter.onNext(1);
5. }
6. }
參數(shù)剛好也是ObservableEmitter, 第4行的emitter參數(shù)就是
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
傳進(jìn)去的. 緊跟著emitter.onNext(1);就是parent對(duì)象的事了
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
...
還記得吧,這里的observer就是LambdaObserver對(duì)象也就是一開(kāi)始的Consumer了.
isDisposed()是什么,表示是否取消被觀察者和觀察者的關(guān)聯(lián),我們一路跟進(jìn)來(lái),沒(méi)有關(guān)于它的操作,肯定返回true
然后就是observer.onNext(t);即回調(diào)了Consumer的onNext方法.
結(jié)束
到了這里一個(gè)簡(jiǎn)單的流程終于完成了,謝謝大家!