RxJava3.0已經(jīng)發(fā)布了,但是這里還是以RxJava2.x來分析部分源碼。RxJava采用的是響應(yīng)式編程的原理,采用觀察者模式。
一、RxJava案例和流程
Observable<String> observable = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});


這是一個(gè)標(biāo)注的RxJava使用例子,其實(shí)這個(gè)流程就是先從上往下一級一級的封裝對應(yīng)的Observable,然后再從下往上通過Observable.subscribe訂閱向上傳遞Observer觀察者,最后ObservableCreate中將Observer進(jìn)行封裝CreateEmitter,調(diào)用ObservableOnSubscribe.subscribe(emitter)用于業(yè)務(wù)中通知,CreateEmitter是ObservableEmitter子類實(shí)現(xiàn),是ObservableCreate的靜態(tài)內(nèi)部類
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
從CreateEmitter的內(nèi)部onNext方法可以看出,其內(nèi)部就是通過調(diào)用Observer.onNext通知觀察者結(jié)果。其實(shí)整個(gè)流程來看,Observable向下一層層進(jìn)行封裝,通過鉤子函數(shù)的方式,而Observer從下向上進(jìn)行封裝傳遞,而且業(yè)務(wù)都是在Observer中的onNext執(zhí)行的,比如.map()中定義的Function,其實(shí)就是在MapObserver中的onNext中執(zhí)行

