眾所周知Android開發(fā)不能在主線程中進行耗時操作,所以一些操作必須放在子線程中進行,這樣一來就就會涉及到涉及線程的創(chuàng)建及線程間的通信。當然Android系統(tǒng)也提供了AsyncTask,但是在處理嵌套處理方面做的并不優(yōu)雅。rxjava采用事件流的方式來解決了這一問題,當然rxjava的作用及優(yōu)點不止是這個,還有很多的功能在使用起來也是讓人愛不釋手。本文不是對rxjava的用法及功能進行介紹,而是對rxjava的內(nèi)部原理進行分析。
rxjava主要是通過發(fā)布/訂閱模式來實現(xiàn)事件的控制和處理。兩個接口和簡單的一行代碼就能明白rxjava的原理:
//發(fā)布者
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
//訂閱者
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
//訂閱
observableSource.subsrcibe(observer)
簡單用法
Observable.create(ObservableOnSubscribe<String> {
//代碼1
it.onNext("hello world")
}).flatMap(Function<String, ObservableSource<String>> {
val value = it
ObservableSource {
//代碼2
it.onNext("flatmap-->$value")
}
}).subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
//代碼3
e("MainActivity", "t--->$t")
}
override fun onError(e: Throwable) {
}
})
上面這段段代碼創(chuàng)建了三個主要的對象:ObservableOnSubscribe(A)、ObservableFlatMap(B)和Observer(C),然后通過subscribe()方法將這個鏈串了起來。首先來看下Observable.create()方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
這里創(chuàng)建了一個ObservableCreate對象,ObservableCreate繼承自O(shè)bsevable,Observable實現(xiàn)了ObservableSource接口。然后調(diào)用了flatMap方法,最終會創(chuàng)建ObservableFlatMap對象,這相當于B訂閱了A。當調(diào)用了subsrcibe方法時,相當于C訂閱了B。我們來看下ObservableFlatMap中的subsrcibe()方法,然后又調(diào)用了subscribeActual()這個核心方法:
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
這里的source對象就是他訂閱的A對象,調(diào)用的subscribe(Observer o)方法就是代碼方法,這里的o對象就是上面代碼中的MergeObserver對象本身,緊接著又調(diào)用了MergeObserver.onNext()方法:
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
//省略部分代碼
}
這里的mapper對象就是代碼2處創(chuàng)建的Function對象,然后返回一個ObservaleSource對象,之后繼續(xù)調(diào)用了下一個訂閱者(也就是對象A)的onNext()方法。至此這個事件流就通過訂閱鏈依次到每一個訂閱者。
我們通過簡單的代碼來快速了解下rxjava的原理:
//發(fā)布者
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);}
//訂閱者
public interface Observer<T> {
void onNext(T t);
}
//中間訂閱者
public interface Function<V, K> {
K apply(V v);
}
public abstract class FlatMapObservable<T> implements Observer<T>, ObservableSource<T> {
private T mT;
public <K> ObservableSource<K> flatMap(Function<T, ? extends ObservableSource<K>> function) {
return function.apply(mT);
}
@Override
public void onNext(T t) {
mT = t;
}
}
//dome
public class Test {
public static void main(String[] args) {
new FlatMapObservable<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
observer.onNext("hello world");
}
}.flatMap(new Function<String, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(String s) {
return new ObservableSource<Boolean>() {
@Override
public void subscribe(Observer<? super Boolean> observer) {
observer.onNext(false);
}
};
}
}).subscribe(new Observer<Boolean>() {
@Override
public void onNext(Boolean aBoolean) {
System.out.println(aBoolean);
}
});
}
}