從 RxBus 這輛蘭博基尼深入進去

很早之前有看過別人實現(xiàn)的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在群里分享了一把,自己也加入了討論,下來還實踐了一把,所以想借此篇進入到源碼層,深刻體驗下 RxBus 這輛 “蘭博基尼” 的設計美感和獨特魅力。

不太清楚簡書怎么生成目錄,帶目錄版本可以在我博客看下。

RxBus

準備

關于簡單的實現(xiàn)和用法,這篇文章已經(jīng)很好的說明了。

推薦先看看 RxBus 的簡單實現(xiàn)和用法。

地址在這里:RxBus 的簡單實現(xiàn)

解剖


讓我們看看這輛車到底用了些什么?

  • Subject

  • SerializedSubject

  • PublishSubject

  • CompositeSubscription

從 Subject 開始發(fā)車

官方解釋

這是 Subject 的中文解釋:

Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現(xiàn)中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。

由于一個Subject訂閱一個Observable,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說,它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果,Subject可以把原來那個"冷"的Observable變成"熱"的。

Subject 源碼

源碼:


public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

    public abstract boolean hasObservers();
    
    public final SerializedSubject<T, R> toSerialized() {
        if (getClass() == SerializedSubject.class) {
            return (SerializedSubject<T, R>)this;
        }
        return new SerializedSubject<T, R>(this);
    }

Subject 只有兩個方法。

hasObservers()方法的解釋是:

Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.
判斷 Subject 是否已經(jīng)有 observers 訂閱了 有則返回 ture

toSerialized() 方法的解釋是:

Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
包裝 Subject 后讓它可以安全的在不同線程中調(diào)用各種方法

為什么這個方法后就可以是線程安全了呢?

我們看到 toSerialized() 返回了 SerializedSubject<T, R> 。我們先到這里打住,稍后我們再看看該類做了什么。

PublishSubject 解釋

在 RxJava 里有一個抽象類 Subject,既是 Observable 又是 Observer,可以把 Subject 理解成一個管道或者轉(zhuǎn)發(fā)器,數(shù)據(jù)從一端輸入,然后從另一端輸出。

Subject 有好幾種,這里可以使用最簡單的 PublishSubject。訂閱之后,一旦數(shù)據(jù)從一端傳入,結(jié)果會里立刻從另一端輸出。

源碼里給了用法例子:

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onCompleted events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onCompleted
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onCompleted();

串行化

官方文檔推薦我們:

如果你把 Subject 當作一個 Subscriber 使用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調(diào)用,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。

要避免此類問題,你可以將 Subject 轉(zhuǎn)換為一個 SerializedSubject ,類似于這樣:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

所以我們可以看到在 RxBus 初始化的時候我們做了這樣一件事情:

    private final Subject<Object, Object> BUS;

    private RxBus() {
        BUS = new SerializedSubject<>(PublishSubject.create());
    }

為了保證多線程的調(diào)用中結(jié)果的確定性,我們按照官方推薦將 Subject 轉(zhuǎn)換成了一個 SerializedSubject 。

SerializedSubject

該類同樣是 Subject 的子類,這里貼出該類的構(gòu)造方法。

    private final SerializedObserver<T> observer;
    private final Subject<T, R> actual;

    public SerializedSubject(final Subject<T, R> actual) {
        super(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> child) {
                actual.unsafeSubscribe(child);
            }

        });
        this.actual = actual;
        this.observer = new SerializedObserver<T>(actual);
    }

我們發(fā)現(xiàn),Subject 最后轉(zhuǎn)化成了 SerializedObserver.

SerializedObserver

When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting

一次只會允許一個線程進行發(fā)送事物
如果其他線程已經(jīng)準備就緒,會通知給隊列
在發(fā)送事物中,不會持有任何鎖和阻塞任何線程

通過介紹可以知道是通過 notifications 來進行并發(fā)處理的。

SerializedObserver 類中
private final NotificationLite<T> nl = NotificationLite.instance();

重點看看 nl 在 onNext() 方法里的使用:


 @Override
    public void onNext(T t) {
   // 省略一些代碼
        for (;;) {
            for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
                FastList list;
                synchronized (this) {
                    list = queue;
                    if (list == null) {
                        emitting = false;
                        return;
                    }
                    queue = null;
                }
                for (Object o : list.array) {
                    if (o == null) {
                        break;
                    }
                    // 這里的 accept() 方法
                    try {
                        if (nl.accept(actual, o)) {
                            terminated = true;
                            return;
                        }
                    } catch (Throwable e) {
                        terminated = true;
                        Exceptions.throwIfFatal(e);
                        actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                        return;
                    }
                }
            }
        }
    }

NotificationLite

知道哪里具體調(diào)用了之后,我們再仔細看看 NotificationLite

先來了解它到底是什么:

For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don't want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.
It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent.

大致意思是:作為一個單例類保持這種完全不存在的安全類型的表象。

剛我們在 SerializedObserver 的 onNext() 方法中看到 nl.accept(actual, o)
所以我們再深入到 accept() 方法中:

   public boolean accept(Observer<? super T> o, Object n) {
        if (n == ON_COMPLETED_SENTINEL) {
            o.onCompleted();
            return true;
        } else if (n == ON_NEXT_NULL_SENTINEL) {
            o.onNext(null);
            return false;
        } else if (n != null) {
            if (n.getClass() == OnErrorSentinel.class) {
                o.onError(((OnErrorSentinel) n).e);
                return true;
            }
            o.onNext((T) n);
            return false;
        } else {
            throw new IllegalArgumentException("The lite notification can not be null");
        }
    }

Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
判斷 lite 通知類別,通知 observer 執(zhí)行適當方法。

通過 NotificationLite 類圖可以看到有三個標識

  • ON_NEXT_NULL_SENTINEL (onNext 標識)
  • ON_COMPLETED_SENTINEL (onCompleted 標識)
  • OnErrorSentinel (onError 標識)

與 Observer 回調(diào)一致。通過分析得知 accept() 就是通過標識來判斷,然后調(diào)用 Observer 相對應的方法。

CompositeSubscription

RxBus 這輛"蘭博基尼"與 CompositeSubscription 車間搭配更好。


構(gòu)造函數(shù):

    private Set<Subscription> subscriptions;
    private volatile boolean unsubscribed;

    public CompositeSubscription() {
    }

    public CompositeSubscription(final Subscription... subscriptions) {
        this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
    }

內(nèi)部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,并且沒有重復對象。

所以我們推薦在基類里實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有所有的 Subscriptions ,然后在 onDestroy()或者 onDestroyView()里取消所有的訂閱。

參考文章

熄火休息

能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)

轉(zhuǎn)載請注明出處。

最后送上我女神 Gakki , 開心最好 ( ′?v `? )?。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容