RxJava1源碼分析

我對(duì)于RxJava的異常處理和上拋方式有一些不解,而上網(wǎng)查找的文章都是RxJava的一些用于處理異常的操作符,所以只能自己去源碼里面找答案了。

雖然RxJava1已經(jīng)過時(shí)了,但是鑒于RxJava1的源碼會(huì)比RxJava2的簡潔一些,因此易于分析。所以我在這里對(duì)RxJava1的源碼進(jìn)行分析。

1 構(gòu)造Observable

1.1 create方式

Observable.create<String> { it: Subscriber<in String> ->
    //上游發(fā)射數(shù)據(jù)
    it.onNext("123")
    it.onCompleted()
}.subscribe { it: String ->
    //下游處理數(shù)據(jù)
    LogUtils.d(it)
}

這里看兩個(gè)方法:create和subscribe

1.1.1 create

create方法需要OnSubscribe接口作為參數(shù),然后再返回一個(gè)Observable類型的對(duì)象(這個(gè)對(duì)象待會(huì)再調(diào)用subscribe()方法啟動(dòng)數(shù)據(jù)的發(fā)射)。

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

那么首先看:hook.onCreate(f),根據(jù)注釋,這是一個(gè)有裝飾者模式味道的的鉤子方法。

public abstract class RxJavaObservableExecutionHook {
     /**
     * Invoked during the construction by {@link Observable#create(OnSubscribe)}
     * <p>
     * This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
     * logging, metrics and other such things and pass-thru the function.
     * 
     * @param f
     *            original {@link OnSubscribe}<{@code T}> to be executed
     * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
     *         returned as a pass-thru
     */
    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;
    }
}

默認(rèn)情況下,傳進(jìn)來什么就返回什么,即沒有加任何的裝飾的邏輯。

Observable的構(gòu)造函數(shù):將OnSubscribe保存了起來。

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;

    /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     * 
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
}
1.1.2 subscribe
public final Subscription subscribe(final Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    //構(gòu)造一個(gè)Subscriber,銜接onNext的方法,并調(diào)用subscribe方法返回Subscription
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {
            // do nothing
        }
        @Override
        public final void onError(Throwable e) {
            throw new OnErrorNotImplementedException(e);
        }
        @Override
        public final void onNext(T args) {
            onNext.call(args);
        }
    });
}

返回的是Subscription,和RxJava2中的Disposable是一個(gè)東西:用來取消訂閱

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

接著看

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    //調(diào)用了靜態(tài)方法:
    return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            // special handling when onError is not implemented ... we just rethrow
            throw e2;
        } catch (Throwable e2) {
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

對(duì)這個(gè)subsribe()方法做的事情做一個(gè)總結(jié):

  1. 對(duì)參數(shù)進(jìn)行驗(yàn)證,保證Not null
  2. 調(diào)用subscriber.onStart();
  3. Subscriber封裝成SafeSubscriber,用裝飾者模式包裝了一層,在這一層加了一些額外的邏輯,但是不影響主要邏輯的執(zhí)行,所以這一層的邏輯我們稍后再分析。
  4. 調(diào)用OnSubsribe接口的call方法。并捕捉異常,關(guān)于異常捕捉,稍后再分析

而由于調(diào)用到第四點(diǎn)的call方法,call方法就是create方法的參數(shù)傳遞進(jìn)去的代碼塊:

Observable.create<String> { it: Subscriber<in String> ->
    //上游發(fā)射數(shù)據(jù)
    it.onNext("123")
    it.onCompleted()
}.subscribe { it: String ->
    //下游處理數(shù)據(jù)
    LogUtils.d(it)
}

因此我們調(diào)用onNext傳遞的數(shù)據(jù)就能夠在下游被處理到了。

1.2 just方式

Observable.just("2")
    .doOnNext {
        LogUtils.d(it)
    }
    .subscribe {
        val s: String? = null
        s!!
        s.toString()
    }

當(dāng)調(diào)用just方法的時(shí)候,就不需要在上游手動(dòng)調(diào)用onNext了,那么一定是RxJava的內(nèi)部調(diào)用了onNext,來看下吧。

public final static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

返回了一個(gè)ScalarSynchronousObservable的create方法:

public final class ScalarSynchronousObservable<T> extends Observable<T> {

    public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
    }

    private final T t;

    protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                /*
                 *  We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
                 *  See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
                 *  The assumption here is that when asking for a single item we should emit it and not concern ourselves with 
                 *  being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will 
                 *  filter it out (such as take(0)). This prevents us from paying the price on every subscription. 
                 */
                s.onNext(t);
                s.onCompleted();
            }

        });
        this.t = t;
    }
    //...
}

