本篇包含內(nèi)容包括
1 RxJava事件流向的基本流程;
2 自己手寫(xiě)一個(gè)RxJava的基本流程。
1 RxJava事件流向的基本流程
最開(kāi)始接觸RxJava時(shí),很多文章把observer,observable對(duì)象定義為觀察者和被觀察者。其實(shí)這樣容易把人給繞暈,直接把observable看成上游產(chǎn)生事件者,把observer看出下游接收處理事件者。

RxJava2最簡(jiǎn)單的調(diào)用方式如下:
Observable.create(new ObservableOnSubscribe<String>() { // ①
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
基本的邏輯是:Observable通過(guò)create方法創(chuàng)建一個(gè)<? extends Observable>的類,然后通過(guò)<? extends Observable>類調(diào)用subscribe方法,并傳入一個(gè)觀察者observer。
需要搞定的問(wèn)題:
問(wèn):
create方法創(chuàng)建的到底是什么類型的實(shí)例?
帶著問(wèn)題我們先來(lái)看①處的create方法,主要涉及的內(nèi)容如下:
// Observable 類
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); ①
}
// RxJavaPlugins類
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source; ②
}
為避免分散精力,我們只看主流分支,搞清主流分支,所有的脈絡(luò)就基本清晰。
由上①處可知,返回的Observable對(duì)象就是方法onAssembly的返回值,而當(dāng)onObservableAssembly為空時(shí),返回值其實(shí)就是①處傳進(jìn)來(lái)的ObservableCreate對(duì)象。
所以到此,我們可以回答上面的問(wèn)題了:create創(chuàng)建的對(duì)象就是ObservableCreate。
那么正常邏輯下,我們現(xiàn)在肯定要找ObservableCreate的subscribe方法。
嗯嗯~可事情會(huì)如我們預(yù)期的那么順利嗎?我們?cè)?code>ObservableCreate類中是找不到subscribe方法。
問(wèn):
subscribe在哪里?
我們來(lái)看看ObservableCreate類部分源碼:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
由上可知ObservableCreate繼承至Observable,但我們并沒(méi)有找到subscribe方法。于是第一個(gè)想法就是去父類找,也就是Observable類中。
// Observable類
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
我的乖乖~,subscribe竟然是個(gè)final方法,難怪在子類中看不到這個(gè)方法,不過(guò)我們看到了另一個(gè)方法subscribeActual,點(diǎn)進(jìn)去一看:
// Observable類
protected abstract void subscribeActual(Observer<? super T> observer);
是個(gè)抽象方法,這我們就放心了,有點(diǎn)類似與Android控件View中Measure方法中的onMeasure。
于是乎,我們就可以回答上面提出的問(wèn)題。
答:Observable的subscribe方法是抽象方法,所有子類繼承Observable后,實(shí)現(xiàn)其抽象方法subscribeActual,進(jìn)行實(shí)際的訂閱操作。
于是我們就直接去看ObservableCreate類中的subscribeActual方法:
// ObservableCreate類
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); ①
observer.onSubscribe(parent); ②
try {
source.subscribe(parent); ③
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
問(wèn):發(fā)送消息的基本原理?
①處創(chuàng)建發(fā)射器
②處的方法是不是看上去很眼熟,對(duì)的,沒(méi)錯(cuò),它就是我們觀察者Observer最先被調(diào)用的方法。
③處source就是我們最開(kāi)始調(diào)用的create方法中的參數(shù)ObservableOnSubscribe,也就是發(fā)射器中的參數(shù)。
還記得我們最開(kāi)始那個(gè)例子嗎?
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
})
......
③處傳入的參數(shù)就是此處ObservableEmitter對(duì)象的實(shí)例,所以每次當(dāng)
我們利用emitter發(fā)送消息時(shí),就觸發(fā)了CreateEmitter類的onNext方法。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 觸發(fā)observer對(duì)象的onNext方法
observer.onNext(t);
}
}
省略部分代碼.....
}
2 自己手寫(xiě)一個(gè)RxJava的基本流程
古人云:紙上得來(lái)終覺(jué)淺,絕知此事要躬行。我們自己手寫(xiě)一個(gè)這個(gè)的過(guò)程比任何教程都會(huì)要記得深刻,牢固。
寫(xiě)我們的Observable,我把命名都加上一個(gè)perry前綴。
abstract class PerryObservable<T> {
// 訂閱開(kāi)始調(diào)用的方法
fun subscribe(observer: PerryObserver<in T>) {
subscribeActual(observer)
}
internal abstract fun subscribeActual(observer: PerryObserver<in T>)
companion object {
// create構(gòu)造方法
fun <T> create(source: PerryObservableOnSubscribe<T>): PerryObservable<T> {
return PerryObservableCreate(source)
}
}
}
接下來(lái)是PerryObservableCreate。
class PerryObservableCreate<T>(private val source: PerryObservableOnSubscribe<T>) : PerryObservable<T>() {
override fun subscribeActual(observer: PerryObserver<in T>) {
val emitter = PerryCreateEmitter(observer)
observer.onSubscribe(emitter)
source.subscribe(emitter)
}
// 這是我們的發(fā)射器
class PerryCreateEmitter<T> internal constructor(private val observer: PerryObserver<in T>)
: PerryDisposable, PerryEmitter<T> {
override fun dispose() {
}
override fun isDisposed(): Boolean {
return false
}
override fun onNext(value: T) {
observer.onNext(value)
}
override fun onError(error: Throwable) {
observer.onError(error)
}
override fun onComplete() {
observer.onComplete()
}
}
}
其他全部都是一些接口,就不一一貼出來(lái)啦。
interface PerryEmitter<T> {
fun onNext(@NonNull value: T)
fun onError(@NonNull error: Throwable)
fun onComplete()
}
最終調(diào)用方法如下:
PerryObservable.Companion.create(new PerryObservableOnSubscribe<String>() {
@Override
public void subscribe(PerryObservableCreate.PerryCreateEmitter<String> emitter) {
emitter.onNext("hello");
emitter.onNext("world");
}
}).subscribe(new PerryObserver<String>() {
@Override
public void onSubscribe(PerryDisposable d) {
}
@Override
public void onNext(String s) {
Log.d("zp_test", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印日志:

至此,我們基本搞清其發(fā)生消息的邏輯,由于篇幅有限,其他更加高級(jí)的功能,期待下次再見(jiàn)!