RxJava是一個非常好用的框架,它的作用是將一個復(fù)雜的業(yè)務(wù)邏輯,分拆成一個個小的功能模塊,然后將這些功能串起來,達到思路清晰的目的。
關(guān)于如何詳細使用rxjava,這里就不再介紹了,可以點擊 這里
進行查看,rxjava核心代碼很簡潔巧妙,本文主要介紹核心代碼實現(xiàn)。
我們先看一個例子。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "call");
subscriber.onNext("hello");
subscriber.onNext("beauty");
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext s: " + s);
}
});
05-31 14:36:06.469 29241-29241/? E/rx2: call
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: hello
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: beauty
05-31 14:36:06.470 29241-29241/? E/rx2: onCompleted
這個例子創(chuàng)建了一個被觀察者和一個訂閱者,被觀察者產(chǎn)生了兩個字符串,然后傳遞給了訂閱者。
我們看一下源碼
public class Observable<T> {
//創(chuàng)建被觀察者,需要傳入訂閱事件f,這個是被觀察者和訂閱者之間的紐帶,我們就稱呼它為訂閱紐帶吧,避免混淆
public final static <T> Observable<T> create(OnSubscribe<T> f) {
//hook里面什么都沒有做,只是將f返回
return new Observable<T>(hook.onCreate(f));
}
//保存訂閱紐帶
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//訂閱方法
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//通知訂閱者已經(jīng)開始
subscriber.onStart();
try {
//onSubscribe是訂閱紐帶,這里直接調(diào)用訂閱紐帶的call方法,并將訂閱者傳了過去,然后執(zhí)行subscriber.onNext("hello");方法,那么訂閱者也就收到了該事件
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
}
這個例子,從代碼實現(xiàn)層來看就是訂閱者訂閱被觀察者后,觸發(fā)訂閱紐帶的call方法,call方法產(chǎn)生數(shù)據(jù),再傳給訂閱者。
現(xiàn)在將例子改一下,要將產(chǎn)生的字符串后面加上“*”號,我們可以通過map方法
//為了方便后面說明,我們稱create返回的被觀察者為原始被觀察者,這個OnSubscribe我們稱為原始訂閱紐帶
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "call");
subscriber.onNext("hello");
subscriber.onNext("beauty");
subscriber.onCompleted();
}
})
//map方法里面有map被觀察者,map訂閱紐帶,map訂閱者,見后面的代碼
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e(TAG, "map call s: " + s);
return s + "*";
}
})
//這個Subscriber我們稱為最終訂閱者
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext s: " + s);
}
});
打印結(jié)果如下
05-31 14:39:03.707 29692-29692/? E/rx2: call
05-31 14:39:03.707 29692-29692/? E/rx2: map call s: hello
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: hello*
05-31 14:39:03.708 29692-29692/? E/rx2: map call s: beauty
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: beauty*
05-31 14:39:03.708 29692-29692/? E/rx2: onCompleted
我們可以看到,在原始訂閱紐帶里面調(diào)用來subscriber.onNext("hello");后,會將字符串傳遞到map的call方法里面,然后再調(diào)用最終訂閱者的onNext方法
這個例子將一件事情分拆層兩個步驟去實現(xiàn),產(chǎn)生字符串,加工字符串。我們看一下具體是怎么做到的
這個例子跟上一個的差別是在中間加了map方法,我們看一下map方法的實現(xiàn)
public class Observable<T> {
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//這里看一下OperatorMap類,func是要加工數(shù)據(jù)的方法
return lift(new OperatorMap<T, R>(func));
}
//該方法返回的是被觀察者,相當(dāng)于新建了一個觀察者,為了方便說明,我們稱該返回的被觀察者是map被觀察者,訂閱紐帶為map訂閱紐帶
//注意,下面的call方法是理解整個框架的關(guān)鍵,有點繞,冷靜下來分析一下
//我們現(xiàn)在就來梳理這個例子的調(diào)用流程,在這之前,我們定義了幾個名稱:原始被觀察者、原始訂閱紐帶、map被觀察者、map訂閱紐帶、map訂閱者,最終訂閱者,先熟悉一下這幾個詞
//1.在最后調(diào)用subscribe(new Subscriber<String>())的時候,也是就是執(zhí)行map被觀察者的subscribe方法時,會調(diào)用map訂閱紐帶的call方法,也就是下面lift里面call方法
//2.調(diào)用hook.onLift(operator).call(o)得到的是map訂閱者,o是最終訂閱者,map訂閱者是最終訂閱者的代理,里面的onNext方法多了一層在轉(zhuǎn)換
//3.onSubscribe是這個類的訂閱紐帶,也就是初始訂閱紐帶。調(diào)用onSubscribe.call(st)方法執(zhí)行初始訂閱者的call方法,也就是上面代碼塊的subscriber.onNext("hello")這里。
//這里的subscriber訂閱者是map訂閱者,調(diào)用了它的call方法,在OperatorMap這個類里面,執(zhí)行的是里面的訂閱者的onNext(T t)方法,調(diào)用o.onNext(transformer.call(t));。
//t就是傳進來的"hello",通過transformer進行轉(zhuǎn)換,這里的o是最終訂閱者,t是原始訂閱紐帶傳過來的"hello"字符串,這樣就完成了轉(zhuǎn)換,所以最終訂閱者o就收到來轉(zhuǎn)化后的數(shù)據(jù)
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
//加工數(shù)據(jù)的類,負責(zé)轉(zhuǎn)換數(shù)據(jù)
public OperatorMap(Func1<? super T, ? extends R> transformer{
this.transformer = transformer;
}
//為了方便說明,我們稱該方法返回的訂閱者是map訂閱者,從代碼來看,生成的這個訂閱者是最終訂閱者的代理,唯一不同的是多了一步transformer.call(t)的轉(zhuǎn)化
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o){
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
//這里的o是最終訂閱者,t是原始訂閱紐帶傳過來的
o.onNext(transformer.call(t));
}
};
}
}
這里有一點繞,我畫了一張圖來梳理這個關(guān)系
上面的說明再結(jié)合這張圖,關(guān)系就能梳理清楚了
總結(jié)一下,調(diào)用subscribe訂閱時,觸發(fā)map被觀察者的call方法,調(diào)用onSubscribe.call(st),st又是最終訂閱者的代理(該代理獲取到數(shù)據(jù)后加工數(shù)據(jù)),onSubscribe是原始訂閱紐帶,觸發(fā)產(chǎn)生數(shù)據(jù)的onNext方法,然后將數(shù)據(jù)傳遞給map訂閱者,加工完數(shù)據(jù)后,再傳遞給最終訂閱者的onNext方法
每次添加一個map方法時,在原先的連接關(guān)系中間插入了一層關(guān)系,對上一層的被觀察者來說,插入了一個訂閱者;對下一層對訂閱者而言,插入了一個被觀察者,通過這種方式,又將原先對鏈條連接上了。
對于map方法是這個邏輯,對應(yīng)其它的方法,也是類似的原理