//被觀察者
Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Jenchar");
subscriber.onCompleted();
}
});
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.w(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.w(TAG, "onNext");
}
};
//訂閱
stringObservable.subscribe(observer);
通過(guò)Observable的create靜態(tài)方法傳入OnSubscribe的實(shí)例,OnSubscribe繼承Action1并且實(shí)現(xiàn)了其call方法,
傳入的泛型為OnSubscribe<String>的泛型,Action1<T>中傳入了Subscriber<T>,泛型作為T(mén)被層層傳遞
public interface Action1<T> extends Action {
void call(T t);
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
從Action1<Subscriber<? super T>>這個(gè)接口可以看出泛型已經(jīng)指定為Subscriber<? super T>,這個(gè)Subscriber泛型怎么傳遞進(jìn)來(lái)的呢?
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
//省略
}
return Subscriptions.unsubscribed();
}
}
以上這段源碼為鏈?zhǔn)浇Y(jié)構(gòu)最后的執(zhí)行方法,可以看到subscriber最終是通過(guò) RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)傳入,讓我們看看這個(gè)方法里面怎么操作
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
最終返回的還是onSubscribe對(duì)象,再調(diào)用繼承了Action1的call方法傳入了該subscriber指定了泛型,形成了觀察者的傳遞,subscriber實(shí)現(xiàn)Observer接口。
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
public abstract class Subscriber<T> implements Observer<T>, Subscription
以下代碼省略
還有一點(diǎn)值得注意,在RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);之前還會(huì)將subscriber轉(zhuǎn)成SafeSubscriber,這一步的主要作用就是保證onComplete和onError方法只執(zhí)行一次,并且在onNext方法中如果trycatch到不在判斷范圍內(nèi)的異常不會(huì)拋出而是會(huì)進(jìn)行捕捉調(diào)用onError終止
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
public static void throwOrReport(Throwable t, Observer<?> o) {
Exceptions.throwIfFatal(t);
o.onError(t);
}
public static void throwIfFatal(Throwable t) {
if (t instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) t;
} else if (t instanceof OnErrorFailedException) {
throw (OnErrorFailedException) t;
} else if (t instanceof OnCompletedFailedException) {
throw (OnCompletedFailedException) t;
}
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
else if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}