Thread Switching in Rx

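Beginners using RxJava are often unsure which thread an Observable's event sequence and each operator run on, and how and where to switch threads.
The first step is to understand the difference between .subscribeOn() and .observeOn() in RxJava: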

  • .subscribeOn() specifies the Scheduler on which the Observable itself should operate.
  • .observeOn() specifies the Scheduler on which the Observable sends notifications to its observers.
  • By default, the event sequence runs on the same thread that calls .subscribe().
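
To make the three rules above concrete, here is a toy model written with plain JDK classes. It is only a sketch: MiniObservable, the "background" and "fake-main" executors, and the log format are all invented for illustration, and this is not how RxJava is actually implemented. It shows the one idea that matters: subscribeOn moves the act of subscribing (and so the source), while observeOn moves only the delivery of items downstream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Toy model for illustration only; this is NOT how RxJava is implemented.
final class MiniRxDemo {

    /** A minimal cold "observable": a function that pushes items into an observer. */
    static final class MiniObservable<T> {
        private final Consumer<Consumer<T>> source;

        private MiniObservable(Consumer<Consumer<T>> source) {
            this.source = source;
        }

        @SafeVarargs
        static <T> MiniObservable<T> just(T... items) {
            return new MiniObservable<T>(observer -> {
                for (T item : items) {
                    observer.accept(item);
                }
            });
        }

        /** Runs a side effect on whatever thread the item arrives on. */
        MiniObservable<T> doOnNext(Consumer<T> action) {
            return new MiniObservable<T>(observer -> source.accept(item -> {
                action.accept(item);
                observer.accept(item);
            }));
        }

        /** Moves the act of subscribing, and therefore the source, onto the scheduler. */
        MiniObservable<T> subscribeOn(ExecutorService scheduler) {
            return new MiniObservable<T>(observer -> scheduler.execute(() -> source.accept(observer)));
        }

        /** Leaves the source alone and moves only downstream delivery onto the scheduler. */
        MiniObservable<T> observeOn(ExecutorService scheduler) {
            return new MiniObservable<T>(observer ->
                    source.accept(item -> scheduler.execute(() -> observer.accept(item))));
        }

        void subscribe(Consumer<T> observer) {
            source.accept(observer);
        }
    }

    /** Runs just(1, 2, 3) through both operators and records which thread saw what. */
    static List<String> run() {
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        try {
            MiniObservable.just(1, 2, 3)
                    .doOnNext(i -> log.add("doOnNext:" + i + "@" + Thread.currentThread().getName()))
                    .subscribeOn(background)
                    .observeOn(fakeMain)
                    .subscribe(i -> {
                        log.add("result:" + i + "@" + Thread.currentThread().getName());
                        done.countDown();
                    });
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
            fakeMain.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every doOnNext entry is tagged with "background" and every result entry with "fake-main". The two groups may interleave differently from run to run, because the two threads work independently.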

Still not clear?

Original English article: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.nn1m7lrb8
Translation: hanks
Note: this is not a complete translation; concrete examples have been added.

Examples

1. Main thread / .subscribe() thread

Add the following code to the Activity's onCreate() method (which runs on the main thread):

Observable.just(1,2,3)
  .subscribe();

In this case everything runs on the thread that called .subscribe(), which here is the main thread.

Experiment:

Observable.just(1,2,3)
            .doOnNext(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            });

Output:

12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:3, run In :main

2. .subscribeOn()

Even if you add the following code on the main thread, the whole chain runs on the thread defined by .subscribeOn():

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe();

Experiment:

Observable.just(1,2,3)
           .doOnNext(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           })
           .subscribeOn(Schedulers.newThread())
           .subscribe(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           });

Output:

12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:3, run In :RxNewThreadScheduler-1

3. .observeOn()

Suppose you add the following code on the main thread. The Observable is first created on the .subscribe() thread, but everything downstream of the .observeOn() call runs on the specified thread:

Observable.just(1,2,3)
  .observeOn(Schedulers.newThread())
  .subscribe();

Experiment:

new Thread() {
           @Override public void run() {
               Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });

           }
       }.start();

Output:

12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :Thread-155
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:3, run In :main

4. Combined logic

Because the operators can be combined, we can write the following:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(Schedulers.newThread())
  .subscribe();

Experiment:

new Thread() {
     @Override public void run() {
         Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         })
         .subscribeOn(Schedulers.newThread())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         });
     }
 }.start();

Output:

12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:3, run In :main

Tips / Gotchas:

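1. "UI thread exception"

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe(/* logic which touches ui */); // called on the new thread

Obviously this fails: the subscriber runs on the new thread, and Android only allows the thread that created a view hierarchy (the main thread) to touch its views.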

2. Keep processing logic in the background (newThread)

The wrong way:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())
  .flatMap(/* logic which doesn't touch ui */)
  .subscribe();

Experiment:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .observeOn(AndroidSchedulers.mainThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-")); // split on "-" and emit each part
               }
           })
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

Output:

12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

The right way:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .flatMap(/* logic which doesn't touch ui */)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe();

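In the second snippet, flatMap (or any other processing logic) runs on the background thread. On Android this keeps the work from blocking the UI, which could otherwise lead to problems such as an ANR. It is similar to doInBackground() in AsyncTask, which is where time-consuming work belongs.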
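
The same placement rule can be sketched without RxJava, using two plain executors. This is only an analogy under stated assumptions: the class name, the "background" and "fake-main" thread names, and the split helper are made up for illustration. Handing work to the fake-main executor stands in for observeOn(), and whatever comes after the hand-off runs there.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch of operator placement; not RxJava code.
final class HandOffPlacementDemo {

    /** Reports which thread did the splitting, depending on where the hand-off sits. */
    static String splitRanOn(boolean handOffBeforeSplit) {
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        CompletableFuture<String> report = new CompletableFuture<>();
        try {
            background.execute(() -> {
                String item = "Android-Picasso"; // the source emits on the background thread
                if (handOffBeforeSplit) {
                    // Like observeOn(...) placed before flatMap(...): the split runs after the hand-off.
                    fakeMain.execute(() -> report.complete(split(item)));
                } else {
                    // Like flatMap(...) placed before observeOn(...): split first, then hand off the result.
                    String result = split(item);
                    fakeMain.execute(() -> report.complete(result));
                }
            });
            return report.join();
        } finally {
            background.shutdown();
            fakeMain.shutdown();
        }
    }

    /** Splits the item and tags the result with the name of the thread that did the work. */
    private static String split(String item) {
        List<String> parts = Arrays.asList(item.split("-"));
        return Thread.currentThread().getName() + " " + parts;
    }

    public static void main(String[] args) {
        System.out.println(splitRanOn(true));  // prints "fake-main [Android, Picasso]"
        System.out.println(splitRanOn(false)); // prints "background [Android, Picasso]"
    }
}
```

With the hand-off first, the processing lands on the thread that stands in for the UI thread; with the hand-off last, only the finished result does.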

Experiment:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-")); // split on "-" and emit each part
               }
           })
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

Output:

12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

3. The earliest .subscribeOn() wins

Consider the following code:

Observable.just(1,2,3)
  .subscribeOn(thread1)
  .subscribeOn(thread2)
  .subscribe();

The Observable is created and emits its items on thread1, so there is no point in calling .subscribeOn() more than once: only the first call, the one closest to the source, takes effect.
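
One way to see why the first call wins is to follow the subscription as it travels upstream from .subscribe() toward the source. The sketch below models that path with plain JDK executors. It is an illustration under assumptions, not RxJava code: the class name and the executors named "thread1" and "thread2" are hypothetical stand-ins for the two schedulers. Each subscribeOn is modeled as one hop, and the hop nearest the source happens last, so it decides where the source runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch of the subscription path; not RxJava code.
final class FirstSubscribeOnDemo {

    /** Returns the name of the thread on which the source ends up emitting. */
    static String emittingThread() {
        ExecutorService thread1 = Executors.newSingleThreadExecutor(r -> new Thread(r, "thread1"));
        ExecutorService thread2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "thread2"));
        CompletableFuture<String> emittedOn = new CompletableFuture<>();
        try {
            // The source records the thread it runs on.
            Runnable source = () -> emittedOn.complete(Thread.currentThread().getName());
            // Models .subscribeOn(thread1), nearest the source: the last hop.
            Runnable viaThread1 = () -> thread1.execute(source);
            // Models .subscribeOn(thread2), nearest .subscribe(): the first hop.
            Runnable viaThread2 = () -> thread2.execute(viaThread1);
            // Models .subscribe(): the request travels upstream from here.
            viaThread2.run();
            return emittedOn.join();
        } finally {
            thread1.shutdown();
            thread2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // prints "thread1"
    }
}
```

The later hop through thread2 still happens, but it only carries the subscription request and never the emitted items.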

Experiment:

new Thread() {
      @Override public void run() {
          Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          })
          .subscribeOn(Schedulers.newThread())
          .subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .subscribe(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          });
      }
  }.start();

Output:

12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxNewThreadScheduler-1

Experiment (with Schedulers.io() moved to the first position):

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.io())
           .subscribeOn(Schedulers.newThread())
           .subscribeOn(Schedulers.computation())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

Output:

12-06 16:52:13.378 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxCachedThreadScheduler-2

Source: http://hanks.xyz
