一、前言
- 基于RxJava2.1.1
- 我們在前面的 RxJava2.0使用詳解(一)初步分析了RxJava從創(chuàng)建到執(zhí)行的流程。RxJava2.0使用詳解(二) 中分析了RxJava的隨意終止Reactive流的能力的來源;也明白了
RxJava的onComplete();與onError(t);只有一個會被執(zhí)行的秘密。RxJava2.X 源碼分析(三)中探索了RxJava2調(diào)用subscribeOn切換被觀察者線程的原理。 - 本次我們將繼續(xù)探索
RxJava2.x切換觀察者的原理,分析observeOn與subscribeOn的不同之處。繼續(xù)實現(xiàn)我們在第一篇中定下的小目標(biāo)
二、從Demo到原理
- OK,我們的Demo還是上次的demo,忘記了的小伙伴可以點擊RxJava2.X 源碼分析(三),這里就不再重復(fù)了哦,我們直接進入正題。
- Ok,按照套路,我們從
observeOn方法入手。
- Ok,我點~_
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//false為默認(rèn)無延遲發(fā)送錯誤,bufferSize為緩沖區(qū)大小
return observeOn(scheduler, false, bufferSize());
}
- 我們繼續(xù)往下看,我猜套路跟
subscribeOn的逃不多,也是采用裝飾者模式,wrapper我們的Observable和Observer產(chǎn)生一個中間被觀察者和觀察中,通過中間被觀察者訂閱上游被觀察者,通過中間觀察者接收上游被觀察者下發(fā)的數(shù)據(jù),然后通過線程切換將數(shù)據(jù)傳遞給下游觀察者。
- Ok,我們來驗證下才想。我覺得就是沒完全猜對,也能猜對其中的大部分。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
-
Ok,熟悉的
RxJavaPlugins.onAssemblyhook處理,略過,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)這句public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observersuper T> observer) { //1、在當(dāng)前線程調(diào)度,但不是立即執(zhí)行,放入隊列中 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //2、本次走的是這里 Scheduler.Worker w = scheduler.createWorker(); //3 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- Ok,果然,熟悉的模式,對我們上游的
Observable,下游的Observerwrapper一次。
1、ObservableObserveOn繼承了AbstractObservableWithUpstream
2、source保存上游的Observable
3、scheduler為本次的調(diào)度器
4、在下游調(diào)用subscribe訂閱時觸發(fā)->subscribeActual->Wrapper了下游的Observer觀察者
- 3處:source為游Observable,下游Observer被wrapper到ObserveOnObserver,發(fā)生訂閱數(shù)件,上游Observable開始執(zhí)行subscribeActual,調(diào)用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等
- OK,我們接著看Observer被包裝進 ObserveOnObserver的樣子,代碼有點多,我們分段講解
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//下游的Observer
final Observersuper T> actual;
//調(diào)度工作者
final Scheduler.Worker worker;
//是否延遲錯誤,默認(rèn)false
final boolean delayError;
//隊列大小
final int bufferSize;
//存儲上游Observable下發(fā)的數(shù)據(jù)隊列
SimpleQueue<T> queue;
//存儲下游Observer的Disposable
Disposable s;
//存儲錯誤信息
Throwable error;
//校驗是否完畢
volatile boolean done;
//是否被取消
volatile boolean cancelled;
//存儲執(zhí)行模式,同步或者異步 同步
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//1、判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
//true 后面的onXX方法都不會被調(diào)用
done = true;
actual.onSubscribe(this);
//2、同步模式下,直接調(diào)用schedule
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
//2、異步模式下,等待schedule
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
actual.onSubscribe(this);
}
}
- OK,執(zhí)行玩這里之后,就到我們的onXX方法了
- 首先可無限調(diào)用的
onNext
@Override
public void onNext(T t) {
//3、數(shù)據(jù)源是同步模式或者執(zhí)行過error / complete 會是true
if (done) {
return;
}
//如果數(shù)據(jù)源不是異步類型,
if (sourceMode != QueueDisposable.ASYNC) {
//4、上游Observable下發(fā)的數(shù)據(jù)壓入queue
queue.offer(t);
}
//5、開始調(diào)度
schedule();
}
- 其次只能觸發(fā)一次的onError,基本差不多
@Override
public void onError(Throwable t) {
if (done) {
//6、已完成再執(zhí)行會拋一場
RxJavaPlugins.onError(t);
return;
}
//7、記錄錯誤信息
error = t;
//8、標(biāo)識已完成
done = true;
//9、開始調(diào)度
schedule();
}
- 同樣是只能觸發(fā)一次的onComplete,同樣的套路,就不說了
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
- 然后就是我們的關(guān)鍵點
schedule();
//關(guān)鍵點就是直接、簡單、里面線程調(diào)度工作者調(diào)用schedule(this),傳入了this
void schedule() {
//getAndIncrement很關(guān)鍵,他原子性的保證了worker.schedule(this);在調(diào)度完之前不會被再次調(diào)度
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
什么?傳入了this?那么說明什么呢?( ̄? ̄)
嗯?this是個
runnable,沒錯,我們的ObserveOnObserver實現(xiàn)了Runnable接口那么,接下來自然是調(diào)用
run方法
@Override
public void run() {
//outputFused一般是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
- 好吧,在看drainNormal前,我們先看一個函數(shù)
//從名字看是檢測是否已終止
boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
//1、訂閱已取消
if (cancelled) {
//清空隊列
queue.clear();
return true;
}
//2、d其實是done,
if (d) {
//done==ture可能的情況onNext剛被調(diào)度完,onError或者onCompele被調(diào)用,
Throwable e = error;
if (delayError) {
//delayError==true時等到隊列為空才調(diào)用
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
//否則直接調(diào)用
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
//否則未終結(jié)
return false;
}
true:1、訂閱被取消cancelled==true,2、done==true onNext剛被調(diào)度完,onError或者onCompele被調(diào)用
繼續(xù)看drainNormal
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observersuper T> a = actual;
//Ok,死循環(huán),我們來看下有哪些出口
for (;;) {
//Ok,出口,該方法前面分析的
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//在此死循環(huán)
for (;;) {
boolean d = done;
T v;
try {
//分發(fā)數(shù)據(jù)出隊列
v = q.poll();
} catch (Throwable ex) {
//有異常時終止退出
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
//停止worker(線程)
worker.dispose();
return;
}
boolean empty = v == null;
//判斷隊列是否為空
if (checkTerminated(d, empty, a)) {
return;
}
//沒數(shù)據(jù)退出
if (empty) {
break;
}
//數(shù)據(jù)下發(fā)給下游Obsever,這里支付者onNext,onComplete和onError主要放在了checkTerminated里面回調(diào)
a.onNext(v);
}
//保證此時確實有一個 worker.schedule(this);正在被執(zhí)行,
missed = addAndGet(-missed);
//為何要這樣做呢?我的理解是保證drainNormal方法被原子性調(diào)用,如果執(zhí)行了addAndGet之后getAndIncrement() == 0就成立了,此時又一個worker.schedule(this);被調(diào)用了,那么就不能執(zhí)行break了
if (missed == 0) {
break;
}
}
}
總結(jié)
- Ok,看到這里我們基本了解了observeOn的實現(xiàn)流程,同樣是老套路,使用裝飾者模式,中間Wrapper了我們的Observable和Observer,通過中間增加一個Observable和Observer來實現(xiàn)線程的切換。
- 喜歡就給我留言哦