二、分析每步的源碼
RxJavaPlugins.onAssembly
在具體分析之前,首先先看RxJavaPlugins.onAssembly的具體實(shí)現(xiàn),RxJavaPlugins.onAssembly方法可以說貫穿整個(gè)RxJava流程,RxJavaPlugins.onAssembly方法目的就是作為一個(gè)鉤子函數(shù),將之前的Observable進(jìn)行封裝,變成一個(gè)新的Observable。
在上面的例子中,這里的f對象一直都是null。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
1.Observable.subscribe
所有的Observable以及子類調(diào)用subscribe方法時(shí),都是調(diào)用Observable.subscribe(),所以在上述流程中,第一個(gè)調(diào)用subscribe()方法的就是ObservableObServeOn這個(gè)Observable子類,ObservableObServeOn是在observeOn()方法中對上一個(gè)Observable進(jìn)行封裝創(chuàng)建的。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
2.ObservableObserveOn
ObservableObserveOn對象其實(shí)就是在調(diào)用ObserveOn的時(shí)候創(chuàng)建的。封裝當(dāng)前的Observable即在observeOn方法調(diào)用之前最新創(chuàng)建的Observable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
// 這里的source其實(shí)就是Observable實(shí)現(xiàn)類,在上面的例子中,其實(shí)就是ObservableSubscribeOn對象
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 封裝Observer,然后通過調(diào)用上一個(gè)Observable.subscribe,向上傳遞Observer觀察者
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
在這里創(chuàng)建了一個(gè)Scheduler.Worker,Scheduler是一個(gè)線程調(diào)度類,Worker其實(shí)就是針對線程調(diào)用的工作者。而Scheduler會通過不同的子類實(shí)現(xiàn),將當(dāng)前observeOn定義的線程,即讓在observeOn之前封裝的Observer都在該線程之前(Observer是從下向上傳遞的)。
ObserveOn與SubscribeOn有點(diǎn)相反,ObserveOn針對的是Observer,即觀察者,觀察者是從下向上傳遞封裝的,而ObserverOn中接收到的觀察者,其實(shí)是其下游封裝之后的觀察者,所以O(shè)bserveOn針對的是其下游
具體分析observeOn(AndroidSchedulers.mainThread())
(1)AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
(2)AndroidSchedulers.MAIN_THREAD
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
(3)AndroidSchedulers.MainHolder.DEFAULT
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
所以AndroidSchedulers.mainThread()的線程切換到主線程,其實(shí)就是交由Handler來實(shí)現(xiàn)。HandlerScheduler其實(shí)是Scheduler的子類,依然是用來創(chuàng)建Scheduler.Worker實(shí)例,然后通過Worker.schedule方法進(jìn)行線程切換,將之前的線程切換到主線程。
(4)observeOn對應(yīng)的Observable->ObservableObserveOn
而ObservableObserveOn中也有一個(gè)Scheduler實(shí)例,這個(gè)實(shí)例其實(shí)就是HandlerScheduler對象。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
(5)Scheduler.Worker w = scheduler.createWorker();
這里其實(shí)就是調(diào)用了HandlerScheduler這個(gè)Scheduler的createWorkder()方法
// HandlerScheduler.createWorker()
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
(6)線程切換
而ObserveOnObserver中的線程切換,其實(shí)就是調(diào)用ObserveOnObserver的schedule()方法實(shí)現(xiàn)的,而ObserveOnObserver中的Scheduler.Worker worker對象,是一個(gè)HandlerWorker對象。
// ObserveOnObserver類中的部分方法
@Override
public void run() {
if (outputFused) {
// 直接執(zhí)行下游Observer.onNext方法。
drainFused();
} else {
// 從隊(duì)列中取任務(wù)
drainNormal();
}
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這里的this,其實(shí)就是ObserveOnObserver對象,而在ObserveOnObserver中,都會將要執(zhí)行的task放到隊(duì)列中。而ObserveOnObserver本身就是一個(gè)Runnable
(7)HandlerSchedule.HandlerWorker.schedule
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
// 這里的run,其實(shí)就是ObserveOnObserver對象
run = RxJavaPlugins.onSchedule(run);
// 封裝ObserveOnObserver對象,ObserveOnObserver本身就是一個(gè)Runnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
// 通過Handler執(zhí)行消息,進(jìn)而達(dá)到切換到主線程的目的
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
所以從上面的代碼可以看出,其實(shí)ObserveOn的切換任務(wù),首先會接收到會被后一個(gè)Observer調(diào)用onNext觸發(fā)調(diào)用ObserveOnObserver的onNext()方法(Observer從下向上封裝,所以下面是前一個(gè),上面是后一個(gè))然后就會調(diào)用ObserveOnObserver的schedule()方法,觸發(fā)Handler同步執(zhí)行任務(wù),而封裝的Runnable其實(shí)就是在其run()方法中調(diào)用了ObserveOnObserver的run()方法調(diào)用了前一個(gè)(下游)的Observer的onNext()方法,將結(jié)果轉(zhuǎn)換線程回調(diào)給了觀察者
所以是針對ObserveOn下游代碼的。
3.ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}
我們知道,subscribeOn只有設(shè)置第一次有效,這其實(shí)是因?yàn)镾ubscribeOnObserver.setDisposable方法中,調(diào)用的是DisposableHelper.setOnce(this, d);方法
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
...
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
而DisposableHelper.setOnce方法的實(shí)現(xiàn)如下:
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
而當(dāng)前subscribeOn對應(yīng)的Observable即ObservableSubscribeOn中的subscribeActual方法中,針對之前傳進(jìn)來的Observer做了封裝,而這里的source其實(shí)就是在subscribeOn之前的Observable,所以在subscribeTask中的run方法中調(diào)用source.subscribe(parent);其實(shí)就是將subscribeOn之前的邏輯運(yùn)行的線程切換到了subscribeOn指定的線程。而subscribeOn后面部分的代碼,如果沒有指定線程切換,都是在subscribeActual中調(diào)用source.subscribe()的,所以并不會在subscribeOn指定的線程中執(zhí)行
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
如果調(diào)用多次的subscribeOn,其實(shí)每次的線程切換都會生效,但是最終只有第一個(gè)調(diào)用的subscribeOn會生效的,這個(gè)原因其實(shí)就是subscribeOn是切換其上游的線程,而subscribeOn線程切換,其實(shí)切換的就是source.subscribe(parent)所在的線程,如果create().subscribeOn,那么subscribeOn切換的就是create()中的source(即ObservableCreate)所在線程。subscribeOn創(chuàng)建對應(yīng)的ObservableSubscribeOn這個(gè)Observable是從上向下的,但是調(diào)用subscribeActual,封裝Observer是從下向上的,所以就算多次使用subscribeOn進(jìn)行線程的切換,最終只有第一個(gè)subscribeOn生效,即最后被調(diào)用subscribeActual的ObservableSubscribeOn生效了。
subscribeOn與ObserveOn不同的是,ObserveOn切換的是Observer的線程,而subscribeOn切換的是Observable的線程。所以,subscribeOn是影響上游的操作,而observeOn影響的是其下游的操作

看這個(gè)圖,其實(shí)ObservableSubscribeOn就是subscribeOn創(chuàng)建的對應(yīng)的Observable,在這個(gè)Observable的subscribeActual方法中,其實(shí)就是線程切換執(zhí)行任務(wù),而對應(yīng)的Runnable.run方法中調(diào)用的其實(shí)就是source.subscribe(),即ObservableSubscribeOn內(nèi)部封裝的Observable.subscribe線程做了切換調(diào)度,而ObservableSubscribeOn內(nèi)部封裝的Observable是subscribeOn上游創(chuàng)建的Observable

