RxJava的線程控制主要設(shè)計(jì)到兩種操作符:subscribeOn 和observeOn
subscribeOn:如果多次調(diào)用,則只有第一次調(diào)用有效;
observeOn:如果多次調(diào)用,每次有可以切換線程。
(1)默認(rèn)情況下
Observable.just("A")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "threadName:"+Thread.currentThread().getName());
}
});
打印日志:

默認(rèn)情況下被觀察者和觀察者是運(yùn)行在主線程的,如果阻塞50秒(耗時(shí)操作)
Observable.just("A")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Thread.sleep(50000);
Log.d("aaa", "threadName:"+Thread.currentThread().getName());
}
});
這樣會(huì)阻塞主線程。
這時(shí),我們就需要用到線程控制的知識(shí)了。
(2)Scheduler的種類
Schedulers.io(?):
用于IO密集型的操作,例如讀寫SD卡文件,查詢數(shù)據(jù)庫,訪問網(wǎng)絡(luò)等,具有線程緩存機(jī)制,在此調(diào)度器接收到任務(wù)后,先檢查線程緩存池中,是否有空閑的線程,如果有,則復(fù)用,如果沒有則創(chuàng)建新的線程,并加入到線程池中,如果每次都沒有空閑線程使用,可以無上限的創(chuàng)建新線程。Schedulers.newThread(?):
在每執(zhí)行一個(gè)任務(wù)時(shí)創(chuàng)建一個(gè)新的線程,不具有線程緩存機(jī)制,因?yàn)閯?chuàng)建一個(gè)新的線程比復(fù)用一個(gè)線程更耗時(shí)耗力,雖然使用Schedulers.io(?)的地方,都可以使用Schedulers.newThread(?),但是,Schedulers.newThread(?)的效率沒有Schedulers.io(?)高。Schedulers.computation():
用于CPU 密集型計(jì)算任務(wù),即不會(huì)被 I/O 等操作限制性能的耗時(shí)操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數(shù)。不可以用于I/O操作,因?yàn)镮/O操作的等待時(shí)間會(huì)浪費(fèi)CPU。Schedulers.trampoline():
在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行。Schedulers.single():
擁有一個(gè)線程單例,所有的任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)此線程中有任務(wù)執(zhí)行時(shí),其他任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行。Scheduler.from(@NonNull Executor executor):
指定一個(gè)線程調(diào)度器,由此調(diào)度器來控制任務(wù)的執(zhí)行策略。AndroidSchedulers.mainThread():
在Android UI線程中執(zhí)行任務(wù),為Android開發(fā)定制。注:
在RxJava2中,廢棄了RxJava1中的Schedulers.immediate(?)
在RxJava1中,Schedulers.immediate(?)的作用為在當(dāng)前線程立即執(zhí)行任務(wù),功能等同于RxJava2中的Schedulers.trampoline(?)。
而Schedulers.trampoline(?)在RxJava1中的作用是當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行接到的任務(wù),有點(diǎn)像RxJava2中的Schedulers.single(),但也不完全相同,因?yàn)镾chedulers.single()不是在當(dāng)前線程而是在一個(gè)線程單例中排隊(duì)執(zhí)行任務(wù)。
(3)subscribeOn的使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
代碼中添加了subscribeOn(Schedulers.io())這句代碼,這樣就可以從默認(rèn)主線程切換到IO線程。
我們看一下打印結(jié)果

所以, 如果單純用subscribeOn來控制線程,那么被觀察者和觀察者都會(huì)被切換到指定的線程。
如果添加多個(gè), 比如
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.trampoline())
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
那么只有第一次調(diào)用subscribeOn有效果。
(4)observeOn的使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
打印效果

我們發(fā)現(xiàn)被觀察者在主線程運(yùn)行,觀察者在子線程運(yùn)行。
結(jié)論:結(jié)合(3)總結(jié)的結(jié)論是,subscribeOn可以控制被觀察者和觀察者的線程,observeOn僅可以控制觀察者的線程。
(5)subscribeOn和observeOn結(jié)合使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.computation())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
打印效果如下:

這樣觀察者就從主線程切換到子線程了。
我們再來舉一個(gè)稍微復(fù)雜的例子。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d("aaa", "Observable----threadName:"+Thread.currentThread().getName());
e.onNext("A");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map1----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map2----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map3----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("aaa", "map4----threadName:"+Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("aaa", "Obsever----threadName:"+Thread.currentThread().getName());
}
});
執(zhí)行效果如下:

我們發(fā)現(xiàn)
- 多次調(diào)用Schedulers.single(),都是在同一個(gè)線程執(zhí)行。
- 多次調(diào)用Schedulers.computation()、Schedulers.newThread()、Schedulers.io()都會(huì)重新新建線程。
Schedulers.from()和AndroidSchedulers.mainThread()就不介紹了。