在上一篇響應式編程開源庫 RxJava2——Stream API中主要介紹了Java 8的Stream API,理解了什么是流,以及為什么要用流。它的實現(xiàn)基本上運用了前面part1、part2中學習過的重要概念。我們準備了這么久,下面將真正的進入RxJava的學習。
在前面已經(jīng)介紹過RxJava的官方定義,在Java VM上使用可觀察序列(即ReactiveX中提到的可觀測流)編寫異步和基于事件的程序的庫。前面我們也對RxJava的基本使用進行了簡單解析。RxJava可以使用Observable把某個對象轉(zhuǎn)變?yōu)橐粋€可觀測的序列(可觀測流),對其進行相應操作后作出相應的響應。
下面就從操作符入手,來學習使用RxJava。
操作符
Observable類里提供了很多操作符(intermediate operators)我們從里面不難發(fā)現(xiàn)有的操作符在前面介紹過的Stream API中也是有的。所以對于編程來說,用萬變不離其宗,殊途同歸來說的話一點不夸張。
1.create()
在前面part1我們已經(jīng)初步學習過該操作符。它可以把某對象轉(zhuǎn)化為可觀測的序列,簡單來說就是把某個對象轉(zhuǎn)變?yōu)楸挥^察者,并具有流的特性。
2.just()單個參數(shù)
源碼中共有十個just重載方法。他們能接收不同數(shù)量的參數(shù)。目前最大支持10個參數(shù)。

可以從下面這種圖看出他的含義,在前面已經(jīng)說明了這種圖所代表的意義。它能將多個參數(shù)轉(zhuǎn)變到同一個可觀測源中并且一次性發(fā)送。

@SuppressLint("CheckResult")
public static void main(String[] args){
Observable.just("s",2,true)
.subscribe(s->System.out.println(s));
}
上段代碼運行結(jié)果如下。

還是按照慣例從源碼再一次進行分析。先看看
just(T item)只有一個參數(shù)的方法。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
其中onAssembly方法返回的就是ObservableJust對象。那么just方法實際上返回的對象就是ObservableJust對象。從源碼中看到它是Observable的子類,public final class ObservableJust<T> extends Observable<T>。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
下面我們看一看在ObservableJust中到底進行了什么操作。
/**
* Represents a constant scalar value.
* @param <T> the value type
*/
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ObservableScalarXMap.ScalarDisposable<T> sd = new ObservableScalarXMap.ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
構(gòu)造方法中對傳入的數(shù)據(jù)進行了從新賦值,這就使得我們控制了對象的不可變性,我們進行的 Intermediate中間操作只是對 ObservableJust類中的變量value進行操作,并不會影響傳入的參數(shù)值。從下圖 可以看出,ObservableJust中subscribeActual(Observer<? super T> s)方法是基類 Observable中的抽象方法實現(xiàn)。Observable在重寫了ObservableSource接口中的方法subscribe(Observer<? super T> observer)時調(diào)用subscribeActual(observer);在ObservableJust中我們具體實現(xiàn)了subscribeActual(Observer<? super T> s)方法。s.onSubscribe(sd);給了觀察者一個中斷器 ScalarDisposable sd.run();方法將調(diào)用Observer接口中的onNext方法回調(diào)數(shù)據(jù)。
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}

我們再來梳理一次Just()操作符的整個代碼內(nèi)部調(diào)用過程,Just(T item)會將數(shù)據(jù)賦值給ObservableJust類的value變量,嚴格控制了源數(shù)據(jù)的不可變性,之后調(diào)用Observable中的 subscribe(Observer<? super T> observer)方法將被觀察者和觀察者建立訂閱關系(RxJavaPlugins.onSubscribe(this, observer) ),之后通過ObservableJust中實現(xiàn)父類的抽象方法,void subscribeActual(Observer<? super T> observer)來一次性將數(shù)據(jù)發(fā)送。
-
Just()多個參數(shù)
當Just()方法有多個參數(shù)時,其內(nèi)部和單個參數(shù)是有一定區(qū)別的。從以下源碼就可以看出,當Just有多個參數(shù)時,它實際上是利用了fromArray操作符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}
那我們就來研究下fromArray操作符內(nèi)部具體是怎么實現(xiàn)的。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
可以看出,當只有一個參數(shù)時,實際上還是運用的Just(T item)操作符。當多個參數(shù)時會將數(shù)據(jù)丟給 ObservableFromArray類。
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
可以看到任然是重新賦值,保證源數(shù)據(jù)的不可變性。之后通過d.run();方法發(fā)送數(shù)據(jù)。
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
可以看到這里使用了for循環(huán)遍歷,將元素一個個發(fā)送。通過上面的分析不難發(fā)現(xiàn)之前提到過的,當observable被訂閱后才會開始發(fā)送數(shù)據(jù)。因為只有subscribe后才會執(zhí)行Observable的各個子類中的方法subscribeActual(Observer<? super T> s)從而執(zhí)行各子類的run方法進行數(shù)據(jù)的發(fā)送。并且可以發(fā)現(xiàn),和create方式并不相同的是這里并不需要ObservableEmitter數(shù)據(jù)發(fā)射器的協(xié)助。這里的發(fā)送數(shù)據(jù)操作實際上就是回調(diào)。
3.以from開頭的操作符