針對parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));做分析
(1)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
(2)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 創(chuàng)建工作對象,內(nèi)部通過線程池執(zhí)行runable任務(wù),這個(gè)任務(wù)其實(shí)就是SubscribeTask
// 創(chuàng)建Scheduler.Worker實(shí)例,其目的就是為了創(chuàng)建EventLoopWorker實(shí)例以及其內(nèi)部的ThreadWorker實(shí)例
// 最終的線程池調(diào)用就是通過ThreadWorker內(nèi)部的線程池來進(jìn)行,這樣就將任務(wù)交給了ThreadWorker中的線程池
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 封裝Runable,其實(shí)還是一個(gè)Runbale,只不過多實(shí)現(xiàn)了Disposable接口
// 這是為了可以在中斷任務(wù)的時(shí)候,將這個(gè)異步執(zhí)行的任務(wù)中斷
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
這里的createWorker()方法,其實(shí)是調(diào)用的IoScheduler.createWorker方法。
(3)IoScheduler.createWorker
@NonNull
@Override
public Worker createWorker() {
// 創(chuàng)建Scheduler.Worker實(shí)例對象,這時(shí)會構(gòu)造器內(nèi)部創(chuàng)建ThreadWorker實(shí)例
// ThreadWorker實(shí)例是從CachedWorkerPool實(shí)例調(diào)用get()方法創(chuàng)建的
return new EventLoopWorker(pool.get());
}
這里的pool是一個(gè)AtomicReference<CachedWorkerPool>對象,get()得到的是一個(gè)線程池包裝類。
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
// 如果cached中沒有ThreadWorker,則會創(chuàng)建一個(gè),并且在ThreadWorker構(gòu)造器中會創(chuàng)建線程池ScheduledExecutorService executor;
// 這個(gè)是因?yàn)門hreadWorker構(gòu)造器執(zhí)行父類構(gòu)造器的時(shí)候創(chuàng)建的
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...
}
(4)第三步里的w.schedule其實(shí)就是調(diào)用IoScheduler中的內(nèi)部類EventLoopWorker.schedule
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
而threadWorker其實(shí)就是通過CachedWorkerPool.get()得到的。
而threadWorker其實(shí)就是ThreadWorker,是IoScheduler的內(nèi)部類,是NewThreadWorker的子類實(shí)現(xiàn)。所以這里調(diào)用scheduleActual,其實(shí)就是調(diào)用NewThreadWorker.scheduleActual
(5)NewThreadWorker.scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// executor其實(shí)就是線程池對象。sr就是要執(zhí)行的任務(wù),
// executor這個(gè)線程池是在創(chuàng)建ThreadWorker的時(shí)候初始化創(chuàng)建的
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
而任務(wù)中,其實(shí)就是ObservableSubscribeOn的靜態(tài)內(nèi)部類實(shí)現(xiàn)對象,SubscribeTask。而SubscribeTask這個(gè)Runnable的run方法中是調(diào)用了source.subscribe(),這個(gè)source其實(shí)就是在subscribeOn之前封裝的Observable實(shí)例,所以這里的線程池異步調(diào)用的時(shí)候,其實(shí)就是執(zhí)行subscribeOn之前的Observable.subscribe過程,所以subscribeOn是針對代碼上游的線程切換。
但是Observer.onSubscribe并不會為異步,subscribeOn的線程切換不會針對onSubscribe,調(diào)用subscribe的Observable在什么線程,則onSubscribe就在什么線程中執(zhí)行。而onError和onComplete都是會因?yàn)榫€程切換而影響。
4.ObservableMap
其實(shí)map操作符,內(nèi)部創(chuàng)建的是ObservableMap,會傳入Function對象,而Function對象是被封裝在MapObserver這個(gè)Observer中的,當(dāng)從上游向下調(diào)用發(fā)送Observer.onNext的時(shí)候,就會在MapObserver中觸發(fā)Function中的操作。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
...
}
5.ObservableCreate
這里就看關(guān)鍵部分的源碼,即subscribeActual的實(shí)現(xiàn)
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate類 = Observable的子類
...
// 僅貼出關(guān)鍵源碼
final ObservableOnSubscribe<T> source;
// 構(gòu)造函數(shù)
// 傳入了傳入source對象 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe對象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
/**
* 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
* 作用:訂閱時(shí),通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
* 該方法,是在被觀察者調(diào)用subscribe()方法與觀察者綁定的時(shí)候,調(diào)用的。
**/
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1. 創(chuàng)建1個(gè)CreateEmitter對象(封裝成1個(gè)Disposable對象)
// 作用:發(fā)射事件
// CreateEmitter類中是對觀察者的一個(gè)封裝類,用于被觀察者變化時(shí)向觀察者分發(fā)事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2. 調(diào)用觀察者(Observer)的onSubscribe()
// onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時(shí)復(fù)寫的onSubscribe()
// Observer對象的onSubscribe方法實(shí)現(xiàn)
observer.onSubscribe(parent);
try {
// 3. 調(diào)用source對象的subscribe()
// source對象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對象
// subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()->>分析2
// 這里調(diào)用的,其實(shí)就是在Observable.create方法中,
// 實(shí)現(xiàn)ObservableOnSubscribe接口的時(shí)候,實(shí)現(xiàn)的subscrebe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
從這部分代碼可以看出,在ObservableCreate.subscribeActual內(nèi)部會調(diào)用ObservableOnSubscribe.subscribe,但是在調(diào)用這個(gè)方法之前,會調(diào)用觀察者的onSubscribe()方法,其實(shí)就是事件開始。
三、操作符分析
1.分析map和flatMap的區(qū)別
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
要分析兩個(gè)操作符的區(qū)別,首先看源碼。
比如在map和flatMap兩個(gè)操作符中,分別做網(wǎng)絡(luò)請求,那么在map中,就會有問題。
因?yàn)槿绻趂latMap中執(zhí)行Retrofit請求接口,返回的是一個(gè)Observable<T>,那么可以看出,如果是使用flatMap的話,則flatMap中的泛型<R>其實(shí)是接口請求的返回值的Observable<T>的泛型T,而如果是使用map的話,那么map中的泛型<R>其實(shí)就是接口請求返回值Observable<T>,那么map的返回值就會變成Observable<Observable<T>>,這結(jié)構(gòu)出現(xiàn)了變化
所以map主要是用來做數(shù)據(jù)類型的轉(zhuǎn)換的,從一個(gè)數(shù)據(jù)類型轉(zhuǎn)為另外一個(gè)數(shù)據(jù)類型,而不會影響這個(gè)數(shù)據(jù)類型在Observable的,比如轉(zhuǎn)換之前是Observable<T>,轉(zhuǎn)換之后變成Observable<R>,依然是Observable結(jié)構(gòu)的,轉(zhuǎn)換的只是Observable中的泛型的類型。
而flatMap的話,可以使用Observable的泛型類型數(shù)據(jù),得到一個(gè)新的Observable,然后使用這個(gè)新的Observable替代了舊的Observable
flatMap在實(shí)際應(yīng)用場景中,可能會出現(xiàn)一個(gè)接口的請求你數(shù)據(jù)需要借助于前一個(gè)接口,這樣的接口多層嵌套的情況,在這樣的情況下,可以借助于flatMap來簡化嵌套層次,在flatMap中還可以借助于Observable.fromIterable實(shí)現(xiàn)一個(gè)發(fā)射器功能,即遍歷一個(gè)數(shù)組或者集合,然后按集合的長度進(jìn)行遍歷發(fā)射,這樣在這個(gè)flatMap的后面的觀察者就會執(zhí)行多次。
2.doOnNext
使用doOnNext代替subscribe,使用doOnNext在兩個(gè)請求的中間進(jìn)行一次UI更新操作
MyRetrofit.createRetrofit().create(TestApi.class)
.register("947674559qq.com", "123456", "123456")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
// todo 注冊完成之后更新ui
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<LoginResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(LoginResponse loginResponse) throws Exception {
Observable<LoginResponse> observable = MyRetrofit.createRetrofit().create(TestApi.class)
.loginWanAndroid(loginResponse.getData().getUsername(), loginResponse.getData().getPassword());
return observable;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {
// todo 如果是使用MVP模式,可以在BaseModel中定義CompositeDisposable,用來保存Disposable
// todo 然后BaseModel實(shí)現(xiàn)LifecycleObserver接口,這樣在BaseModel中就可以根據(jù)注解回調(diào)到activity的生命周期onDestroy
// todo 然后在BaseModel對生命周期的回調(diào)中mCompositeDisposable?.dispose()
// todo 顯示加載中的dialog
}
@Override
public void onNext(LoginResponse loginResponse) {
// todo 登錄完成之后更新UI
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// todo 登錄完成之后才會回調(diào)complete,關(guān)閉dialog
}
});