我對(duì)于RxJava的異常處理和上拋方式有一些不解,而上網(wǎng)查找的文章都是RxJava的一些用于處理異常的操作符,所以只能自己去源碼里面找答案了。
雖然RxJava1已經(jīng)過時(shí)了,但是鑒于RxJava1的源碼會(huì)比RxJava2的簡潔一些,因此易于分析。所以我在這里對(duì)RxJava1的源碼進(jìn)行分析。
1 構(gòu)造Observable
1.1 create方式
Observable.create<String> { it: Subscriber<in String> ->
//上游發(fā)射數(shù)據(jù)
it.onNext("123")
it.onCompleted()
}.subscribe { it: String ->
//下游處理數(shù)據(jù)
LogUtils.d(it)
}
這里看兩個(gè)方法:create和subscribe
1.1.1 create
create方法需要OnSubscribe接口作為參數(shù),然后再返回一個(gè)Observable類型的對(duì)象(這個(gè)對(duì)象待會(huì)再調(diào)用subscribe()方法啟動(dòng)數(shù)據(jù)的發(fā)射)。
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
那么首先看:hook.onCreate(f),根據(jù)注釋,這是一個(gè)有裝飾者模式味道的的鉤子方法。
public abstract class RxJavaObservableExecutionHook {
/**
* Invoked during the construction by {@link Observable#create(OnSubscribe)}
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass-thru the function.
*
* @param f
* original {@link OnSubscribe}<{@code T}> to be executed
* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass-thru
*/
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
}
默認(rèn)情況下,傳進(jìn)來什么就返回什么,即沒有加任何的裝飾的邏輯。
Observable的構(gòu)造函數(shù):將OnSubscribe保存了起來。
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
/**
* Creates an Observable with a Function to execute when it is subscribed to.
* <p>
* <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
* unless you specifically have a need for inheritance.
*
* @param f
* {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
*/
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
}
1.1.2 subscribe
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
//構(gòu)造一個(gè)Subscriber,銜接onNext的方法,并調(diào)用subscribe方法返回Subscription
return subscribe(new Subscriber<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
});
}
返回的是Subscription,和RxJava2中的Disposable是一個(gè)東西:用來取消訂閱
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
接著看
public final Subscription subscribe(Subscriber<? super T> subscriber) {
//調(diào)用了靜態(tài)方法:
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
對(duì)這個(gè)subsribe()方法做的事情做一個(gè)總結(jié):
- 對(duì)參數(shù)進(jìn)行驗(yàn)證,保證Not null
- 調(diào)用subscriber.onStart();
- 將
Subscriber封裝成SafeSubscriber,用裝飾者模式包裝了一層,在這一層加了一些額外的邏輯,但是不影響主要邏輯的執(zhí)行,所以這一層的邏輯我們稍后再分析。 - 調(diào)用
OnSubsribe接口的call方法。并捕捉異常,關(guān)于異常捕捉,稍后再分析
而由于調(diào)用到第四點(diǎn)的call方法,call方法就是create方法的參數(shù)傳遞進(jìn)去的代碼塊:
Observable.create<String> { it: Subscriber<in String> ->
//上游發(fā)射數(shù)據(jù)
it.onNext("123")
it.onCompleted()
}.subscribe { it: String ->
//下游處理數(shù)據(jù)
LogUtils.d(it)
}
因此我們調(diào)用onNext傳遞的數(shù)據(jù)就能夠在下游被處理到了。
1.2 just方式
Observable.just("2")
.doOnNext {
LogUtils.d(it)
}
.subscribe {
val s: String? = null
s!!
s.toString()
}
當(dāng)調(diào)用just方法的時(shí)候,就不需要在上游手動(dòng)調(diào)用onNext了,那么一定是RxJava的內(nèi)部調(diào)用了onNext,來看下吧。
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
返回了一個(gè)ScalarSynchronousObservable的create方法:
public final class ScalarSynchronousObservable<T> extends Observable<T> {
public static final <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
private final T t;
protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
/*
* We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
* See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
* The assumption here is that when asking for a single item we should emit it and not concern ourselves with
* being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will
* filter it out (such as take(0)). This prevents us from paying the price on every subscription.
*/
s.onNext(t);
s.onCompleted();
}
});
this.t = t;
}
//...
}
ScalarSynchronousObservable的構(gòu)造方法中傳入的OnSubscribe的實(shí)現(xiàn)中,已經(jīng)調(diào)用了onNext和onCompleted了。
2 SafeSubscriber
事件監(jiān)聽就是選擇性地重寫三個(gè)方法:
void onNext(T t);,void onError(Throwable e);,void onCompleted();。
而這三個(gè)方法的關(guān)系,例如onError和onCompleted有調(diào)用互斥性等,都借由SafeSubscriber類實(shí)現(xiàn):
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//...
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
//...
}
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
//事件流是否結(jié)束
boolean done = false;
//用裝飾者模式,封裝真正的Subscriber在actual變量中
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
@Override
public void onCompleted() {
//如果事件流沒有結(jié)束
if (!done) {
done = true;
//將onCompleted用try catch
try {
actual.onCompleted();
} catch (Throwable e) {
//拋出致命異常
Exceptions.throwIfFatal(e);
//調(diào)用內(nèi)部_onError
_onError(e);
} finally {
unsubscribe();
}
}
}
@Override
public void onError(Throwable e) {
//拋出致命異常
Exceptions.throwIfFatal(e);
if (!done) {
done = true;
//調(diào)用內(nèi)部_onError
_onError(e);
}
}
@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
//拋出致命異常
Exceptions.throwIfFatal(e);
//回調(diào)到onError
onError(e);
}
}
//有兩處會(huì)調(diào)用這里,1. onCompleted。2. onError
protected void _onError(Throwable e) {
try {
//首先調(diào)用RxJavaPlugins中的錯(cuò)誤統(tǒng)一處理
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
} catch (Throwable pluginException) {
//捕捉RxJavaPlugins中的錯(cuò)誤,并打印出來。
handlePluginException(pluginException);
}
try {
//調(diào)用真正的actual的onError
actual.onError(e);
} catch (Throwable e2) {
//補(bǔ)貨onError中的異常
if (e2 instanceof OnErrorNotImplementedException) {
//如果異常是OnErrorNotImplementedException
//unsubscribe
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
//將異常拋出
throw (OnErrorNotImplementedException) e2;
} else {
//否則,還是進(jìn)行錯(cuò)誤統(tǒng)一處理
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
//unsubscirbe
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
//再將異常拋出
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
}
//unsubscribe
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
throw new OnErrorFailedException(unsubscribeException);
}
}
private void handlePluginException(Throwable pluginException) {
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
pluginException.printStackTrace();
}
public Subscriber<? super T> getActual() {
return actual;
}
}
3 RxJava如何處理異常,如何上拋異常
上文的對(duì)SafeSubscriber的分析可以看出RxJava對(duì)處理數(shù)據(jù)下游異常的方式:
- 轉(zhuǎn)到onError將異常拋出。
- 如果onError未實(shí)現(xiàn),那么直接將異常拋出。
- 如果onError實(shí)現(xiàn)了,但是onError中又有異常,那么RxJava又會(huì)將異常拋出。
那么如果在數(shù)據(jù)的上游,即數(shù)據(jù)發(fā)射處就發(fā)生異常了,要如何處理呢:
在Observable的構(gòu)造類的函數(shù)中,最終會(huì)調(diào)用到:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
看一下上述的這段代碼:
try {
//將call的調(diào)用用try catch保護(hù)起來
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
//拋出致命異常
Exceptions.throwIfFatal(e);
try {
//調(diào)用給onError
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
throw e2;
} catch (Throwable e2) {
RuntimeException r = new RuntimeException("Error occurred attempting to subscrib
hook.onSubscribeError(r);
throw r;
}
//unsubscribe
return Subscriptions.unsubscribed();
}
可以看到:數(shù)據(jù)發(fā)射處也有異常處理:交給觀察者的onError處理,然后處理邏輯就又轉(zhuǎn)交給觀察了。
4 為什么能夠用操作符追加代碼邏輯
4.1 圖和大致流程分析
在進(jìn)行代碼分析之前,先看下這個(gè)大致的調(diào)用流程圖:

這幅圖表現(xiàn)出來的幾個(gè)點(diǎn):
- 在使用RxJava寫調(diào)用鏈代碼的時(shí)候,
onNext等代碼塊是從上往下執(zhí)行的,但是每當(dāng)往調(diào)用鏈上拼接一個(gè)RxJava的處理方法例如:doOnNext或者map的時(shí)候,都會(huì)生成一個(gè)新的Subscriber,而當(dāng)調(diào)用最底部的subscribe()方法的時(shí)候,調(diào)用鏈上每一個(gè)Subscriber中的OnSubscribe.call()方法實(shí)際上是從下往上調(diào)用的。 - 在圖中,
Observable3是最終的觀察者創(chuàng)建的對(duì)象。當(dāng)調(diào)用subscribe方法的時(shí)候,由Observable.subscribe()的源碼開始,要調(diào)用Observable2的call_2()方法,而call_2()方法的邏輯要參考Observable#lift()方法的邏輯,他會(huì)將Observable2的onNext()等代碼塊保存并拼接在Observable3的onNext之前。然后又調(diào)用Observable1的call_1()方法...一直這么重復(fù)著往上調(diào)用各個(gè)鏈上的call方法。最后,調(diào)用到頂部的數(shù)據(jù)發(fā)起處的函數(shù):Observable.create,他的call()方法使我們寫好的:it.onNext("123")...(或者Observable.just等構(gòu)造方法,里面內(nèi)定了如何調(diào)用onNext方法來發(fā)射數(shù)據(jù))。此時(shí),頂部開始發(fā)射數(shù)據(jù)。此時(shí),遇到他的觀察者:Observable1的onNext(),那么執(zhí)行內(nèi)部的邏輯,并調(diào)用了Observable2的onNext(),然后執(zhí)行后者內(nèi)部的邏輯,然后又調(diào)用Observable3的...,就這樣把數(shù)據(jù)不斷地往調(diào)用鏈下部調(diào)用,最終到達(dá)底部的觀察者的代碼塊。 - 大的邏輯是:從上往下增加每一個(gè)操作符,就會(huì)構(gòu)造一個(gè)
Subscriber,然后在最后調(diào)用subscribe()方法的時(shí)候,遞歸上去一個(gè)一個(gè)地調(diào)用call方法,最終到頂部的onNext,再遞歸下來,一個(gè)一個(gè)地調(diào)用開發(fā)者調(diào)用每個(gè)操作符時(shí)加入的邏輯。
那么先來從簡單的開始看好了:
Observable.create<String> { it: Subscriber<in String> ->
it.onNext("123")
it.onCompleted()
}.subscribe { it: String ->
LogUtils.d(it)
}
這種類型的流程上面已經(jīng)分析過了,非常簡單,就是在subscribe()方法調(diào)用的時(shí)候,調(diào)用call方法里面的:
it.onNext("123")
it.onCompleted()
然后自然地?cái)?shù)據(jù)就發(fā)送到了下游了。
上述是沒有添加任何操作符的情況,那么如果添加操作符了呢?例如添加一個(gè)doOnNext()
Observable
.create<String> { it: Subscriber<in String> ->
it.onNext("123")
it.onCompleted()
}
.doOnNext {
LogUtils.d("doOnNext=$it")
}
.subscribe { it: String ->
LogUtils.d(it)
}
這里我們看一下doOnNext中的代碼塊是如何追加到調(diào)用鏈上的。
看實(shí)現(xiàn):
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
Observer<T> observer = new Observer<T>() {
@Override
public final void onCompleted() {
}
@Override
public final void onError(Throwable e) {
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
};
//上述代碼是將onNext封裝到了一個(gè)Observer里面。
return lift(new OperatorDoOnEach<T>(observer));
}
這個(gè)封裝過的observer,作為OperatorDoOnEach類的構(gòu)造器的參數(shù)被傳遞進(jìn)去,然后又作為lift()方法被調(diào)用,并返回一個(gè)Observable類型。(是的,因?yàn)檫@個(gè)操作符是可以直接調(diào)用subscribe()的)
4.2 OperatorDoOnEach類型
public class OperatorDoOnEach<T> implements Operator<T, T>
他的父類型是Operator:
/**
* Operator function for lifting into an Observable.
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Operator實(shí)現(xiàn)了Func1接口:
public interface Func1<T, R> extends Function {
R call(T t);
}
Func1接口的作用是:轉(zhuǎn)化
調(diào)用call方法的時(shí)候:輸入T,返回R。
那么Operator的作用也可以說是:轉(zhuǎn)化。但是他是Func1<Subscriber<? super R>, Subscriber<? super T>>,因此他的轉(zhuǎn)化是:輸入一個(gè)觀察者T,返回另一個(gè)觀察者R。
那么我們也可以說Operator的作用是:給原有的觀察者添加額外的邏輯。
那么說具體點(diǎn):客戶端的調(diào)用是:
Operator concreteOperator ;
SubscriberB = concreteOperator.call(SubscriberA);
即獲取到Operator接口,然后調(diào)用call方法,進(jìn)行轉(zhuǎn)換。
而doOnNext()方法用的是OperatorDoOnEach:
public class OperatorDoOnEach<T> implements Operator<T, T> {
private final Observer<? super T> doOnEachObserver;
//構(gòu)造方法中,保存了一個(gè)觀察者,稱為doOnEachObserver
public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
this.doOnEachObserver = doOnEachObserver;
}
//調(diào)用call方法,開始轉(zhuǎn)換。call方法返回的新的觀察者的每個(gè)實(shí)現(xiàn),都是在參數(shù)observer的方法之前
//拼接上構(gòu)造函數(shù)的doOnEachObserver的對(duì)應(yīng)的方法。
@Override
public Subscriber<? super T> call(final Subscriber<? super T> observer) {
//傳入的是observer
return new Subscriber<T>(observer) {
private boolean done = false;
@Override
public void onCompleted() {
if (done) {
return;
}
//先調(diào)用doOnEachObserver.onCompleted()
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
onError(e);
return;
}
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
//再調(diào)用observer.onCompleted()
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
// need to throwIfFatal since we swallow errors after terminated
Exceptions.throwIfFatal(e);
if (done) {
return;
}
done = true;
//先調(diào)用doOnEachObserver.onError
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
observer.onError(e2);
return;
}
//再調(diào)用observer.onError
observer.onError(e);
}
@Override
public void onNext(T value) {
if (done) {
return;
}
//先調(diào)用doOnEachObserver.onNext
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, value));
return;
}
//再調(diào)用observer.onNext
observer.onNext(value);
}
};
}
}
分析完了OperatorDoOnEach的具體實(shí)現(xiàn),接下來要看下他的call方法是如何被調(diào)用的:
4.3 lift()方法
接著看下doOnNext()
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
Observer<T> observer = new Observer<T>() {
@Override
public final void onCompleted() {
}
@Override
public final void onError(Throwable e) {
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
};
//上述代碼是將onNext封裝到了一個(gè)Observer里面。
return lift(new OperatorDoOnEach<T>(observer));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
//這個(gè)call會(huì)被上層調(diào)用
@Override
public void call(Subscriber<? super R> o) {
//這個(gè)o是上游調(diào)用這個(gè)return new Observable返回的觀察者中的OnSubscribe的call方法傳遞下來的
//觀察者,在本例中,由于onNext之前就是Observable.create,因此o中的call方法就是:
// {
// it.onNext("123");
// it.onCompleted();
// }
try {
//調(diào)用call,將o轉(zhuǎn)換成st。
//st中的call方法的邏輯參照著OperatorDoOnEach的邏輯就是:將operator的調(diào)用邏輯追加在o的調(diào)用邏輯之前。
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
//繼續(xù)調(diào)用call方法
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
注意,onLift方法是一個(gè)全局鉤子。
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
//默認(rèn)實(shí)現(xiàn)是啥都不處理直接返回。
return lift;
}
5 常用操作符源碼分析
5.1 filter
過濾
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
//調(diào)用lift
return lift(new OperatorFilter<T>(predicate));
}
public final class OperatorFilter<T> implements Operator<T, T> {
private final Func1<? super T, Boolean> predicate;
public OperatorFilter(Func1<? super T, Boolean> predicate) {
this.predicate = predicate;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
try {
//如果call方法返回true,才繼續(xù)將數(shù)據(jù)向下傳遞
if (predicate.call(t)) {
child.onNext(t);
} else {
// TODO consider a more complicated version that batches these
request(1);
}
} catch (Throwable e) {
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
}
5.2 map
映射
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
//transformer就是映射,映射后的數(shù)據(jù)將繼續(xù)向下游發(fā)射。
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
}