-
fromArray(T... items)
可以將一組數(shù)據(jù)轉(zhuǎn)為可觀測流,并且逐個發(fā)送參數(shù)中的每個元素。在上面的Just學習中,我們已經(jīng)大概了解了fromArray這個操作符。
fromArray與just的主要區(qū)別主要取決于他們的參數(shù)個數(shù)。
- 單個參數(shù) 相當于都是Just,將參數(shù)的映射value一次發(fā)送。
- 多個參數(shù) 相當于fromArray 將參數(shù)的映射value遍歷后發(fā)送。
-
fromIterable(Iterable<? extends T> source)
可以看到它只接收繼承Iterable接口的數(shù)據(jù),而在Java中集合類基本都繼承了該接口。所以這里只接收集合參數(shù)??梢钥吹酱笾逻^程和前面學習的兩個操作符差不多,只是內(nèi)部遍歷用上了集合特有的迭代器遍歷。它會遍歷集合,并將每個元素發(fā)送。
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
actual.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
actual.onComplete();
}
}
-
fromCallable(Callable<? extends T> supplier)
它將會發(fā)送Callable返回的參數(shù)。
-
fromFuture(Future<? extends T> future)
它將會發(fā)送Future中get函數(shù)的返回值。其中timeout參數(shù)是調(diào)用get()方法之前最多等待時間。
-
fromPublisher(Publisher<? extends T> publisher)
通過官方介紹,可以看出該操作符并不推薦,而是希望盡可能用create代替。所以這里不過多學習。
If possible, use
create(ObservableOnSubscribe)to create a source-likeObservableinstead.
4.range()
它會發(fā)送指定區(qū)間的int數(shù),每次發(fā)送比前一次數(shù)據(jù)大1,相當于for循環(huán)。

5.interval()
從源碼可以看出該操作符的作用是每隔一段時間發(fā)送一個從0開始的Long型數(shù)據(jù),可以說相當于一個定時器。在java中相當于Timer和TimerTask的應用。


這里每次發(fā)送的是long型的count,發(fā)送一次后count+1。
6.timer()
延遲指定時間后發(fā)送數(shù)據(jù)0。其實也就是延時操作,相當于handler的postDelay。

7.empty()
不會發(fā)送任何數(shù)據(jù),并且立即調(diào)用onComplete終止操作。這在前面很多操作符里都有調(diào)用。比如當數(shù)組為空就會首先調(diào)用empty終止操作。
8.map()
首先看看map()在ReactiveX中的解釋。
transform the items emitted by an Observable by applying a function to each item
應用一個函數(shù)對每個發(fā)出的元素進行轉(zhuǎn)換。
所以對每個發(fā)出的元素應用指定函數(shù),并發(fā)送函數(shù)的返回值。從下圖也可以看出,它可以進行數(shù)據(jù)的轉(zhuǎn)換,比如將int轉(zhuǎn)為String。map()接收一個函數(shù)式接口Function作為參數(shù),而在前面介紹過它能接收一個參數(shù),返回另一個參數(shù)。所以決定它有轉(zhuǎn)換功能。

