Rx系列<第十三篇>:RxJava之線程控制(切換/調(diào)度)

RxJava的線程控制主要設(shè)計(jì)到兩種操作符:subscribeOnobserveOn

subscribeOn:如果多次調(diào)用,則只有第一次調(diào)用有效;
observeOn:如果多次調(diào)用,每次有可以切換線程。

(1)默認(rèn)情況下
    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

打印日志:

圖片.png

默認(rèn)情況下被觀察者和觀察者是運(yùn)行在主線程的,如果阻塞50秒(耗時(shí)操作)

    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Thread.sleep(50000);
                    Log.d("aaa", "threadName:"+Thread.currentThread().getName());
                }
            });

這樣會(huì)阻塞主線程。

這時(shí),我們就需要用到線程控制的知識(shí)了。

(2)Scheduler的種類
  • Schedulers.io(?):
    用于IO密集型的操作,例如讀寫SD卡文件,查詢數(shù)據(jù)庫,訪問網(wǎng)絡(luò)等,具有線程緩存機(jī)制,在此調(diào)度器接收到任務(wù)后,先檢查線程緩存池中,是否有空閑的線程,如果有,則復(fù)用,如果沒有則創(chuàng)建新的線程,并加入到線程池中,如果每次都沒有空閑線程使用,可以無上限的創(chuàng)建新線程。

  • Schedulers.newThread(?):
    在每執(zhí)行一個(gè)任務(wù)時(shí)創(chuàng)建一個(gè)新的線程,不具有線程緩存機(jī)制,因?yàn)閯?chuàng)建一個(gè)新的線程比復(fù)用一個(gè)線程更耗時(shí)耗力,雖然使用Schedulers.io(?)的地方,都可以使用Schedulers.newThread(?),但是,Schedulers.newThread(?)的效率沒有Schedulers.io(?)高。

  • Schedulers.computation():
    用于CPU 密集型計(jì)算任務(wù),即不會(huì)被 I/O 等操作限制性能的耗時(shí)操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數(shù)。不可以用于I/O操作,因?yàn)镮/O操作的等待時(shí)間會(huì)浪費(fèi)CPU。

  • Schedulers.trampoline():
    在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行。

  • Schedulers.single():
    擁有一個(gè)線程單例,所有的任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)此線程中有任務(wù)執(zhí)行時(shí),其他任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行。

  • Scheduler.from(@NonNull Executor executor):
    指定一個(gè)線程調(diào)度器,由此調(diào)度器來控制任務(wù)的執(zhí)行策略。

  • AndroidSchedulers.mainThread():
    在Android UI線程中執(zhí)行任務(wù),為Android開發(fā)定制。

  • 注:
    在RxJava2中,廢棄了RxJava1中的Schedulers.immediate(?)
    在RxJava1中,Schedulers.immediate(?)的作用為在當(dāng)前線程立即執(zhí)行任務(wù),功能等同于RxJava2中的Schedulers.trampoline(?)。
    而Schedulers.trampoline(?)在RxJava1中的作用是當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行接到的任務(wù),有點(diǎn)像RxJava2中的Schedulers.single(),但也不完全相同,因?yàn)镾chedulers.single()不是在當(dāng)前線程而是在一個(gè)線程單例中排隊(duì)執(zhí)行任務(wù)。

(3)subscribeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

代碼中添加了subscribeOn(Schedulers.io())這句代碼,這樣就可以從默認(rèn)主線程切換到IO線程。

我們看一下打印結(jié)果

圖片.png

所以, 如果單純用subscribeOn來控制線程,那么被觀察者和觀察者都會(huì)被切換到指定的線程。

如果添加多個(gè), 比如

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.trampoline())
            .subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

那么只有第一次調(diào)用subscribeOn有效果。

(4)observeOn的使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果

圖片.png

我們發(fā)現(xiàn)被觀察者在主線程運(yùn)行,觀察者在子線程運(yùn)行。

結(jié)論:結(jié)合(3)總結(jié)的結(jié)論是,subscribeOn可以控制被觀察者和觀察者的線程,observeOn僅可以控制觀察者的線程。

(5)subscribeOn和observeOn結(jié)合使用
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.computation())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

打印效果如下:

圖片.png

這樣觀察者就從主線程切換到子線程了。

我們再來舉一個(gè)稍微復(fù)雜的例子。

   Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
            e.onNext("A");

        }
    })
            .subscribeOn(AndroidSchedulers.mainThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.computation())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.single())
            .map(new Function<String, String>() {

                @Override
                public String apply(String s) throws Exception {
                    Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
                }
            });

執(zhí)行效果如下:

圖片.png

我們發(fā)現(xiàn)

  • 多次調(diào)用Schedulers.single(),都是在同一個(gè)線程執(zhí)行。
  • 多次調(diào)用Schedulers.computation()、Schedulers.newThread()、Schedulers.io()都會(huì)重新新建線程。

Schedulers.from()和AndroidSchedulers.mainThread()就不介紹了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容