ScalarSynchronousObservable的構(gòu)造方法中傳入的OnSubscribe的實(shí)現(xiàn)中,已經(jīng)調(diào)用了onNextonCompleted了。

2 SafeSubscriber

事件監(jiān)聽就是選擇性地重寫三個(gè)方法:

void onNext(T t);,void onError(Throwable e);,void onCompleted();

而這三個(gè)方法的關(guān)系,例如onError和onCompleted有調(diào)用互斥性等,都借由SafeSubscriber類實(shí)現(xiàn):

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //...
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        //...
    }
public class SafeSubscriber<T> extends Subscriber<T> {

    private final Subscriber<? super T> actual;

    //事件流是否結(jié)束
    boolean done = false;
    //用裝飾者模式,封裝真正的Subscriber在actual變量中
    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    @Override
    public void onCompleted() {
        //如果事件流沒有結(jié)束
        if (!done) {
            done = true;
            //將onCompleted用try catch
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                //拋出致命異常
                Exceptions.throwIfFatal(e);
                //調(diào)用內(nèi)部_onError
                _onError(e);
            } finally {
                unsubscribe();
            }
        }
    }

    @Override
    public void onError(Throwable e) {
        //拋出致命異常
        Exceptions.throwIfFatal(e);
        if (!done) {
            done = true;
            //調(diào)用內(nèi)部_onError
            _onError(e);
        }
    }

    @Override
    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            //拋出致命異常
            Exceptions.throwIfFatal(e);
            //回調(diào)到onError
            onError(e);
        }
    }

    //有兩處會(huì)調(diào)用這里,1. onCompleted。2. onError
    protected void _onError(Throwable e) {
        try {
            //首先調(diào)用RxJavaPlugins中的錯(cuò)誤統(tǒng)一處理
            RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
        } catch (Throwable pluginException) {
            //捕捉RxJavaPlugins中的錯(cuò)誤,并打印出來。
            handlePluginException(pluginException);
        }
        try {
            //調(diào)用真正的actual的onError
            actual.onError(e);
        } catch (Throwable e2) {
            //補(bǔ)貨onError中的異常
            if (e2 instanceof OnErrorNotImplementedException) {
                //如果異常是OnErrorNotImplementedException
                //unsubscribe
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    try {
                        RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
                    } catch (Throwable pluginException) {
                        handlePluginException(pluginException);
                    }
                    throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
                }
                //將異常拋出
                throw (OnErrorNotImplementedException) e2;
            } else {
                //否則,還是進(jìn)行錯(cuò)誤統(tǒng)一處理
                try {
                    RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
                } catch (Throwable pluginException) {
                    handlePluginException(pluginException);
                }
                //unsubscirbe
                try {
                    unsubscribe();
                } catch (Throwable unsubscribeException) {
                    try {
                        RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
                    } catch (Throwable pluginException) {
                        handlePluginException(pluginException);
                    }
                    throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
                }
                //再將異常拋出
                throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
            }
        }
        //unsubscribe
        try {
            unsubscribe();
        } catch (RuntimeException unsubscribeException) {
            try {
                RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
            } catch (Throwable pluginException) {
                handlePluginException(pluginException);
            }
            throw new OnErrorFailedException(unsubscribeException);
        }
    }

    private void handlePluginException(Throwable pluginException) {
        System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
        pluginException.printStackTrace();
    }
    
    public Subscriber<? super T> getActual() {
        return actual;
    }
}

