RxJava大概流程:
1.Observable.create 創(chuàng)建事件源,但并不生產(chǎn)也不發(fā)射事件。
代碼創(chuàng)建被觀察者Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onComplete();
}
});
源碼跟進(jìn)Observable.create
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
//ObservableOnSubscribe是個(gè)接口,只包含subscribe方法。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");//判段是否為空
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
繼續(xù)跟進(jìn)RxJavaPlugins.onAssembly方法
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
通過(guò)調(diào)試得知靜態(tài)對(duì)象onObservableAssembly默認(rèn)為null, 所以此方法直接返回傳入的參數(shù)source。
由此可以看出來(lái)整個(gè)Observable.create方法就是創(chuàng)建了個(gè)事件源new ObservableCreate()對(duì)象,將ObservableOnSubscribe作為參數(shù)傳遞給ObservableCreate的構(gòu)造函數(shù)。事件是由接口ObservableOnSubscribe的subscribe方法上產(chǎn)生的,但此時(shí)并不生產(chǎn)也不發(fā)射事件。
2.實(shí)現(xiàn)observer接口,此時(shí)沒(méi)有也無(wú)法接受到任何發(fā)射來(lái)的事件。
代碼創(chuàng)建觀察者Observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String str) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
源碼跟進(jìn)Observer
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
Observer就是個(gè)接口,包含了上面四個(gè)方法,所以此時(shí)沒(méi)有也無(wú)法接受到任何發(fā)射來(lái)的事件
3.訂閱 observable.subscribe(observer), 此時(shí)會(huì)調(diào)用具體Observable的實(shí)現(xiàn)類中的subscribeActual方法,此時(shí)才會(huì)真正觸發(fā)事件源生產(chǎn)事件,事件源生產(chǎn)出來(lái)的事件通過(guò)Emitter的onNext,onError,onComplete發(fā)射給observer對(duì)應(yīng)的方法由下游observer消費(fèi)掉。從而完成整個(gè)事件流的處理。
代碼關(guān)聯(lián)觀察者和被觀察者
observable.subscribe(observer);
代碼跟進(jìn)subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//hook ,默認(rèn)直接返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//真正實(shí)現(xiàn)訂閱的方法。
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
跟進(jìn)subscribeActual方法
//subscribeActual 是抽象方法,所以需要到實(shí)現(xiàn)類中去看具體實(shí)現(xiàn),也就是上文中1.提到的ObservableCreate中
protected abstract void subscribeActual(Observer<? super T> observer);
public ObservableCreate(ObservableOnSubscribe<T> source) {
//事件源,生產(chǎn)事件的接口
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//直接回調(diào)了觀察者的onSubscribe 并傳回了Disposable
//observer中可以利用Disposable來(lái)隨時(shí)中斷事件流的發(fā)射。
observer.onSubscribe(parent);
try {
// 調(diào)用了事件源subscribe方法生產(chǎn)事件,同時(shí)將發(fā)射器傳給事件源。
// 現(xiàn)在我們明白了,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行后才執(zhí)行的。 換言之,事件流是在訂閱后才產(chǎn)生的。
// 而observable被創(chuàng)建出來(lái)時(shí)并不生產(chǎn)事件,同時(shí)也不發(fā)射事件。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
現(xiàn)在可以看出來(lái),只有subscribe訂閱后,數(shù)據(jù)源才會(huì)開始生產(chǎn)事件和發(fā)射事件。
接下來(lái)看看事件是如何被發(fā)射出去的,看下CreateEmitter源碼
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {}
CreateEmitter 實(shí)現(xiàn)了ObservableEmitter接口,同時(shí)ObservableEmitter接口又繼承了Emitter接口。
CreateEmitter 還實(shí)現(xiàn)了Disposable接口,這個(gè)disposable接口是用來(lái)判斷是否中斷事件發(fā)射的。
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
看到這就能看出來(lái),當(dāng)訂閱成功后,數(shù)據(jù)源ObservableOnSubscribe開始生產(chǎn)事件,調(diào)用Emitter的onNext,onComplete向下游發(fā)射事件,Emitter包含了observer的引用,又調(diào)用了observer onNext,onComplete,這樣下游observer就接收到了上游發(fā)射的數(shù)據(jù)。
RxJava的線程調(diào)度機(jī)制
感謝大神們的心得:
https://www.cnblogs.com/linghu-java/p/9719427.html