這篇文章探究一下rxjava安卓主線程是怎么實現(xiàn)的,上代碼:
public final class AndroidSchedulers {
public static Scheduler mainThread() {
//老樣式,這個方法實際直接返回入?yún)AIN_THREAD,追著他看就行
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//靜態(tài)變量,initMainThreadScheduler()返回 MainHolder.DEFAULT,追著看MainHolder.DEFAULT
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
//靜態(tài)內(nèi)部類MainHolder ,DEFAULT 在這里初始化,HandlerScheduler就是我們要的主線程scheduler
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
最終得到一個HandlerScheduler對象,這個就是安卓主線程的Scheduler了,看他的構(gòu)造方法,傳入了一個new Handler(Looper.getMainLooper()),很熟悉吧。
了解這個類之前,先把它在哪里調(diào)用的代碼放出來,在上上篇線程切換的文章里講到過的:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//這個scheduler就是HandlerScheduler,createWorker()方法跟上篇的io線程方法步驟調(diào)用差不多
Scheduler.Worker w = scheduler.createWorker();
//最終這個 worker放入了ObserveOnObserver,最終執(zhí)行schdule()方法來執(zhí)行onNext()操作,達到切換線程的目的。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
既然調(diào)用步驟知道了,就帶著這些方法來看看HandlerScheduler類:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
//直接new了一個HandlerWorker
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
//HandlerWorker是哥靜態(tài)內(nèi)部類,看來各種Scheduler的結(jié)構(gòu)都是相似的
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
//構(gòu)造方法,傳入了那個有主線程looper的handler,
HandlerWorker(Handler handler) {
this.handler = handler;
}
//最終執(zhí)行線程任務(wù)的地方
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//handler的消息機制,obtainf()方法把scheduled 這個raunnable給了massage的callback
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//發(fā)送出去了,最終這個message會被主線程的Handler劫持,然后因為msg的callback不為空,
//最終它會自己執(zhí)行run()方法,這樣就達到了在主線程執(zhí)行下游onNext()的目的,
//不熟悉handler的要再去看看它的源碼咯
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
//封裝的一個代理類,方便管理生命周期
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
總結(jié):
rxjava切換到android主線程的關(guān)鍵就是那個獲取了mainLooper的handler,利用message和handler的機制達到了切換的目的。