3 RxJava如何處理異常,如何上拋異常

上文的對(duì)SafeSubscriber的分析可以看出RxJava對(duì)處理數(shù)據(jù)下游異常的方式:

  1. 轉(zhuǎn)到onError將異常拋出。
  2. 如果onError未實(shí)現(xiàn),那么直接將異常拋出。
  3. 如果onError實(shí)現(xiàn)了,但是onError中又有異常,那么RxJava又會(huì)將異常拋出。

那么如果在數(shù)據(jù)的上游,即數(shù)據(jù)發(fā)射處就發(fā)生異常了,要如何處理呢:

在Observable的構(gòu)造類的函數(shù)中,最終會(huì)調(diào)用到:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            // special handling when onError is not implemented ... we just rethrow
            throw e2;
        } catch (Throwable e2) {
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}

看一下上述的這段代碼:

try {
    //將call的調(diào)用用try catch保護(hù)起來
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
    //拋出致命異常
    Exceptions.throwIfFatal(e);
    try {
        //調(diào)用給onError
        subscriber.onError(hook.onSubscribeError(e));
    } catch (OnErrorNotImplementedException e2) {
        throw e2;
    } catch (Throwable e2) {
        RuntimeException r = new RuntimeException("Error occurred attempting to subscrib
        hook.onSubscribeError(r);
        throw r;
    }
    //unsubscribe
    return Subscriptions.unsubscribed();
}

可以看到:數(shù)據(jù)發(fā)射處也有異常處理:交給觀察者的onError處理,然后處理邏輯就又轉(zhuǎn)交給觀察了。

4 為什么能夠用操作符追加代碼邏輯

4.1 圖和大致流程分析

在進(jìn)行代碼分析之前,先看下這個(gè)大致的調(diào)用流程圖:

這幅圖表現(xiàn)出來的幾個(gè)點(diǎn):

  1. 在使用RxJava寫調(diào)用鏈代碼的時(shí)候,onNext等代碼塊是從上往下執(zhí)行的,但是每當(dāng)往調(diào)用鏈上拼接一個(gè)RxJava的處理方法例如:doOnNext或者map的時(shí)候,都會(huì)生成一個(gè)新的Subscriber,而當(dāng)調(diào)用最底部的subscribe()方法的時(shí)候,調(diào)用鏈上每一個(gè)Subscriber中的OnSubscribe.call()方法實(shí)際上是從下往上調(diào)用的。
  2. 在圖中,Observable3是最終的觀察者創(chuàng)建的對(duì)象。當(dāng)調(diào)用subscribe方法的時(shí)候,由Observable.subscribe()的源碼開始,要調(diào)用Observable2call_2()方法,而call_2()方法的邏輯要參考Observable#lift()方法的邏輯,他會(huì)將Observable2onNext()等代碼塊保存并拼接在Observable3onNext之前。然后又調(diào)用Observable1call_1()方法...一直這么重復(fù)著往上調(diào)用各個(gè)鏈上的call方法。最后,調(diào)用到頂部的數(shù)據(jù)發(fā)起處的函數(shù):Observable.create,他的call()方法使我們寫好的:it.onNext("123")...(或者Observable.just等構(gòu)造方法,里面內(nèi)定了如何調(diào)用onNext方法來發(fā)射數(shù)據(jù))。此時(shí),頂部開始發(fā)射數(shù)據(jù)。此時(shí),遇到他的觀察者:Observable1onNext(),那么執(zhí)行內(nèi)部的邏輯,并調(diào)用了Observable2onNext(),然后執(zhí)行后者內(nèi)部的邏輯,然后又調(diào)用Observable3的...,就這樣把數(shù)據(jù)不斷地往調(diào)用鏈下部調(diào)用,最終到達(dá)底部的觀察者的代碼塊。
  3. 大的邏輯是:從上往下增加每一個(gè)操作符,就會(huì)構(gòu)造一個(gè)Subscriber,然后在最后調(diào)用subscribe()方法的時(shí)候,遞歸上去一個(gè)一個(gè)地調(diào)用call方法,最終到頂部的onNext,再遞歸下來,一個(gè)一個(gè)地調(diào)用開發(fā)者調(diào)用每個(gè)操作符時(shí)加入的邏輯。

那么先來從簡單的開始看好了:

Observable.create<String> { it: Subscriber<in String> ->
    it.onNext("123")
    it.onCompleted()
}.subscribe { it: String ->
    LogUtils.d(it)
}

這種類型的流程上面已經(jīng)分析過了,非常簡單,就是在subscribe()方法調(diào)用的時(shí)候,調(diào)用call方法里面的:

    it.onNext("123")
    it.onCompleted()

然后自然地?cái)?shù)據(jù)就發(fā)送到了下游了。

