RxJava2 中多種取消訂閱 dispose 的方法梳理( 源碼分析 )

Github 相關(guān)代碼: Github地址

一直感覺 RxJava2 的取消訂閱有點(diǎn)混亂, 這樣也能取消, 那樣也能取消, 沒能系統(tǒng)起來的感覺就像掉進(jìn)了盤絲洞, 迷亂...
下面說說這幾種情況


幾種取消的情況

  1. subscribe 時返回了 disposable:
  2. subscribe 不返回 disposable, 從 observer 的 onSubscribe 中獲取:
  3. 之前從網(wǎng)上看的, 使用繼承 DisposableObserver 的 observer, 這個 observer 可以直接 dispose

之前剛了解到到這幾種方式的時候表情是這樣的

今天打起精神, 看了點(diǎn)源碼, 搞懂了這到底是什么鬼.


源碼分析

" 啰嗦啥啊, 這么簡單的東西還需要貼源碼? "
" 大哥, 下面有總結(jié).... "

第一種情況開始看, 我們進(jìn)入到 .subscribe((s) -> {}) 中看, 發(fā)現(xiàn)它是返回了一個四參數(shù)的重載方法

    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ...
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }

可以看到, 這個方法里創(chuàng)建了一個 LambdaObserver, 這個 Observer 實(shí)現(xiàn)了Disposable 接口, 所以可以直接作為 Disposable 返回到最上級, 這就是為什么第一種情況中的 subscribe 能返回 disposable 的原因.

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {
            ...
}

第二種情況subscribe 其實(shí)就是上面方法里的 subscribe(LambdaObserver)

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

這個方法內(nèi)執(zhí)行了 subscribeActual(observer) 抽象方法, 交給了 Observale 的子類重寫.

第三種情況 中出現(xiàn)的 DisposableObserverLambdaObserver 差不多, 甚至更簡單

public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
    final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

    @Override
    public final void onSubscribe(@NonNull Disposable s) {
        if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
            onStart();
        }
    }

    protected void onStart() { }

    @Override
    public final boolean isDisposed() {
        return s.get() == DisposableHelper.DISPOSED;
    }

    @Override
    public final void dispose() {
        DisposableHelper.dispose(s);
    }
}

簡單總結(jié)

稍微一看我們就明白了, 當(dāng)傳入 Observer 接口的四個方法時, subscribe 在內(nèi)部構(gòu)建了一個 LambdaObserver , 而這個 LambdaObserver 和第三種情況的 DisposableObserver 都實(shí)現(xiàn)了 Disposable 接口, 所以可以作為 Disposable 返回, 就是這么簡單.

另外第三種情況里出現(xiàn)的 CompositeDisposable, 簡單說就是一個 Disposable 集合( 由 RxJava 內(nèi)部提供的OpenHashSet 維護(hù), 線程安全 ), CompositeDisposable.dispose() 時會遍歷內(nèi)部的所有 Disposable 執(zhí)行 dispose 操作.

    /**
     * Dispose the contents of the OpenHashSet by suppressing non-fatal
     * Throwables till the end.
     * @param set the OpenHashSet to dispose elements of
     */
    void dispose(OpenHashSet<Disposable> set) {
        ...
        for (Object o : array) {
            if (o instanceof Disposable) {
                try {
                    ((Disposable) o).dispose();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(ex);
                }
            }
        }
        ...
    }

幾種方式的適用情況##

  1. 如果是零星使用的話, 第一種最方便, observer 的四個方法可以按需使用, 相同邏輯的方法有多種可供選擇

    image

  2. 如果使用的 Observer 有很多共同邏輯, 則可以寫一個 BaseObserver 繼承 DisposableObserver 或者 LambdaObserver, 直接使用 xxObserver.dispose()

open class BaseObserver<T>() : DisposableObserver<T>()
  1. 如果有很多的 diposable 需要取消的話, 使用 CompositeDisposable 會更簡單一些
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容