關(guān)于RxJava1中的Subscription的一些誤解

通常我們使用RxJava時會這樣寫:

Subscription subscription = getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {

                }
            });

得到一個Subscription對象之后,在Activity銷毀的時候去取消訂閱以防內(nèi)存泄漏:

    if (subscription != null && !subscription.isUnsubscribed()) {
        subscription.unsubscribe();
    }  

這樣做固然沒錯, 可是真的每次都需要手動取消訂閱嗎? 答案顯然不是.
我們來看看源碼:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
   ... 
   // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    } 
}   

當(dāng)調(diào)用subscribe時,調(diào)用了第二個方法, 當(dāng)subscriber不是SafeSubscriber的實例時, 則創(chuàng)建一個SafeSubscriber對象. 那么來看看這個SafeSubscriber是個什么鬼:

  public class SafeSubscriber<T> extends Subscriber<T> {
        @Override
        public void onCompleted() {
            ...
            try {
                actual.onCompleted();
            } catch (Throwable e) {
                 ...
            } finally { // NOPMD
                try {
                     unsubscribe();
                } catch (Throwable e) {}
            }
        }

        @Override
        public void onError(Throwable e) {
                ...
                _onError(e);
        }

        @Override
        public void onNext(T args) {}

        protected void _onError(Throwable e) { // NOPMD
            ...
            try {
                unsubscribe();
            } catch (Throwable unsubscribeException) {}
        }
    }

可以看出該類僅僅是對Subscriber做了個包裝.
仔細(xì)查看代碼, 發(fā)現(xiàn)在onComplated() 方法和onError() 方法中, 都調(diào)用了unsubscribe() 方法進行取消訂閱.
因此我們可以大膽猜測, 只要OnCompleted() 和 onError() 執(zhí)行之后, 都會取消訂閱.
下面就簡單的進行一下測試:

 Subscription subscription = Observable.just(1)
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("complete");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("error");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("integer:" + integer);
                }
            });
    boolean flag = subscription.isUnsubscribed();
    System.out.println("flag:" + flag);

運行結(jié)果為:

    integer:1
    complete
    flag: true

說明onCompleted執(zhí)行之后的確取消了訂閱. 再來看看OnError:

     Subscription subscription = Observable.just(1)
            .doOnNext(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    throw new RuntimeException("Oops!");
                }
            })
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("complete");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("error");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("integer:" + integer);
                }
            });
    boolean flag = subscription.isUnsubscribed();
    System.out.println("flag:" + flag);

運行結(jié)果為:

    error
    flag: true

說明OnError執(zhí)行之后也取消了訂閱.

那么究竟什么時候需要手動取消訂閱呢? 我們再看一個例子:

     final Subscription subscription = Observable.just(1)
                    .subscribeOn(Schedulers.io())
                    .delay(2, SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("complete");
                        }

                        @Override
                        public void onError(Throwable e) {
                            System.out.println("error");
                        }

                        @Override
                        public void onNext(Integer integer) {
                            System.out.println("integer:" + integer);
                        }
                    });

            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        System.out.println("flag:" + subscription.isUnsubscribed());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();

我們模擬平時App網(wǎng)絡(luò)請求的例子, 在后臺線程中發(fā)起網(wǎng)絡(luò)請求, 在主線程更新界面, 并延時兩秒模擬網(wǎng)絡(luò)不好的情況, 同時新啟一個線程, 每隔一秒打印一次當(dāng)前Subscription的狀態(tài), 下面是運行結(jié)果:

  I/System.out: flag:false
  I/System.out: flag:false
  I/System.out: integer:1
  I/System.out: complete
  I/System.out: flag:true
  I/System.out: flag:true

可以看到在前兩秒時由于還在后臺執(zhí)行網(wǎng)絡(luò)請求,所以并沒有取消訂閱, 而當(dāng)complete執(zhí)行之后, 訂閱就取消了.
因此我們在開發(fā)中, 如果有異步的操作還正在進行, 在Activity銷毀時才需要手動取消訂閱, 而如果是同步的操作, 或者異步操作已經(jīng)完成, 則并不需要手動取消訂閱.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容