上述是沒有添加任何操作符的情況,那么如果添加操作符了呢?例如添加一個(gè)doOnNext()

Observable
    .create<String> { it: Subscriber<in String> ->
        it.onNext("123")
        it.onCompleted()
    }
    .doOnNext {
        LogUtils.d("doOnNext=$it")
    }
    .subscribe { it: String ->
        LogUtils.d(it)
    }

這里我們看一下doOnNext中的代碼塊是如何追加到調(diào)用鏈上的。

看實(shí)現(xiàn):

public final Observable<T> doOnNext(final Action1<? super T> onNext) {
    Observer<T> observer = new Observer<T>() {
        @Override
        public final void onCompleted() {
        }
        @Override
        public final void onError(Throwable e) {
        }
        @Override
        public final void onNext(T args) {
            onNext.call(args);
        }
    };
    //上述代碼是將onNext封裝到了一個(gè)Observer里面。
    return lift(new OperatorDoOnEach<T>(observer));
}

這個(gè)封裝過的observer,作為OperatorDoOnEach類的構(gòu)造器的參數(shù)被傳遞進(jìn)去,然后又作為lift()方法被調(diào)用,并返回一個(gè)Observable類型。(是的,因?yàn)檫@個(gè)操作符是可以直接調(diào)用subscribe()的)

4.2 OperatorDoOnEach類型

public class OperatorDoOnEach<T> implements Operator<T, T>

他的父類型是Operator:

/**
 * Operator function for lifting into an Observable.
 */
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
    // cover for generics insanity
}

Operator實(shí)現(xiàn)了Func1接口:

public interface Func1<T, R> extends Function {
    R call(T t);
}

Func1接口的作用是:轉(zhuǎn)化

調(diào)用call方法的時(shí)候:輸入T,返回R。

那么Operator的作用也可以說是:轉(zhuǎn)化。但是他是Func1<Subscriber<? super R>, Subscriber<? super T>>,因此他的轉(zhuǎn)化是:輸入一個(gè)觀察者T,返回另一個(gè)觀察者R。

那么我們也可以說Operator的作用是:給原有的觀察者添加額外的邏輯。

那么說具體點(diǎn):客戶端的調(diào)用是:

Operator concreteOperator ;
SubscriberB = concreteOperator.call(SubscriberA);

即獲取到Operator接口,然后調(diào)用call方法,進(jìn)行轉(zhuǎn)換。

doOnNext()方法用的是OperatorDoOnEach

public class OperatorDoOnEach<T> implements Operator<T, T> {
    private final Observer<? super T> doOnEachObserver;

    //構(gòu)造方法中,保存了一個(gè)觀察者,稱為doOnEachObserver
    public OperatorDoOnEach(Observer<? super T> doOnEachObserver) {
        this.doOnEachObserver = doOnEachObserver;
    }