我們在結(jié)合之前的
just()或者說fromArray()來看看map是怎么進行數(shù)據(jù)的轉(zhuǎn)換的。
Observable.just(1,2,3,4).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "map"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
}
});
}
Observable.just(1,2,3,4)我們已經(jīng)清楚,當被觀察者訂閱后,就會遍歷依次發(fā)送每個元素。發(fā)送出去后被 map接收。那么map是怎么接收的呢?還是從源碼入手。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
它最終返回的是ObservableMap對象,注意看它的參數(shù)。它接收了當前的Observable對象,和Function函數(shù)。我們在來看ObservableMap內(nèi)部。從下面看出ObservableMap的繼承關系。
ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>
在ObservableMap中,注意構(gòu)造方法的super(source);在父類AbstractObservableWithUpstream中通過它的構(gòu)造方法將會獲得Observable.just(1,2,3,4)創(chuàng)建的上游可觀測源,當訂閱后會調(diào)用subscribeActual。新的可觀測源的映射source會和MapObserver建立新的訂閱關系。
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
所以這里just發(fā)送的數(shù)據(jù),會被 MapObserver再次訂閱而且只一次,這樣數(shù)據(jù)就完美的流向了map操作符。在MapObserver中實現(xiàn)了基類Observer的onNext方法。這時候just通過遍歷發(fā)送的數(shù)據(jù)就會一次被下面的onNext接收到。當接收到數(shù)據(jù),就會執(zhí)行mapper.apply(t)將數(shù)據(jù)轉(zhuǎn)化,最后發(fā)送轉(zhuǎn)化后的數(shù)據(jù)v。
通過上面的分析就可以印證之前的學習,每一次的中間操作都會有一個新的數(shù)據(jù)源映射,并且其中很多構(gòu)造方法完美的解決了對象的不可變性。當訂閱后,所有的操作都是一次性完成,減少了時間復雜度。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
9.flatMap()
transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
可以將可觀測數(shù)據(jù)源發(fā)送的數(shù)據(jù)轉(zhuǎn)化成多個可觀測源,最后把他們在融合大一個可觀測數(shù)據(jù)源中發(fā)送。但是不保證轉(zhuǎn)化后數(shù)據(jù)的順序。
Observable.just(1,2,3,4).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just("flatmap"+integer);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log(s);
}
});
結(jié)合之前的just和map的學習,從代碼很容易看出just依次發(fā)送元素,flatMap利用Function接口將每個元素轉(zhuǎn)化為新的數(shù)據(jù)源,然后合并到一個可觀測源中發(fā)送。那么是怎么合并發(fā)送的呢?這里看下源碼。flatMap最終的可觀測源是ObservableFlatMap在subscribeActual方法中進行了數(shù)據(jù)源的映射重新訂閱source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));重新訂閱后在onNext中會將發(fā)送的數(shù)據(jù)轉(zhuǎn)化成新的數(shù)據(jù)源。
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
//將發(fā)送的數(shù)據(jù)轉(zhuǎn)化為新的可觀測源
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
最后執(zhí)行subscribeInner(p);一般我們的可觀測源不會是Callable類型,所以最終到了else里面,在這里面就可以看出每次發(fā)送來的數(shù)據(jù)轉(zhuǎn)化為新的數(shù)據(jù)源 p后,每次會創(chuàng)建一個InnerObserver(注意它的構(gòu)造方法參數(shù),是將MergeObserver對象傳入的)。通過addInner方法將多個InnerObserver添加到AtomicReference<InnerObserver<?, ?>[]> observers;中。每次轉(zhuǎn)化的數(shù)據(jù)源p都有一個對應InnerObserver。我們這里的p實際就是Observable.just("flatmap"+integer)所產(chǎn)生的可觀測源,對它調(diào)用p.subscribe(inner);就會把當前可觀測源中的數(shù)據(jù)讓對應的InnerObserver的onNext接收。這時候就會通過MergeObserver的tryEmit方法給下游下發(fā)數(shù)據(jù)。
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//每次轉(zhuǎn)化的數(shù)據(jù)源p都有一個對應InnerObserver
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
最后我們在看InnerObserver的onNext方法和tryEmit方法,最終通過MergeObserver單一可觀測源調(diào)用tryEmit方法,通過最終觀察者的回調(diào)來發(fā)送數(shù)據(jù)。MergeObserver繼承了AtomicInteger,所以這里的tryEmit方法就利用了AtomicInteger的同步機制。所以同時只會有一個value被最終觀察者actual發(fā)送,由于AtomicInteger CAS鎖只能保證操作的原子性,并不保證鎖的獲取順序,是搶占式的,所以最終數(shù)據(jù)的發(fā)射順序并不是固定的。
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
大概的過程就是,通過MergeObserver來將發(fā)送的每個元素轉(zhuǎn)化為可觀測源,每個新的可觀察源p中的數(shù)據(jù)都會通過p.subscribe(inner);下發(fā)到對應的 InnerObserver。然后通過MergeObserver中的方法下發(fā)數(shù)據(jù)到最終訂閱者。


