1.回顧
上篇已經(jīng)介紹了RxJava的基本概念以及用法 RxJava2基本框架分析一(基礎(chǔ)篇)
2.實(shí)例講解
// RxJava的鏈?zhǔn)讲僮? // 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
System.out.println("對(duì)Next事件" + value + "作出響應(yīng)");
}
@Override
public void onError(Throwable e) {
System.out.println("對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
System.out.println("對(duì)Complete事件作出響應(yīng)");
}
};
// 3. 通過訂閱(subscribe)連接觀察者和被觀察者
observable.subscribe(observer);
-
運(yùn)行結(jié)果
示意圖
3. 源碼分析
下面,我講根據(jù) 使用步驟 進(jìn)行RxJava2的源碼進(jìn)行分析
步驟1:創(chuàng)建被觀察者(Observable)&定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer)&定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)連接觀察者和被觀察者
步驟一:創(chuàng)建被觀察者(Observable)
- 源碼分析如下
// 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
/**
* 源碼分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
* create 操作主要是創(chuàng)建了 ObservableCreate 對(duì)象并且返回出去
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判斷source是否為空
ObjectHelper.requireNonNull(source, "source is null");
//hook函數(shù):判斷是否需要再原對(duì)象加上一些代碼操作(暫時(shí)可以當(dāng)做返回對(duì)象本身)
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
/**
* 下面我們來看看 ObservableCreate 對(duì)象里面做了什么操作
*/
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate 是Observable的子類
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//構(gòu)造函數(shù)
//傳入source對(duì)象,并且賦值全局 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe匿名內(nèi)部類對(duì)象(Observable.create(new ObservableOnSubscribe<Integer>())
this.source = source;
}
//這里需要留心關(guān)注subscribeActual方法后面會(huì)講到
- 步驟1總結(jié):創(chuàng)建被觀察者的操作已經(jīng)完成了,調(diào)用
Observable.create()返回了一個(gè)ObservableCreate對(duì)象。
步驟二創(chuàng)建觀察者(Observer)
- 源碼分析
/**
* 使用步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為(方法內(nèi)的創(chuàng)建對(duì)象代碼)
**/
// 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
System.out.println("對(duì)Next事件" + value + "作出響應(yīng)");
}
@Override
public void onError(Throwable e) {
System.out.println("對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
System.out.println("對(duì)Complete事件作出響應(yīng)");
}
};
/**
* 源碼分析Observer類
**/
public interface Observer<T> {
// 注:Observer本質(zhì) = 1個(gè)接口
// 接口內(nèi)含4個(gè)方法,分別用于 響應(yīng) 對(duì)應(yīng)于被觀察者發(fā)送的不同事件
void onSubscribe(@NonNull Disposable d); // 內(nèi)部參數(shù):Disposable 對(duì)象,可結(jié)束事件
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
- 步驟2總結(jié):創(chuàng)建觀察者的操作已經(jīng)完成了,通過
new了一個(gè)Observer的匿名內(nèi)部類
步驟三:通過訂閱(subscribe)連接觀察者和被觀察者
- 源碼分析
// 3. 通過訂閱(subscribe)連接觀察者和被觀察者
observable.subscribe(observer);
/**
* 源碼分析:Observable.subscribe(observer)
* 說明:該方法屬于 Observable 類的方法(注:傳入1個(gè) Observer 對(duì)象)
**/
public abstract class Observable<T> implements ObservableSource<T> {
...
// 僅貼出關(guān)鍵源碼
@Override
public final void subscribe(Observer<? super T> observer) {
...
// 僅貼出關(guān)鍵源碼
//可以看到調(diào)用的是本類的下面抽象方法
subscribeActual(observer);
}
//定義了一個(gè)抽象方法當(dāng)調(diào)用subscribe時(shí)會(huì)跟這個(gè)調(diào)用Observable子類的實(shí)現(xiàn)方法(就是調(diào)用者)
protected abstract void subscribeActual(Observer<? super T> observer);
}
/**
* 現(xiàn)在我們回到先前創(chuàng)建的被觀察者中 ObservableCreate類
**/
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate 是Observable的子類
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//構(gòu)造函數(shù)
//傳入source對(duì)象,并且賦值全局 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe匿名內(nèi)部類對(duì)象(Observable.create(new ObservableOnSubscribe<Integer>())
this.source = source;
}
/**
* 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
* 作用:訂閱時(shí),通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
**/
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1. 創(chuàng)建1個(gè)CreateEmitter對(duì)象(封裝成一個(gè)Disposable對(duì)象)
//作用:發(fā)射事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2. 調(diào)用觀察者(Observer)的onSubscribe()
// onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時(shí)復(fù)寫的onSubscribe()
//將Disposable(CreateEmitter) 傳到觀察者onSubscribe(Disposable d) 參數(shù)中,使之可以解除訂閱
observer.onSubscribe(parent);
try {
//3.調(diào)用source對(duì)象的subscribe()方法
// source對(duì)象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對(duì)象
//subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()
//將CreateEmitter對(duì)象傳遞給被觀察者進(jìn)行對(duì)象方法的調(diào)用(onNext(),onComplete()...)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
/**
* 分析2:emitter.onNext("1");
* 此處僅講解subscribe()實(shí)現(xiàn)中的onNext()
* onError()、onComplete()類似,此處不作過多描述
**/
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//初始化講觀察者賦值到全局變量observer
this.observer = observer;
}
@Override
public void onNext(T t) {
//當(dāng)被觀察者調(diào)用onNext()方法時(shí),回調(diào)此方法(步驟一中創(chuàng)建Observable.create()匿名內(nèi)部類中的onNext())
//發(fā)送的事件不能為null
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//判斷是否斷開連接(調(diào)用Disposable.dispose())
//沒有斷開的話,則調(diào)用觀察者中的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
}
步驟3總結(jié):當(dāng)被觀察者訂閱觀察者的時(shí)候,會(huì)調(diào)用被觀察者Observable的subscribeActual()抽象方法,回調(diào)其子類重新的subscribeActual()方法。這方法里面有三個(gè)步驟:
- 創(chuàng)建1個(gè)
CreateEmitter對(duì)象(封裝成一個(gè)Disposable對(duì)象) - 調(diào)用觀察者(
Observer)的onSubscribe(CreateEmitter parent )使其可以取消訂閱 - 調(diào)用
source對(duì)象的subscribe(CreateEmitter parent)方法,通過parent發(fā)送事件回調(diào)
4. 源碼總結(jié)
- 在步驟1(創(chuàng)建被觀察者(Observable))、步驟2(創(chuàng)建觀察者(Observer))時(shí),僅僅只是定義了發(fā)送的事件 & 響應(yīng)事件的行為;
- 只有在步驟3(訂閱時(shí)),才開始發(fā)送事件 & 響應(yīng)事件,真正連接了被觀察者 & 觀察者
-
具體源碼總結(jié)如下
總結(jié)