    //調(diào)用call方法,開始轉(zhuǎn)換。call方法返回的新的觀察者的每個(gè)實(shí)現(xiàn),都是在參數(shù)observer的方法之前
    //拼接上構(gòu)造函數(shù)的doOnEachObserver的對(duì)應(yīng)的方法。
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> observer) {
        //傳入的是observer
        return new Subscriber<T>(observer) {

            private boolean done = false;

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                //先調(diào)用doOnEachObserver.onCompleted()
                try {
                    doOnEachObserver.onCompleted();
                } catch (Throwable e) {
                    onError(e);
                    return;
                }
                // Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
                done = true;
                //再調(diào)用observer.onCompleted()
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                // need to throwIfFatal since we swallow errors after terminated
                Exceptions.throwIfFatal(e);
                if (done) {
                    return;
                }
                done = true;
                //先調(diào)用doOnEachObserver.onError
                try {
                    doOnEachObserver.onError(e);
                } catch (Throwable e2) {
                    observer.onError(e2);
                    return;
                }
                //再調(diào)用observer.onError
                observer.onError(e);
            }

            @Override
            public void onNext(T value) {
                if (done) {
                    return;
                }
                //先調(diào)用doOnEachObserver.onNext
                try {
                    doOnEachObserver.onNext(value);
                } catch (Throwable e) {
                    onError(OnErrorThrowable.addValueAsLastCause(e, value));
                    return;
                }
                //再調(diào)用observer.onNext
                observer.onNext(value);
            }
        };
    }
}

分析完了OperatorDoOnEach的具體實(shí)現(xiàn),接下來要看下他的call方法是如何被調(diào)用的:

4.3 lift()方法

接著看下doOnNext()

public final Observable<T> doOnNext(final Action1<? super T> onNext) {
    Observer<T> observer = new Observer<T>() {
        @Override
        public final void onCompleted() {
        }
        @Override
        public final void onError(Throwable e) {
        }
        @Override
        public final void onNext(T args) {
            onNext.call(args);
        }
    };
    //上述代碼是將onNext封裝到了一個(gè)Observer里面。
    return lift(new OperatorDoOnEach<T>(observer));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        //這個(gè)call會(huì)被上層調(diào)用
        @Override
        public void call(Subscriber<? super R> o) {
            //這個(gè)o是上游調(diào)用這個(gè)return new Observable返回的觀察者中的OnSubscribe的call方法傳遞下來的
            //觀察者,在本例中,由于onNext之前就是Observable.create,因此o中的call方法就是:
            //  {
            //      it.onNext("123");
            //      it.onCompleted();
            //  }
            try {
                //調(diào)用call,將o轉(zhuǎn)換成st。
                //st中的call方法的邏輯參照著OperatorDoOnEach的邏輯就是:將operator的調(diào)用邏輯追加在o的調(diào)用邏輯之前。
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    //繼續(xù)調(diào)用call方法
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators 
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    st.onError(e);
                }
            } catch (Throwable e) {
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
}

注意,onLift方法是一個(gè)全局鉤子。

public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
    //默認(rèn)實(shí)現(xiàn)是啥都不處理直接返回。
    return lift;
}

5 常用操作符源碼分析

5.1 filter

過濾

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
    //調(diào)用lift
    return lift(new OperatorFilter<T>(predicate));
}
public final class OperatorFilter<T> implements Operator<T, T> {

    private final Func1<? super T, Boolean> predicate;

    public OperatorFilter(Func1<? super T, Boolean> predicate) {
        this.predicate = predicate;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        return new Subscriber<T>(child) {

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    //如果call方法返回true,才繼續(xù)將數(shù)據(jù)向下傳遞
                    if (predicate.call(t)) {
                        child.onNext(t);
                    } else {
                        // TODO consider a more complicated version that batches these
                        request(1);
                    }
                } catch (Throwable e) {
                    child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}

5.2 map

映射

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    //transformer就是映射,映射后的數(shù)據(jù)將繼續(xù)向下游發(fā)射。
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容