Rxjava-源碼淺嘗

        //被觀察者
        Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("Jenchar");
                subscriber.onCompleted();
            }
        });
        //觀察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.w(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.w(TAG, "onError");
            }

            @Override
            public void onNext(String s) {
                Log.w(TAG, "onNext");
            }
        };
        //訂閱
        stringObservable.subscribe(observer);

通過(guò)Observable的create靜態(tài)方法傳入OnSubscribe的實(shí)例,OnSubscribe繼承Action1并且實(shí)現(xiàn)了其call方法,
傳入的泛型為OnSubscribe<String>的泛型,Action1<T>中傳入了Subscriber<T>,泛型作為T(mén)被層層傳遞

public interface Action1<T> extends Action {
    void call(T t);
}

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
}

從Action1<Subscriber<? super T>>這個(gè)接口可以看出泛型已經(jīng)指定為Subscriber<? super T>,這個(gè)Subscriber泛型怎么傳遞進(jìn)來(lái)的呢?

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
     static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
          //省略
         }
            return Subscriptions.unsubscribed();
        }
    }

以上這段源碼為鏈?zhǔn)浇Y(jié)構(gòu)最后的執(zhí)行方法,可以看到subscriber最終是通過(guò) RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)傳入,讓我們看看這個(gè)方法里面怎么操作

    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

最終返回的還是onSubscribe對(duì)象,再調(diào)用繼承了Action1的call方法傳入了該subscriber指定了泛型,形成了觀察者的傳遞,subscriber實(shí)現(xiàn)Observer接口。

public interface Observer<T> {

    void onCompleted();

  
    void onError(Throwable e);

   
    void onNext(T t);

}

public abstract class Subscriber<T> implements Observer<T>, Subscription 
以下代碼省略

還有一點(diǎn)值得注意,在RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);之前還會(huì)將subscriber轉(zhuǎn)成SafeSubscriber,這一步的主要作用就是保證onComplete和onError方法只執(zhí)行一次,并且在onNext方法中如果trycatch到不在判斷范圍內(nèi)的異常不會(huì)拋出而是會(huì)進(jìn)行捕捉調(diào)用onError終止

 @Override
    public void onNext(T t) {
        try {
            if (!done) {
                actual.onNext(t);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }
    
 public static void throwOrReport(Throwable t, Observer<?> o) {
        Exceptions.throwIfFatal(t);
        o.onError(t);
    }

 public static void throwIfFatal(Throwable t) {
        if (t instanceof OnErrorNotImplementedException) {
            throw (OnErrorNotImplementedException) t;
        } else if (t instanceof OnErrorFailedException) {
            throw (OnErrorFailedException) t;
        } else if (t instanceof OnCompletedFailedException) {
            throw (OnCompletedFailedException) t;
        }
        // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
        else if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        } else if (t instanceof ThreadDeath) {
            throw (ThreadDeath) t;
        } else if (t instanceof LinkageError) {
            throw (LinkageError) t;
        }
    }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容