RxJava2系列第二篇---異步

目錄

第一篇---基本使用
第二篇---異步
第一篇---操作符

異步

在該系列第一篇最開(kāi)始,我們已經(jīng)說(shuō)了RxJava是一個(gè)異步編程框架,之所以這么說(shuō),就是因?yàn)樗诰€程的切換方面非常方便。
介紹異步之前,我們先看看下面幾個(gè)方法

subscribe(Observer<? super T> observer)
subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe)

在基本使用中,我們用的是第一個(gè)方法,但此方法需要重寫所有的事件,但有的時(shí)候我們并不需要對(duì)所有的事件進(jìn)行處理,因此就有了下面幾種方法,看參數(shù)我們就知道每個(gè)方法分別處理的是什么事件,比如第二個(gè)方法,只處理next事件,第三個(gè)方法,只處理next和error事件等等。

為了代碼的簡(jiǎn)潔性,接下來(lái)我將使用Consumer作為觀察者。

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG","subscribe:"+Thread.currentThread().getName());
            emitter.onNext("1");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG","accept:"+Thread.currentThread().getName());
        }
});

輸出結(jié)果

D/TAG: subscribe:main
D/TAG: accept:main

結(jié)果分析
如果我將上述代碼,放在一個(gè)子線程中去,發(fā)現(xiàn)結(jié)果線程的名字將不再是main。說(shuō)明在哪創(chuàng)建上述代碼,則上游和下游就會(huì)處于那個(gè)線程,并且它們處于同一個(gè)線程。

如果我們要在子線程中發(fā)送交易,主線程更新UI,這種情況就滿足不了我們的需求了。我們需要的是上線處于子線程,負(fù)責(zé)發(fā)送網(wǎng)絡(luò)請(qǐng)求,下游處于主線程,負(fù)責(zé)更新UI。通過(guò)RxJava的線程調(diào)度器可以輕松實(shí)現(xiàn)上述需求。

在Observable中有兩個(gè)方法

subscribeOn(Scheduler scheduler)  //指定上游所在的線程
observeOn(Scheduler scheduler)  //指定下游所在的線程

先來(lái)看看下面的代碼

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
            emitter.onNext("1");
        }
    });
    Consumer<String> consumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
        }
    };
//關(guān)注點(diǎn)
observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);

輸出結(jié)果

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main

我們來(lái)看看Schedulers和AndroidSchedulers
這兩個(gè)類并無(wú)繼承關(guān)系,是相互獨(dú)立的兩個(gè)final類

AndroidSchedulers

/** A {@link Scheduler} which executes actions on the Android main thread.*/
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
    if (looper == null) throw new NullPointerException("looper == null");
    return new HandlerScheduler(new Handler(looper));
}

前者指定方法在主線程中執(zhí)行
后者指定方法在哪個(gè)線程執(zhí)行,由Looper所在的線程決定

Schedulers
該調(diào)度器里面有下面幾個(gè)主要方法

//新的子線程
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//計(jì)算密集型任務(wù)
public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
//io密集型任務(wù)
 public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
public static Scheduler trampoline() {
    return TRAMPOLINE;
}
public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}

其他兩個(gè)暫時(shí)沒(méi)用到,就先不說(shuō)明了。

回到開(kāi)始的異步代碼
修改關(guān)注點(diǎn)

observable.subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.io())
            .subscribe(consumer);

輸出結(jié)果

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:RxCachedThreadScheduler-2

結(jié)果分析
修改后的代碼指定了2次上游發(fā)送事件的線程,下游也指定了2次線程,通過(guò)輸出結(jié)果,我們可以得出結(jié)論:上游線程只有第一次指定的有效,下游線程最終會(huì)切換至最后一個(gè)指定的線程。

為了更加清晰的知道下游線程的切換過(guò)程,我們修改代碼如下

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");
        }
    });
    Consumer<String> consumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
        }
    };
    observable.subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                }
            })
            .observeOn(Schedulers.io())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
                }
            })
            .subscribe(consumer);

輸出結(jié)果如下

D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
D/TAG: accept:2:main
D/TAG: accept:3:main
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2

結(jié)果分析
從輸出結(jié)果,我們可以看出,每一個(gè)doOnNext都會(huì)接受到全部事件,并且每一個(gè)observeOn指定的是它下面的那個(gè)事件所處的線程。

線程切換原理分析

RxJava線程切換圖.png

來(lái)一段RxJava的調(diào)用鏈代碼

Observable.just(1)
    .map(new Function<Integer, String>() {
      @Override
      public String apply(Integer integer) throws Exception {
        LogUtil.d("rxjava", "map1: " + Thread.currentThread().getId());
        return integer.toString();
      }
    })
    .subscribeOn(Schedulers.newThread()) // s1
    .map(new Function<String, Integer>() {
      @Override
      public Integer apply(String s) throws Exception {
        LogUtil.d("rxjava", "map2: " + Thread.currentThread().getId());
        return s.hashCode();
      }
    })
    .observeOn(Schedulers.newThread()) // o1
    .map(new Function<Integer, String>() {
      @Override
      public String apply(Integer integer) throws Exception {
        LogUtil.d("rxjava", "map3: " + Thread.currentThread().getId());
        return integer.toString();
      }
    })
    .subscribeOn(Schedulers.newThread()) // s2
    .observeOn(Schedulers.newThread()) // o2
    .subscribe(new Consumer<String>() {
      @Override
      public void accept(String s) throws Exception {
        LogUtil.d("rxjava", "accept: " + Thread.currentThread().getId());
      }
    });
結(jié)合圖和代碼來(lái)分析線程的切換過(guò)程

這段代碼中包含了很多操作符,每一個(gè)點(diǎn)后面的都是RxJava的操作符,如just,map,subscribe等等,對(duì)應(yīng)圖中的lift
在這些操作符中,每調(diào)用一次操作符,都返回Observable,這就像Builder模式,只有subscribe返回的不是Observable,而是Disposable

subscribe意味著RxJava調(diào)用鏈開(kāi)始啟動(dòng),對(duì)應(yīng)圖中的底端的actual-subscriber。

自下而上找subscribeOn,每經(jīng)過(guò)一個(gè)subscribeOn就切換一次線程(如果一個(gè)都沒(méi)有,則線程默認(rèn)為當(dāng)前線程),直到到達(dá)頂端的Observable,對(duì)應(yīng)圖中的onSubscribe

自上而下找observeOn(圖中的Subscribe),同樣是每經(jīng)過(guò)一個(gè)observeOn就切換一次線程

通過(guò)這個(gè)思路,大家想一想上述程序的打印結(jié)果
just,map1和map2處于一個(gè)線程,并且是s1所指定的線程
map3處于o1所處的線程
accept處于o2所處的線程

打印驗(yàn)證一下

map1: 732
map2: 732
map3: 733
accept: 734

結(jié)果一致

[]

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容