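Thread switching is one of the most commonly used operations in Rx, mainly through subscribeOn and observeOn. Both take a Scheduler parameter, which is clearly an abstraction that different schedulers can implement. Based on that, we can write: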
//Scheduler.kt
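abstract class Scheduler {
    // Creates a Worker that runs the actions handed to it.
    abstract fun createWorker(): Worker

    abstract class Worker {
        // Runs the action on whatever thread this Worker targets.
        abstract fun schedule(action: () -> Unit)
    }
}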
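To make the contract concrete, here is a minimal sketch of the simplest possible scheduler, one that runs each action straight away on the calling thread. `ImmediateScheduler` and `runImmediately` are names made up for this illustration and are not part of the code in this post; the `Scheduler` class is repeated only so the snippet compiles on its own.

```kotlin
abstract class Scheduler {
    abstract fun createWorker(): Worker

    abstract class Worker {
        abstract fun schedule(action: () -> Unit)
    }
}

// Hypothetical scheduler for illustration: no thread hop at all.
class ImmediateScheduler : Scheduler() {
    override fun createWorker(): Scheduler.Worker = object : Scheduler.Worker() {
        // Run the action synchronously on the caller's thread.
        override fun schedule(action: () -> Unit) {
            action()
        }
    }
}

// Returns the name of the thread the scheduled action ran on.
fun runImmediately(): String {
    var ranOn = ""
    ImmediateScheduler().createWorker().schedule {
        ranOn = Thread.currentThread().name
    }
    return ranOn
}
```

Because nothing is handed to another thread, `runImmediately()` reports the caller's own thread name. Any real scheduler differs only in where `schedule` sends the action.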
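Worker is what does the actual work. It has one abstract method, schedule, which we need to implement.
Executor is the Java interface for running submitted tasks, usually backed by a thread pool, and Executors provides factory methods for a range of ready-made thread pools. With that, we can implement an ExecutorScheduler.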
//ExecutorScheduler.kt
import java.util.concurrent.Executor

class ExecutorScheduler(private val executor: Executor) : Scheduler() {
    override fun createWorker(): Worker = WorkerImpl(executor)

    private class WorkerImpl(private val executor: Executor) : Worker() {
        // Hand the action to the executor, which decides the thread it runs on.
        override fun schedule(action: () -> Unit) {
            executor.execute { action() }
        }
    }
}
Following RxJava, we create a Schedulers class to hold the Scheduler instances, so that we can call Schedulers.io() directly.
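//Schedulers.kt
import java.util.concurrent.Executors

object Schedulers {
    // A single-thread executor keeps the example simple;
    // RxJava's own io() scheduler is backed by a cached thread pool.
    private val io = ExecutorScheduler(Executors.newSingleThreadExecutor())

    fun io(): Scheduler = io
}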
Next, implement the Android-specific scheduler, AndroidSchedulers, which is built mainly on Looper.
//LooperScheduler.kt
import android.os.Handler
import android.os.Looper

class LooperScheduler(looper: Looper) : Scheduler() {
    private val handler: Handler = Handler(looper)

    override fun createWorker(): Worker = LooperSchedulerWorker(handler)

    private class LooperSchedulerWorker(private val handler: Handler) : Worker() {
        // Post the action to the Looper's message queue so it runs on that Looper's thread.
        override fun schedule(action: () -> Unit) {
            handler.post { action() }
        }
    }
}
//AndroidSchedulers.kt
import android.os.Looper

object AndroidSchedulers {
    // Scheduler bound to the main (UI) thread's Looper.
    private val mainThread = LooperScheduler(Looper.getMainLooper())

    fun mainThread(): Scheduler = mainThread
}
Implementing subscribeOn and observeOn
//Observable.kt
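// The following are members of Observable<T>.
fun <R> lift(operator: Operator<R, T>): Observable<R> {
    return create(OnSubscribeLift(onSubscribe!!, operator))
}

// Moves the subscription to the source (the OnSubscribe call) onto the scheduler.
fun subscribeOn(scheduler: Scheduler): Observable<T> {
    return create(OperatorSubscribeOn(this, scheduler))
}

// Moves delivery of events to the downstream subscriber onto the scheduler.
fun observeOn(scheduler: Scheduler): Observable<T> {
    return lift(OperatorObserveOn(scheduler))
}

// Wraps the downstream Subscriber<T> in a Subscriber<R> that is handed to the upstream.
interface Operator<T, R> {
    fun call(subscriber: Subscriber<T>): Subscriber<R>
}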
//OnSubscribeLift.kt
class OnSubscribeLift<T, R>(
    private val parent: Observable.OnSubscribe<T>,
    private val operator: Observable.Operator<R, T>
) : Observable.OnSubscribe<R> {
    override fun call(subscriber: Subscriber<R>) {
        try {
            // Wrap the downstream subscriber, then subscribe the wrapper to the upstream.
            val st = operator.call(subscriber)
            st.onStart()
            parent.call(st)
        } catch (e: Exception) {
            subscriber.onError(e)
        }
    }
}
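In RxJava, only the first subscribeOn call (the one closest to the source) takes effect, whereas observeOn goes by the most recent call: each downstream stage runs on the scheduler of the nearest observeOn above it.
The reason is that subscribeOn schedules the execution of OnSubscribe. With several subscribeOn calls, each one reschedules the subscription as it travels upstream, so the source at the very top still runs on the thread chosen by the first call. After that, the only way to change threads is to move the downstream observer, which is what observeOn does. If this is not clear yet, reread the code in this section or study the RxJava source.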
//OperatorSubscribeOn.kt
class OperatorSubscribeOn<T>(
    private val source: Observable<T>,
    private val scheduler: Scheduler
) : Observable.OnSubscribe<T> {
    override fun call(subscriber: Subscriber<T>) {
        val worker = scheduler.createWorker()
        // Subscribing happens inside the scheduled action,
        // so the source's OnSubscribe runs on the scheduler's thread.
        worker.schedule {
            source.subscribe(SubscribeOnSubscriber(subscriber))
        }
    }

    // Plain pass-through: forwards every event to the actual subscriber.
    class SubscribeOnSubscriber<T>(private val actual: Subscriber<T>) : Subscriber<T>() {
        override fun onCompleted() {
            actual.onCompleted()
        }

        override fun onError(t: Throwable) {
            actual.onError(t)
        }

        override fun onNext(t: T) {
            actual.onNext(t)
        }
    }
}
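One way to see why only the subscribeOn closest to the source decides where the source runs is to strip the idea down to plain executors. The sketch below is a simplified model and not the Observable code itself: the helper names `namedExecutor` and `sourceThreadName` and the thread names "first" and "second" are assumptions made for this illustration. Each subscribeOn is modelled as one `execute` call that wraps the subscription to its upstream.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: a single-thread executor whose thread has a known name.
fun namedExecutor(name: String): ExecutorService =
    Executors.newSingleThreadExecutor { r -> Thread(r, name) }

// Models source.subscribeOn(first).subscribeOn(second) and
// returns the name of the thread the "source" ends up running on.
fun sourceThreadName(): String {
    val first = namedExecutor("first")   // subscribeOn closest to the source
    val second = namedExecutor("second") // subscribeOn further downstream
    val done = CountDownLatch(1)
    var result = ""

    // The subscription travels upstream: the downstream subscribeOn
    // reschedules first, then the upstream one reschedules again.
    second.execute {
        first.execute {
            // This innermost block stands in for the source's OnSubscribe.
            result = Thread.currentThread().name
            done.countDown()
        }
    }

    done.await()
    first.shutdown()
    second.shutdown()
    return result
}
```

The last reschedule before reaching the source is the one that wins, and that is always the subscribeOn written first in the chain. In this model `sourceThreadName()` returns "first".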
//OperatorObserveOn.kt
class OperatorObserveOn<T>(private val scheduler: Scheduler) : Observable.Operator<T, T> {
    override fun call(subscriber: Subscriber<T>): Subscriber<T> {
        val worker = scheduler.createWorker()
        // Every event is re-posted to the worker,
        // so the downstream subscriber runs on the scheduler's thread.
        return object : Subscriber<T>() {
            override fun onCompleted() {
                worker.schedule {
                    subscriber.onCompleted()
                }
            }

            override fun onError(t: Throwable) {
                worker.schedule {
                    subscriber.onError(t)
                }
            }

            override fun onNext(t: T) {
                worker.schedule {
                    subscriber.onNext(t)
                }
            }
        }
    }
}
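The effect of observeOn can also be reduced to plain executors: events are produced on one thread and each one is handed to a worker that delivers it on another. The sketch below is a simplified model under that assumption. `deliverOn` is a hypothetical helper and not part of the code in this post, and it uses a single-thread executor so that delivery order matches emission order.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

// Hypothetical helper: "emits" items on the calling thread and delivers each one
// on a single worker thread with the given name, recording where it was seen.
fun deliverOn(threadName: String, items: List<String>): List<String> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, threadName) }
    val seen = Collections.synchronizedList(mutableListOf<String>())
    val done = CountDownLatch(items.size)

    for (item in items) {
        // Same shape as worker.schedule { subscriber.onNext(item) }.
        executor.execute {
            seen.add("$item@${Thread.currentThread().name}")
            done.countDown()
        }
    }

    done.await()
    executor.shutdown()
    return seen.toList()
}
```

For example, `deliverOn("observer", listOf("1", "2", "3"))` returns `["1@observer", "2@observer", "3@observer"]`. Note that the ordering comes from the single worker thread: since the simplified OperatorObserveOn above posts each event independently, a scheduler backed by a multi-threaded pool would give no such ordering guarantee.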
//Test.kt
Observable.just("1", "2", "3")
    // The source and the operators below run on the io scheduler.
    .subscribeOn(Schedulers.io())
    .map {
        it.toInt() + 1
    }
    .filter {
        it != 1
    }
    .map {
        it
    }
    // From here on, events are delivered on the Android main thread.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Subscriber<Int>() {
        override fun onCompleted() {
        }

        override fun onError(t: Throwable) {
        }

        override fun onNext(t: Int) {
            println(t)
        }
    })