概述
上一節(jié)我們分析了最簡單的Rxjava的例子,了解了Rxjava是如何創(chuàng)建事件源,如何發(fā)射事件,何時發(fā)射事件,也清楚了上游和下游是如何關(guān)聯(lián)起來的。
這一節(jié)我們著重來分析下Rxjava強大的線程調(diào)度是如何實現(xiàn)的。
簡單的例子
private void doSomeWork() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.i("lx", " subscribe: " + Thread.currentThread().getName());
Thread.sleep(2000);
e.onNext("a");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String str) {
Log.i("lx", " onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("lx", " onError: " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("lx", " onComplete: " + Thread.currentThread().getName());
}
});
}
運行結(jié)果:
com.rxjava2.android.samples I/lx: onSubscribe: main
com.rxjava2.android.samples I/lx: subscribe: main
com.rxjava2.android.samples I/lx: onNext: main
com.rxjava2.android.samples I/lx: onComplete: main
因為此方法筆者是在main線程中調(diào)用的,所以沒有進行線程調(diào)度的情況下,所有方法都運行在main線程中。但我們知道Android的UI線程是不能做網(wǎng)絡(luò)操作,也不能做耗時操作,所以一般我們把網(wǎng)絡(luò)或耗時操作都放在非UI線程中執(zhí)行。接下來我們就來感受下Rxjava強大的線程調(diào)度能力。
private void doSomeWork() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.i("lx", " subscribe: " + Thread.currentThread().getName());
Thread.sleep(2000);
e.onNext("a");
e.onComplete();
}
}).subscribeOn(Schedulers.io()) //增加了這一句
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String str) {
Log.i("lx", " onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("lx", " onError: " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("lx", " onComplete: " + Thread.currentThread().getName());
}
});
}
運行結(jié)果:
com.rxjava2.android.samples I/lx: onSubscribe: main
com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
只增加了subscribeOn這一句代碼, 就發(fā)生如此神奇的現(xiàn)象,除了onSubscribe方法還運行在main線程(訂閱發(fā)生的線程)其它方法全部都運行在一個名為RxCachedThreadScheduler-1的線程中。我們來看看rxjava是怎么完成這個線程調(diào)度的。
線程調(diào)度subscribeOn
首先我們先分析下Schedulers.io()這個東東。
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO); // hook function
// 等價于
return IO;
}
再看看IO是什么, IO是個static變量,初始化的地方是
IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function
// 等價于
IO = callRequireNonNull(new IOTask());
// 等價于
IO = new IOTask().call();
繼續(xù)看看IOTask
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
// 等價于
return new IoScheduler();
}
}
代碼層次很深,為了便于記憶,我們再回顧一下:
Schedulers.io()等價于 new IoScheduler()
// Schedulers.io()等價于
@NonNull
public static Scheduler io() {
return new IoScheduler();
}
好了,排除了其他干擾代碼,接下來看看IoScheduler()是什么東東了
IoScheduler看名稱就知道是個IO線程調(diào)度器,根據(jù)代碼注釋得知,它就是一個用來創(chuàng)建和緩存線程的線程池。看到這個豁然開朗了,原來Rxjava就是通過這個調(diào)度器來調(diào)度線程的,至于具體怎么實現(xiàn)我們接著往下看
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
從上面的代碼可以看出,new IoScheduler()后Rxjava會創(chuàng)建CachedWorkerPool的線程池,同時也創(chuàng)建并運行了一個名為RxCachedWorkerPoolEvictor的清除線程,主要作用是清除不再使用的一些線程。
但目前只創(chuàng)建了線程池并沒有實際的thread,所以Schedulers.io()相當(dāng)于只做了線程調(diào)度的前期準(zhǔn)備。
OK,終于可以開始分析Rxjava是如何實現(xiàn)線程調(diào)度的?;氐紻emo來看subscribeOn()方法的內(nèi)部實現(xiàn):
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
很熟悉的代碼RxJavaPlugins.onAssembly,上一篇已經(jīng)分析過這個方法,就是個hook function, 等價于直接return new ObservableSubscribeOn<T>(this, scheduler);, 現(xiàn)在知道了這里的scheduler其實就是IoScheduler。
跟蹤代碼進入ObservableSubscribeOn,
可以看到這個ObservableSubscribeOn 繼承自O(shè)bservable,并且擴展了一些屬性,增加了scheduler。 各位看官,這不就是典型的裝飾模式嘛,Rxjava中大量用到了裝飾模式,后面還會經(jīng)常看到這種wrap類。
上篇文章我們已經(jīng)知道了Observable.subscribe()方法最終都是調(diào)用了對應(yīng)的實現(xiàn)類的subscribeActual方法。我們重點分析下subscribeActual:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 沒有任何線程調(diào)度,直接調(diào)用的,所以下游的onSubscribe方法沒有切換線程,
//本文demo中下游就是觀察者,所以我們明白了為什么只有onSubscribe還運行在main線程
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
SubscribeOnObserver也是裝飾模式的體現(xiàn), 是對下游observer的一個wrap,只是添加了Disposable的管理。
接下來分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))
// 這個類很簡單,就是一個Runnable,最終運行上游的subscribe方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// IoSchedular 中的createWorker()
final Worker w = createWorker();
// hook decoratedRun=run;
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// decoratedRun的wrap,增加了Dispose的管理
DisposeTask task = new DisposeTask(decoratedRun, w);
// 線程調(diào)度
w.schedule(task, delay, unit);
return task;
}
回到IoSchedular
public Worker createWorker() {
// 工作線程是在此時創(chuàng)建的
return new EventLoopWorker(pool.get());
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// action 中就包含上游subscribe的runnable
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
最終線程是在這個方法內(nèi)調(diào)度并執(zhí)行的。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// decoratedRun = run, 包含上游subscribe方法的runnable
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// decoratedRun的wrap,增加了dispose的管理
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
// 最終decoratedRun被調(diào)度到之前創(chuàng)建或從線程池中取出的線程,
// 也就是說在RxCachedThreadScheduler-x運行
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
至此我們終于明白了Rxjava是如何調(diào)度線程并執(zhí)行的,通過subscribeOn方法將上游生產(chǎn)事件的方法運行在指定的調(diào)度線程中。
com.rxjava2.android.samples I/lx: onSubscribe: main
com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
從上面的運行結(jié)果來看,因為上游生產(chǎn)者已被調(diào)度到RxCachedThreadScheduler-1線程中,同時發(fā)射事件并沒有切換線程,所以發(fā)射后消費事件的onNext onErro onComplete也在RxCachedThreadScheduler-1線程中。
總結(jié)
-
Schedulers.io()等價于new IoScheduler() -
new IoScheduler()Rxjava創(chuàng)建了線程池,為后續(xù)創(chuàng)建線程做準(zhǔn)備,同時創(chuàng)建并運行了一個清理線程RxCachedWorkerPoolEvictor,定期執(zhí)行清理任務(wù)。 -
subscribeOn()返回一個ObservableSubscribeOn對象,它是Observable的一個裝飾類,增加了scheduler。 - 調(diào)用
subscribe()方法,在這個方法調(diào)用后,subscribeActual()被調(diào)用,才真正執(zhí)行了IoSchduler中的createWorker()創(chuàng)建線程并運行,最終將上游Observable的subscribe()方法調(diào)度到新創(chuàng)建的線程中運行。
現(xiàn)在我們知道了被觀察者(事件上游)執(zhí)行線程是如何被調(diào)度到指定線程中執(zhí)行的,但很多情況下,我們希望觀察者(事件下游)處理事件最好在UI線程執(zhí)行,比如更新UI操作等。但下游何時調(diào)度,如何調(diào)度由于篇幅問題,將放到下節(jié)繼續(xù)分析。敬請期待。