讀RxJava源碼:理解subscribe原理

前言

使用RxJava也有一段時間了,通過這種訂閱數(shù)據(jù)的思想編寫代碼,避免了大量的接口回調(diào),使得數(shù)據(jù)處理更加方便,對外提供數(shù)據(jù)的方式更加統(tǒng)一,回避了同步接口和異步接口的不同。
本文是閱讀拋物線的《給 Android 開發(fā)者的 RxJava 詳解》一文后,結(jié)合閱讀源碼理解觀察、訂閱實現(xiàn)原理的筆記。

最簡單的觀察、訂閱

下面是一個Observable的創(chuàng)建和完成訂閱的示例代碼

        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        subscriber.onNext(0);
                        subscriber.onCompleted();
                    }
                })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {

                    }
                });

需要關(guān)注的就三個:

  1. Observable
  2. OnSubscriber
  3. Subscriber

Observable

首先看create()如何創(chuàng)建了一個了一個Observable。

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

過程很簡單,就是將傳遞給 create()OnSubscribe 保存了起來就結(jié)束了。RxJavaHooks主要是用于性能優(yōu)化,在RxJava的源代碼中很常見。

subscribe()

// 核心代碼
    public final Subscription subscribe(Subscriber<? super T> subscriber)   {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }

通過核心代碼,可見流程十分簡單,首先是調(diào)用傳入的 subscriber#onStart 方法,該方法默認不做任何操作。之后就是將Subscriber當(dāng)作參數(shù)調(diào)用Observable中的OnSubscriber#call,而在 call() 中調(diào)用了subscriber的 onNext()onCompelte() 。數(shù)據(jù)就完成了從了Observable.OnSubscribeSubscriber的數(shù)據(jù)的傳遞。最后返回的Subscriber是為了方便取消訂閱等操作。

給subscriber添加Subscription

在實際應(yīng)用中,會有這樣一個需求:在subscriber退訂時需要清理Observable被訂閱時一起創(chuàng)建的資源,例如關(guān)閉socket等。示例代碼如下:

        Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        // 創(chuàng)建資源
                        
                        subscriber.add(Subscriptions.create(new Action0() {
                            @Override
                            public void call() {
                                // 在退訂時被調(diào)用,清理資源        
                            }
                        }));
                        
                        // do something
                    }
                });

代碼中給subscriber添加了一個Subscription,Subscription接口有兩個方法:

  1. void unsubscribe();
  2. boolean isUnsubscribed();

void unsubscribe();在退訂時被調(diào)用。通過 Subscriptions#create 創(chuàng)建的Subscription會在退訂時調(diào)用 Action0#call 。實現(xiàn)代碼如下:

// 構(gòu)造方法
    private BooleanSubscription(Action0 action) {
        actionRef = new AtomicReference<Action0>(action);
    }

    @Override
    public boolean isUnsubscribed() {
        return actionRef.get() == EMPTY_ACTION;
    }

        @Override
    public void unsubscribe() {
        Action0 action = actionRef.get();
        if (action != EMPTY_ACTION) {
            action = actionRef.getAndSet(EMPTY_ACTION);
            if (action != null && action != EMPTY_ACTION) {
                action.call();
            }
        }
    }

可見在退訂時會清除對action的引用,并且是通過判斷action是否為空引用來判斷是否已經(jīng)被退訂的,并且使用了AtomicReference類來保存引用,保證了該類線程安全。

退訂時,Subscription#unsubscribe被調(diào)用的原理可以查看 SubscriptionList 的源碼知曉:

// subscriber#unsubscribe
    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

// SubscriptionList
    @Override
    public void unsubscribe() {
        if (!unsubscribed) {
            List<Subscription> list;
            synchronized (this) {
                if (unsubscribed) {
                    return;
                }
                unsubscribed = true;
                list = subscriptions;
                subscriptions = null;
            }
            // we will only get here once
            unsubscribeFromAll(list);
        }
    }

    private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
        if (subscriptions == null) {
            return;
        }
        List<Throwable> es = null;
        for (Subscription s : subscriptions) {
            try {
                s.unsubscribe();
            } catch (Throwable e) {
                if (es == null) {
                    es = new ArrayList<Throwable>();
                }
                es.add(e);
            }
        }
        Exceptions.throwIfAny(es);
    }

核心思想就是退訂時遍歷subscriptions中的Subcription并調(diào)用 unsubscribe() 。

總結(jié)

可見完成一個基本的觀察、訂閱原理并不復(fù)雜,而在源碼中會有很多性能優(yōu)化,錯誤處理相關(guān)的代碼,在閱讀源碼時需要學(xué)會挑重點、優(yōu)先關(guān)注核心的邏輯代碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,759評論 7 62
  • 前言 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboar...
    AWeiLoveAndroid閱讀 2,991評論 4 42
  • 作者寄語 很久之前就想寫一個專題,專寫Android開發(fā)框架,專題的名字叫 XXX 從入門到放棄 ,沉淀了這么久,...
    戴定康閱讀 7,731評論 13 85
  • Github:https://github.com/ReactiveX/RxJavahttps://github....
    才兄說閱讀 1,713評論 2 10
  • (二) 今晨7:40 ,寫完昨日筆記后出門,沿著13大道公路的步行道往前走,走了大約2公里處,繼續(xù)感受甘斯威爾這座...
    聞丁閱讀 554評論 4 2

友情鏈接更多精彩內(nèi)容