前言
使用RxJava也有一段時間了,通過這種訂閱數(shù)據(jù)的思想編寫代碼,避免了大量的接口回調(diào),使得數(shù)據(jù)處理更加方便,對外提供數(shù)據(jù)的方式更加統(tǒng)一,回避了同步接口和異步接口的不同。
本文是閱讀拋物線的《給 Android 開發(fā)者的 RxJava 詳解》一文后,結(jié)合閱讀源碼理解觀察、訂閱實現(xiàn)原理的筆記。
最簡單的觀察、訂閱
下面是一個Observable的創(chuàng)建和完成訂閱的示例代碼
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
需要關(guān)注的就三個:
- Observable
- OnSubscriber
- Subscriber
Observable
首先看create()如何創(chuàng)建了一個了一個Observable。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
過程很簡單,就是將傳遞給 create() 的 OnSubscribe 保存了起來就結(jié)束了。RxJavaHooks主要是用于性能優(yōu)化,在RxJava的源代碼中很常見。
subscribe()
// 核心代碼
public final Subscription subscribe(Subscriber<? super T> subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
通過核心代碼,可見流程十分簡單,首先是調(diào)用傳入的 subscriber#onStart 方法,該方法默認不做任何操作。之后就是將Subscriber當(dāng)作參數(shù)調(diào)用Observable中的OnSubscriber#call,而在 call() 中調(diào)用了subscriber的 onNext() 和 onCompelte() 。數(shù)據(jù)就完成了從了Observable.OnSubscribe到Subscriber的數(shù)據(jù)的傳遞。最后返回的Subscriber是為了方便取消訂閱等操作。
給subscriber添加Subscription
在實際應(yīng)用中,會有這樣一個需求:在subscriber退訂時需要清理Observable被訂閱時一起創(chuàng)建的資源,例如關(guān)閉socket等。示例代碼如下:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// 創(chuàng)建資源
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// 在退訂時被調(diào)用,清理資源
}
}));
// do something
}
});
代碼中給subscriber添加了一個Subscription,Subscription接口有兩個方法:
- void unsubscribe();
- boolean isUnsubscribed();
void unsubscribe();在退訂時被調(diào)用。通過 Subscriptions#create 創(chuàng)建的Subscription會在退訂時調(diào)用 Action0#call 。實現(xiàn)代碼如下:
// 構(gòu)造方法
private BooleanSubscription(Action0 action) {
actionRef = new AtomicReference<Action0>(action);
}
@Override
public boolean isUnsubscribed() {
return actionRef.get() == EMPTY_ACTION;
}
@Override
public void unsubscribe() {
Action0 action = actionRef.get();
if (action != EMPTY_ACTION) {
action = actionRef.getAndSet(EMPTY_ACTION);
if (action != null && action != EMPTY_ACTION) {
action.call();
}
}
}
可見在退訂時會清除對action的引用,并且是通過判斷action是否為空引用來判斷是否已經(jīng)被退訂的,并且使用了AtomicReference類來保存引用,保證了該類線程安全。
退訂時,Subscription#unsubscribe被調(diào)用的原理可以查看 SubscriptionList 的源碼知曉:
// subscriber#unsubscribe
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
// SubscriptionList
@Override
public void unsubscribe() {
if (!unsubscribed) {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
}
// we will only get here once
unsubscribeFromAll(list);
}
}
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
Exceptions.throwIfAny(es);
}
核心思想就是退訂時遍歷subscriptions中的Subcription并調(diào)用 unsubscribe() 。
總結(jié)
可見完成一個基本的觀察、訂閱原理并不復(fù)雜,而在源碼中會有很多性能優(yōu)化,錯誤處理相關(guān)的代碼,在閱讀源碼時需要學(xué)會挑重點、優(yōu)先關(guān)注核心的邏輯代碼。