1 簡單使用步驟
1、創(chuàng)建被觀察者(Observable),定義要發(fā)送的事件。
2、創(chuàng)建觀察者(Observer),接受事件并做出響應操作。
3、觀察者通過訂閱(subscribe)被觀察者把它們連接到一起。
2 RxJava的消息訂閱例子
//步驟1. 創(chuàng)建被觀察者(Observable),定義要發(fā)送的事件。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//步驟2. 創(chuàng)建觀察者(Observer),接受事件并做出響應操作。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//步驟3. 觀察者通過訂閱(subscribe)被觀察者把它們連接到一起。
observable.subscribe(observer);
其輸出結果為:
onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
3 源碼分析
3.1 創(chuàng)建被觀察者過程
首先來看下創(chuàng)建被觀察者(Observable)的過程,上面的例子中我們是直接使用Observable.create()來創(chuàng)建Observable
3.1.1 Observable類的create()
創(chuàng)建一個ObservableCreate對象出來,然后把我們自定義的ObservableOnSubscribe作為參數(shù)傳到ObservableCreate中去,最后就是調用 RxJavaPlugins.onAssembly()方法。
3.1.2 ObservableCreate類
public final class ObservableCreate<T> extends Observable<T> {//繼承自Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//把我們創(chuàng)建的ObservableOnSubscribe對象賦值給source。
}
}
可以看到,ObservableCreate是繼承自Observable的,并且會把ObservableOnSubscribe對象給存起來。
3.1.3 RxJavaPlugins類的onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//省略無關代碼
return source;
}
很簡單,就是把上面創(chuàng)建的ObservableCreate給返回。
3.1.4 簡單總結
Observable.create()中就是把我們自定義的ObservableOnSubscribe對象重新包裝成一個ObservableCreate對象,然后返回這個ObservableCreate對象。

3.1.5
Observable.create()的時序圖如下所示:

3.2 訂閱過程
3.2.1 Observable類的subscribe()
public final void subscribe(Observer<? super T> observer) {
//省略無關代碼
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
//省略無關代碼
}
可以看到,實際上其核心的代碼也就兩句,我們分開來看下:
3.2.2 RxJavaPlugins類的onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
//省略無關代碼
return observer;
}
跟之前代碼一樣,這里同樣也是把原來的observer返回而已。
再來看下subscribeActual()方法。
3.2.3 Observable類的subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
Observable類的subscribeActual()中的方法是一個抽象方法,那么其具體實現(xiàn)在哪呢?還記得我們前面創(chuàng)建被觀察者的過程嗎,最終會返回一個ObservableCreate對象,這個ObservableCreate就是Observable的子類,我們點進去看下:
3.2.4 ObservableCreate類的subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//觸發(fā)我們自定義的Observer的onSubscribe(Disposable)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到,subscribeActual()方法中首先會創(chuàng)建一個CreateEmitter對象,然后把我們自定義的觀察者observer作為參數(shù)給傳進去。這里同樣也是包裝起來
這個CreateEmitter實現(xiàn)了ObservableEmitter接口和Disposable接口,如下:

這個CreateEmitter實現(xiàn)了ObservableEmitter接口和Disposable接口,如下:
然后就是調用了observer.onSubscribe(parent),實際上就是調用觀察者的onSubscribe()方法,即告訴觀察者已經成功訂閱到了被觀察者。
繼續(xù)往下看,subscribeActual()方法中會繼續(xù)調用source.subscribe(parent),這里的source就是ObservableOnSubscribe對象,即這里會調用ObservableOnSubscribe的subscribe()方法。
我們具體定義的subscribe()方法如下:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
ObservableEmitter,顧名思義,就是被觀察者發(fā)射器。
所以,subscribe()里面的三個onNext()方法和一個onComplete()會逐一被調用。
3.2.5 CreateEmitter類的onNext()和onComplete()等
//省略其他代碼
@Override
public void onNext(T t) {
//省略無關代碼
if (!isDisposed()) {
//調用觀察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//調用觀察者的onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
可以看到,最終就是會調用到觀察者的onNext()和onComplete()方法。
可以看到,上面有個isDisposed()方法能控制消息的走向,即能夠切斷消息的傳遞,這個后面再來說。
3.2.6 簡單總結
Observable(被觀察者)和Observer(觀察者)建立連接(訂閱)之后,會創(chuàng)建出一個發(fā)射器CreateEmitter,發(fā)射器會把被觀察者中產生的事件發(fā)送到觀察者中去,觀察者對發(fā)射器中發(fā)出的事件做出響應處理。
可以看到,是訂閱之后,Observable(被觀察者)才會開始發(fā)送事件。

3.2.7 時序流程圖
再來看下訂閱過程的時序流程圖:

4 切斷消息
4.1 切斷消息
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "切斷觀察者與被觀察者的連接");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
輸出結果為
onSubscribe : null
onNext : 文章1
切斷觀察者與被觀察者的連接
可以看到,要切斷消息的傳遞很簡單,調用下Disposable的dispose()方法即可。調用dispose()之后,被觀察者雖然能繼續(xù)發(fā)送消息,但是觀察者卻收不到消息了。
另外有一點需要注意,上面onSubscribe輸出的Disposable值是"null",并不是空引用null。
4.2 切斷消息源碼分析
Disposable是一個接口,可以理解Disposable為一個連接器,調用dispose()后,這個連接器將會中斷。
其具體實現(xiàn)在CreateEmitter類.
4.2.1 CreateEmitter的dispose()
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
就是調用DisposableHelper.dispose(this)而已。
4.2.2 DisposableHelper類
public enum DisposableHelper implements Disposable {
DISPOSED
;
//其他代碼省略
public static boolean isDisposed(Disposable d) {
//判斷Disposable類型的變量的引用是否等于DISPOSED
//即判斷該連接器是否被中斷
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//這里會把field給設為DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}
可以看到DisposableHelper是一個枚舉類,并且只有一個值:DISPOSED。dispose()方法中會把一個原子引用field設為DISPOSED,即標記為中斷狀態(tài)。
因此后面通過isDisposed()方法即可以判斷連接器是否被中斷。
4.2.3 CreateEmitter類中的方法
@Override
public void onNext(T t) {
//省略無關代碼
if (!isDisposed()) {
//如果沒有dispose(),才會調用onNext()
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//如果dispose()了,會調用到這里,即最終會崩潰
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//省略無關代碼
if (!isDisposed()) {
try {
//如果沒有dispose(),才會調用onError()
observer.onError(t);
} finally {
//onError()之后會dispose()
dispose();
}
//如果沒有dispose(),返回true
return true;
}
//如果dispose()了,返回false
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//如果沒有dispose(),才會調用onComplete()
observer.onComplete();
} finally {
//onComplete()之后會dispose()
dispose();
}
}
}
如果沒有dispose,observer.onNext()才會被調用到。
onError()和onComplete()互斥,只能其中一個被調用到,因為調用了他們的任意一個之后都會調用dispose()。
先onError()后onComplete(),onComplete()不會被調用到。反過來,則會崩潰,因為onError()中拋出了異常:RxJavaPlugins.onError(t)。實際上是dispose后繼續(xù)調用onError()都會炸。