
在分析之前呢,首先需要大家打開(kāi)源碼對(duì)照分析。效果更加??!
首先我們看一下RxJava 2 三步曲的一個(gè)基本實(shí)現(xiàn):
1 創(chuàng)建被觀察者(也叫數(shù)據(jù)發(fā)射者)
2 創(chuàng)建觀察者(也叫數(shù)據(jù)消費(fèi)者)
3 建立訂閱關(guān)系
代碼如下:
//第一步 創(chuàng)建被觀察者
Observable<String> observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("發(fā)射數(shù)據(jù)");
e.onComplete();
}
}
);
//第二步 創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//第三步 建立訂閱關(guān)系
observable.subscribe(observer);
新記:
我們來(lái)看一下創(chuàng)建Observable的這個(gè)靜態(tài)方法create.實(shí)際上這個(gè)是RxJava大量的操作符中的一個(gè),create方法會(huì)返回一個(gè)Observable實(shí)例。
create方法的參數(shù)是一個(gè)實(shí)現(xiàn)了ObservableOnSubscribe接口的對(duì)象實(shí)例,該接口提供了發(fā)射數(shù)據(jù)的回調(diào)subscribe()方法,回調(diào)回來(lái)的ObservableEmitter實(shí)例就可以看成是數(shù)據(jù)發(fā)射器,用來(lái)發(fā)射數(shù)據(jù)。
我們來(lái)看看Observable.create()的內(nèi)部實(shí)現(xiàn):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
首先是創(chuàng)建了一個(gè)ObservableCreate實(shí)例,是Observable的子類,把之前參數(shù)中創(chuàng)建的ObservableOnSubscribe實(shí)例直接傳了進(jìn)去,作了件什么事呢?實(shí)際上實(shí)現(xiàn)了一個(gè)代理的作用,代理的是誰(shuí)?是Observer,后面我們?cè)僭敿?xì)分析,我們可以確定,ObservableCreate就是一個(gè)Observable。
我們?cè)倏碦xJavaPlugins.onAssembly().這個(gè)方法拿了ObservableCreate(Observable)實(shí)例去做了什么:
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins這個(gè)類是一個(gè)鉤子函數(shù)集合類,為RxJava中大量的操作符提供鉤子函數(shù)的注入。這里的鉤子函數(shù)會(huì)對(duì)Observable實(shí)例按照鉤子函數(shù)實(shí)際提供的功能進(jìn)行加工處理,然后返回一個(gè)處理過(guò)的Observable。實(shí)際上在我們上面所寫的這個(gè)代碼示例中,這里的鉤子函數(shù)為null,并沒(méi)有對(duì)Observable做任何處理就直接返回了。
實(shí)際上我們示例代碼中整個(gè)被觀察者Observable的創(chuàng)建,實(shí)際創(chuàng)建的是一個(gè)ObservableCreate實(shí)例,該實(shí)例提供了回調(diào)方法subscribe(),當(dāng)發(fā)生訂閱行為時(shí)會(huì)回調(diào),也就是示例代碼中執(zhí)行observable.subscribe(),訂閱后就可以發(fā)射數(shù)據(jù)了。通過(guò)數(shù)據(jù)發(fā)射器ObservableEmitter來(lái)進(jìn)行數(shù)據(jù)發(fā)射。
像這種訂閱后才開(kāi)始發(fā)射數(shù)據(jù)的,我們稱為Cold Observable; 另外一種稱作Hot Observable,這種是不管有沒(méi)有觀察者來(lái)訂閱都會(huì)不斷地發(fā)射數(shù)據(jù)。
我們?cè)賮?lái)看看observable.subscribe()訂閱的內(nèi)部代碼:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
...//這里把不關(guān)心的代碼省略掉了
}
首先是代碼中的RxJavaPlugins.onSubscribe()的調(diào)用與上面講的鉤子方法是一樣的,這里是通過(guò)鉤子方法對(duì)observer作了某種處理。示例中實(shí)際上也并未調(diào)用實(shí)際的鉤子方法。
最后就執(zhí)行到了subscribeActual()方法,我們前面講過(guò)我們的Observable是一個(gè)ObservableCreate實(shí)例,subscribeActual方法在Observable中是一個(gè)虛方法,真正的實(shí)現(xiàn)是在ObservableCreate中,代碼如下:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
代碼中的CreateEmitter是ObservableCreate的內(nèi)部類,實(shí)現(xiàn)了對(duì)observer的代理,CreateEmitter同時(shí)也實(shí)現(xiàn)了Disposable接口,該接口提供了dispose方法,可以用來(lái)停止對(duì)數(shù)據(jù)的接收。
我們接下來(lái)再看后面的幾行代碼就很明了了,首先是進(jìn)行了observer的onSubscribe回調(diào),然后是調(diào)用了observable的subscribe回調(diào),回調(diào)后就執(zhí)行數(shù)據(jù)發(fā)射操作。
整個(gè)過(guò)程就這樣。
最后,我們來(lái)看一下執(zhí)行的順序:
