面試的時(shí)候被問(wèn)道各種框架的原理架構(gòu),也是很尷尬,自以為寫(xiě)的代碼不少,用過(guò)的框架也不少,深入的去研究源碼的還真是不多,也是給自己敲了一個(gè)警鐘,今天就來(lái)嘗試剖析一下Rxjava2的源碼,水平有限,就先看一下基礎(chǔ)的用法相關(guān),一些難度更高的操作符就慢慢來(lái)分析吧。
就按照平時(shí)使用的順序來(lái)分析:
一、初始化Observerble
基本使用實(shí)例:
Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("aaa");
}
});
先看一下內(nèi)部的參數(shù) ObservableOnSubscribe<>() 。
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
就是一個(gè)接口,這里用的就是它的一個(gè)匿名實(shí)現(xiàn)類。而接口內(nèi)部的方法中我們看到ObservableEmitter<> 是一個(gè)Rxjava2新推出的類,俗稱發(fā)射器。
public interface ObservableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(Disposable d);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
ObservableEmitter<T> serialize();
}
這里面幾個(gè)回調(diào)方法的作用注釋也說(shuō)的很清楚了就不多說(shuō)了。
它繼承自Emitter
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
可以看到,這里面就是我們比較熟悉的next、complete、error三個(gè)回調(diào)方法了。其實(shí)這個(gè)create方法內(nèi)部的參數(shù)就是兩個(gè)接口的回調(diào),理解就行了,然后看一下create方法。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
requireNonNull是很好理解的,看名字也能猜測(cè)出是測(cè)試傳進(jìn)來(lái)的ObservableOnSubscribe是否為空
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
而源碼也驗(yàn)證了我們的想法。關(guān)鍵是后面一句,先看一下具體的方法實(shí)現(xiàn)。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
一句一句來(lái)分析:
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
@NonNull
R apply(@NonNull T t) throws Exception;
}
這里的Function也是一個(gè)接口,作用也很明顯,將T類型的數(shù)據(jù)轉(zhuǎn)化成R類型數(shù)據(jù)。那是我們?cè)谑褂玫?/p>
observable.map(new Function<String, Object>() {
@Override
public Object apply(@NonNull String s) throws Exception {
return null;
}
})
類似這種類型轉(zhuǎn)換的語(yǔ)句時(shí)候才會(huì)用到,這里我們先不管它,一開(kāi)始是默認(rèn)為null的,所以這個(gè)方法最后就會(huì)return source;就是將括號(hào)中的new ObservableCreate<T>(source)原樣返回。這個(gè)ObservableCreate又是什么呢?
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
源碼比較長(zhǎng)我們就之看一下它的構(gòu)造函數(shù)就可以了,目前只需要知道這是一個(gè)Observerble的子類就可以了,至于Observerble這個(gè)類,等到大概摸清楚了事件流程再回頭來(lái)分析。所以到現(xiàn)在我們的第一步初始化就算是分析完了流程。
二、初始化一個(gè)Observer
用法示例:
Observer<String> observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
subscription=d;
}
@Override
public void onNext(String value) {
LogUtil.log(TAG," "+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
LogUtil.log(TAG,"complete");
}
};
這個(gè)分析就要簡(jiǎn)單很多了,Observer只是一個(gè)簡(jiǎn)單的接口,這里也只是具體實(shí)現(xiàn)了一下接口回調(diào)。
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
不過(guò)這里和Rxjava1也是有些區(qū)別的,多了一個(gè)onSubscribe 注釋也說(shuō)的很清楚,用于隨時(shí)取消訂閱。
第二步很輕松,下面看一下第三步
三、建立訂閱關(guān)系
用法示例:
observable.subscribe(observer);
這里我們就只分析最簡(jiǎn)單的一種,看一下源碼:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
第一句還是一樣,判斷是否為空,平時(shí)自己寫(xiě)代碼也要像這樣注意代碼的健壯性。
重點(diǎn)就是這三句了。
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
一句一句來(lái)看:
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
在這個(gè)onsubscribe中是不是覺(jué)得有些眼熟?就跟剛剛的onAssenmbly幾乎一樣,由于我們沒(méi)有其它的功能,所以這里onObservableSubscribe也是null,也是返回原值,下面的requireNonNull我們也見(jiàn)過(guò)了,又驗(yàn)證一遍是否為空,因?yàn)槿绻覀兗尤肓薋unction函數(shù),上面就不會(huì)返回原來(lái)的observer了,所以還要再驗(yàn)證一遍。
于是就到了最后一句
protected abstract void subscribeActual(Observer<? super T> observer);
???
怎么是個(gè)abstract方法?那么它是在哪實(shí)現(xiàn)的呢?
回想看我們的observable初始化過(guò)程。哪里出的問(wèn)題呢?就是我們一開(kāi)始沒(méi)有分析的ObservableCreate,我們?cè)诔跏蓟臅r(shí)候就將一個(gè)ObservableCreate類向上轉(zhuǎn)型賦值給了Observerble,所以方法的具體實(shí)現(xiàn)也就在ObservableCreate里了。
繼續(xù)跟進(jìn)。果不其然:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
還是一句一句來(lái)看
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
又是一個(gè)新的類
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
所以我們的前兩句
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
所以我們的前兩局就是回調(diào)了onSubscribe接口,從而將這個(gè)CreateEmitter類型轉(zhuǎn)型成Disposable輸出了。而CreateEmitter的初始化參數(shù)又是observer本身,所以大體上可以看成回調(diào)了另一個(gè)格式的自己。。。然后一般可用于自殺(取消訂閱)。。。
然后就來(lái)到的最后一句
source.subscribe(parent);
這里的source就是
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
}
}
剛剛我們初始化observable傳入的。這個(gè)parent->這里的參數(shù)e。于是就這樣完成了Observerble和Observer的綁定,也就能實(shí)現(xiàn)接口回調(diào)了。
沒(méi)有任何其它功能,只是走了一邊最基本流程的Rxjava源碼,后面還會(huì)繼續(xù)更新的。