FlatMap是RxJava2變換操作符中比較重要的一個,本文我們來學習一下它的內(nèi)部變換過程。
使用FlatMap變換方法如下:
private void init() {
Observer<String> observer = new Observer<String>() {
//.....代碼省略
};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
//FlatMap變換
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) {
//將int類型參數(shù)轉(zhuǎn)換為string類型參數(shù),然后用just操作符將其重新發(fā)射出去
return Observable.just(String.valueOf(integer));
}
})
.subscribe(observer);
}
點進這個flatMap方法看下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
//調(diào)用兩個參數(shù)的FlatMap
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
//調(diào)用三個參數(shù)的FlatMap
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
//調(diào)用四個參數(shù)的FlatMap
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//如果上游的Observable類型是ScalarCallable類型的(比如上游的observable是通過Observable.just創(chuàng)建的等等,這種情況比較少見)
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
//上游類型不是ScalarCallable類型,返回ObservableFlatMap(一般情況下都是返回這個)
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
FlatMap操作符通過一系列調(diào)用,最終生成了一個ObservableFlatMap對象,ObservableFlatMap類的狗造方法接收五個參數(shù),簡要介紹下:
- ObservableSource<T> source:保存上游的observable對象。
- Function<? super T, ? extends ObservableSource<? extends U>> mapper:調(diào)用flatMap操作符時傳入的Function接口實現(xiàn)類對象。
- boolean delayErrors:當訂閱出現(xiàn)異常時,是否立即發(fā)送錯誤(備注:如果delayErrors為true,則第一個出現(xiàn)異常的序列將直接終止整個序列;如果delayErrors為false,則該異常將被推遲,直到整個任務(wù)序列被異常終止)。
- int maxConcurrency:可以同時訂閱的ObservableSource的最大數(shù)量(由于FlatMap是一對多變換,因此可能需要多個臨時的Observable來輔助變換,最后再將這多個臨時的Observable合并為一個將數(shù)據(jù)發(fā)射出去。這里的最大數(shù)量就是這多個臨時的Observable數(shù)量)。
- int bufferSize:數(shù)據(jù)緩沖區(qū)的緩存大小(默認為128)。
看下ObservableFlatMap這個類:
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//如果上游的Observable是ScalarCallable(ScalarCallable接口繼承了Callable接口)類型的,
//則委托ObservableScalarXMap執(zhí)行數(shù)據(jù)下發(fā),程序返回
//(備注:這里幾乎不會執(zhí)行,在調(diào)用FlatMap操作符創(chuàng)建新的Observable對象時就已經(jīng)經(jīng)過該類型的判斷,參見上面的代碼,
//因此若最終返回ObservableFlatMap對象,上游observable對象必定不是ScalarCallable類型)
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
//如果上游Observable對象不是ScalarCallable類型,通過MergeObserver來實現(xiàn)具體的數(shù)據(jù)變換以及下發(fā)
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
//......代碼省略
}
在subscribeActual方法內(nèi)通過MergeObserver來包裝下游的observer,并將其他參數(shù)傳遞進去,輔助變換。在RxJava中這些包裝類的設(shè)計思路基本都是類似的,因此就不做過多描述了,我們看下MergeObserver內(nèi)部的onNext方法:
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
//保存調(diào)用mapper.apply(t)方法生成的Observable,接收上游數(shù)據(jù),用于接下來的數(shù)據(jù)變換
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
//調(diào)用方法
subscribeInner(p);
}
由于RxJava默認傳入的maxConcurrency是Integer.MAX_VALUE(這也是我們通常使用的方式),因此直接調(diào)用 subscribeInner(p),這個p就是保存我們調(diào)用FlatMap操作符時實現(xiàn)的Function接口生成的Observable(p在初始化是調(diào)用了mapper.apply(t)方法,t為上游Observable發(fā)射的數(shù)據(jù)),用來進行數(shù)據(jù)變換??聪逻@個方法:
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
//1、如果傳入的ObservableSource是Callable類型的(比如上面的示例代碼生成的ObservableJust對象,但這屬于特例)程序走這里,
//(ScalarCallable接口繼承了Callable接口),
//這里tryEmitScalar執(zhí)行的過程和下面的InnerObserver執(zhí)行過程極為類似,這里我們就只介紹InnerObserver的執(zhí)行過程
tryEmitScalar(((Callable<? extends U>)p));
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
break;
}
}
} else {
break;
}
} else {
//2、Function對象內(nèi)部返回的observable不是Callable類型的,則為每個observable創(chuàng)建一個InnerObserver,
//本文中我們只關(guān)心這里,上面的數(shù)據(jù)下發(fā)過程以此類似,因此我們只要分析一個方面就行了
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//MergeObserver內(nèi)部有一個保存InnerObserver的數(shù)組observers,
//因此這個addInner方法就是將每次新建的InnerObserver保存到這個數(shù)組中
if (addInner(inner)) {
//對每次創(chuàng)建的InnerObserver執(zhí)行訂閱
p.subscribe(inner);
}
break;
}
}
}
本文我們只分析使用InnerObserver這種方式,InnerObserver又包裝了當前的MergeObserver,并將新創(chuàng)建的InnerObserver保存到InnerObserver數(shù)組中。看下addInner方法:
boolean addInner(InnerObserver<T, U> inner) {
//這里通過死循環(huán)來保存新建的InnerObserver,確保保存成功
for (;;) {
//從observers獲取InnerObserver數(shù)組,這個observers是一個AtomicReference類型,
//確保在多線程環(huán)境下只有一個InnerObserver數(shù)組對象
InnerObserver<?, ?>[] a = observers.get();
//保存失敗的唯一條件是外部取消訂閱
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length;
//保存新建的InnerObserver的目標數(shù)組
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
//源數(shù)組到目標數(shù)組的遷移
System.arraycopy(a, 0, b, 0, n);
//保存新建的InnerObserver
b[n] = inner;
//將目標數(shù)組設(shè)置回observers中,完成保存InnerObserver的數(shù)組的更新
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
新建的InnerObserver保存成功后,調(diào)用p.subscribe(inner)開始下發(fā)數(shù)據(jù)(這里的p就是Function返回的Observable),依次調(diào)用InnerObserver的onSubscribe,onNext,onComplete/onError方法,我們看下這幾個方法:
//建立訂閱關(guān)系,獲取disposable訂閱狀態(tài)管理對象
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) {
//如果s是QueueDisposable類型
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) s;
//獲取合并標記
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//這里根據(jù)訂閱類型是同步的還是異步的執(zhí)行相應的操作
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;
//同步情況下,直接發(fā)射數(shù)據(jù)
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
接著看onNext方法:
public void onNext(U t) {
//由于在onSubscribe中條件(s instanceof QueueDisposable)為false,因此fusionMode值還是0,走第一個條件
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
看下tryEmit(t, this)方法:
void tryEmit(U value, InnerObserver<T, U> inner) {
//1、判斷VALUE==0是否成立,如果成立則將其設(shè)置為1,并執(zhí)行條件內(nèi)的方法
if (get() == 0 && compareAndSet(0, 1)) {
//2、調(diào)用下游observer的onNext方法,下發(fā)數(shù)據(jù)
actual.onNext(value);
//3、每次下發(fā)數(shù)據(jù)后將VALUE減一,并判斷減一后的VALUE是否為0,如果為0,則表示數(shù)據(jù)都已下發(fā)完畢,方法結(jié)束
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
//4、將接收到的上游數(shù)據(jù)緩存到隊列中
q.offer(value);
//5、每次將一個數(shù)據(jù)緩存到緩存隊列中后,將VALUE加一,
//然后判斷VALUE加一之前的值,若不等于0,直接返回,結(jié)束方法;若等于0,執(zhí)行步驟6
if (getAndIncrement() != 0) {
return;
}
}
//6、循環(huán)獲取緩存隊列中的數(shù)據(jù)
drainLoop();
}
MergeObserver繼承了AtomicInteger,主要是保證數(shù)據(jù)的下發(fā)是一個一個進行的。
簡要介紹下tryEmit方法的運行流程:
- tryEmit方法第一次調(diào)用時,get() == 0成立,調(diào)用compareAndSet(0, 1)將VALUE設(shè)置為1,此時執(zhí)行條件內(nèi)部語句,開始下發(fā)數(shù)據(jù)。數(shù)據(jù)下發(fā)執(zhí)行完后調(diào)用decrementAndGet()==0,這里是先將VALUE減一,VALUE值變?yōu)?,因此VALUE==0成立,方法結(jié)束。
- 如果在下發(fā)數(shù)據(jù)執(zhí)行期間,上游有新的數(shù)據(jù)傳遞過來,此時get() == 0不成立,進入步驟4,將上游發(fā)送過來的數(shù)據(jù)保存到緩存隊列中。
- 執(zhí)行完步驟4后,調(diào)用getAndIncrement()將VALUE值加一,并判斷VALUE加一之前的值,若不等于0,結(jié)束方法;若等于0,執(zhí)行步驟6。
- 步驟6是從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)給下游observer。其執(zhí)行條件是步驟3或步驟5的條件判斷不成立,什么時候不成立呢?
假設(shè)有兩個線程在執(zhí)行,分別稱為線程A和線程B,假設(shè)線程A先搶到了CPU權(quán)限,步驟1條件成立,此時VALUE值為1,但是在執(zhí)行步驟3之前,線程B搶占了CPU執(zhí)行權(quán)限,線程A處于休眠狀態(tài),由于此時VALUE值為1,get() == 0不成立,此時線程B就會將數(shù)據(jù)緩存到隊列中。
一種情況是:線程B繼續(xù)執(zhí)行,并在步驟5處將VALUE加一(VALUE值變?yōu)?),由于VALUE原先的值為1,因此方法結(jié)束,線程B讓出CPU權(quán)限。線程A獲取CPU權(quán)限,原先的步驟3繼續(xù)執(zhí)行(VALUE的值變?yōu)?),條件不成立,執(zhí)行步驟6,從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)。
另一種情況是:線程B在執(zhí)行步驟5之前,失去了CPU的權(quán)限。原線程A搶到了CPU權(quán)限,執(zhí)行步驟3(VALUE值變?yōu)?),條件3判斷成立,線程A中這個方法結(jié)束,線程A讓出CPU權(quán)限。線程B獲取CPU權(quán)限,繼續(xù)執(zhí)行步驟5(VALUE值變?yōu)?),但getAndIncrement()方法獲取的是VALUE原來的值,原來的值為0,因此條件不成立,執(zhí)行步驟6,從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)。
tryEmit方法介紹完了,再來看下drainLoop()這個方法:
void drainLoop() {
final Observer<? super U> child = this.actual;
int missed = 1;
for (;;) {
//檢查訂閱是否被終止
if (checkTerminate()) {
return;
}
//獲取MergeObserver內(nèi)的緩存隊列
SimplePlainQueue<U> svq = queue;
//通常情況下,MergeObserver內(nèi)的緩存隊列都是空的
if (svq != null) {
//如果緩存隊列里面有數(shù)據(jù),開始循環(huán)
for (;;) {
U o;
for (;;) {
//再次檢查訂閱是否被終止
if (checkTerminate()) {
return;
}
//從緩存隊列中取一個數(shù)據(jù)
o = svq.poll();
if (o == null) {
break;
}
//調(diào)用下游observer的onNext方法,下發(fā)數(shù)據(jù)
child.onNext(o);
}
if (o == null) {
break;
}
}
}
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
//調(diào)用下游observer的onComplete或onError方法
if (d && (svq == null || svq.isEmpty()) && n == 0) {
Throwable ex = errors.terminate();
if (ex != ExceptionHelper.TERMINATED) {
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
}
}
return;
}
//處理MergeObserver內(nèi)的InnerObserver數(shù)組
boolean innerCompleted = false;
if (n != 0) {
long startId = lastId;
int index = lastIndex;
if (n <= index || inner[index].id != startId) {
if (n <= index) {
index = 0;
}
int j = index;
for (int i = 0; i < n; i++) {
if (inner[j].id == startId) {
break;
}
j++;
if (j == n) {
j = 0;
}
}
index = j;
lastIndex = j;
lastId = inner[j].id;
}
int j = index;
sourceLoop:
//循環(huán)處理InnerObserver數(shù)組內(nèi)的每個InnerObserver對象
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
//處理InnerObserver內(nèi)的緩存隊列,如果有緩存數(shù)據(jù),則將其發(fā)射出去
for (;;) {
if (checkTerminate()) {
return;
}
SimpleQueue<U> q = is.queue;
if (q == null) {
break;
}
U o;
for (;;) {
try {
o = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
is.dispose();
errors.addThrowable(ex);
if (checkTerminate()) {
return;
}
removeInner(is);
innerCompleted = true;
i++;
continue sourceLoop;
}
if (o == null) {
break;
}
//發(fā)射InnerObserver內(nèi)緩存隊列緩存的數(shù)據(jù)
child.onNext(o);
if (checkTerminate()) {
return;
}
}
if (o == null) {
break;
}
}
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
//如果InnerObserver處理完畢,并且其內(nèi)部緩存隊列的數(shù)據(jù)都已發(fā)射出去
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
//將InnerObserver從數(shù)組中移除
removeInner(is);
if (checkTerminate()) {
return;
}
//設(shè)置InnerObserver狀態(tài)為complete
innerCompleted = true;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
//當前InnerObserver已經(jīng)處理完畢,繼續(xù)循環(huán)處理數(shù)組中下一個InnerObserver
if (innerCompleted) {
//傳入的默認maxConcurrency值是Integer.MAX_VALUE,因此if條件內(nèi)的代碼不會執(zhí)行
if (maxConcurrency != Integer.MAX_VALUE) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
continue;
}
}
subscribeInner(p);
}
//結(jié)束當前循環(huán),開始下一循環(huán)處理下一個InnerObserver
continue;
}
//每次數(shù)據(jù)下發(fā)完畢,將VALUE值減一
missed = addAndGet(-missed);
//如果VALUE值變?yōu)?,表示緩存隊列的數(shù)據(jù)已全部下發(fā)完畢,退出循環(huán),方法結(jié)束
if (missed == 0) {
break;
}
}
}
至此整個FlatMap操作符的流程就分析完了,總結(jié)下:
- 根據(jù)上游Observable對象的類型是不是ScalarCallable類型,F(xiàn)latMap決定返回相應的新的Observable對象,一般情況下返回的都是ObservableFlatMap對象(ObservableScalarXMap對象處理數(shù)據(jù)分發(fā)的方式與ObservableFlatMap類似,這里我們只分析ObservableFlatMap)。
- 下游訂閱時,觸發(fā)ObservableFlatMap的subscribeActual方法,觸發(fā)上游subscribe
-->subscribeActual-->調(diào)用MergeObserver的onSubscribe建立訂閱關(guān)系,上游調(diào)用onNext下發(fā)數(shù)據(jù)-->調(diào)用MergeObserver的onNext,數(shù)據(jù)下發(fā)完畢或者出錯調(diào)用MergeObserver的onComplete或onError。 - 在MergeObserver的onNext方法中,獲取FlatMap操作符接收的Function對象返回的數(shù)據(jù)變換Observable(暫時命名為observableA),并為每個返回的observableA創(chuàng)建一個對應的InnerObserver對象(暫時命名為innerObserverA),然后直行訂閱observableA.subscribe(innerObserverA),最終將上游的數(shù)據(jù)經(jīng)過變換后重新發(fā)射出去。
- InnerObserver內(nèi)部有一個緩存隊列,用于緩存變換后的數(shù)據(jù),其onNext方法內(nèi)部最終還是調(diào)用的MergeObserver的tryEmit方法,將變換后的數(shù)據(jù)重新發(fā)射到下游observer。MergeObserver實現(xiàn)了AtomicInteger類,采用CAS操作保證了數(shù)據(jù)下發(fā)操作的原子性(即每次只有一個數(shù)據(jù)下發(fā),在當前數(shù)據(jù)下發(fā)過程中,如果上游有新的數(shù)據(jù)到來,則將新的數(shù)據(jù)保存到InnerObserver的緩存隊列中。等當前數(shù)據(jù)下發(fā)完畢后,再從InnerObserver的緩存隊列中取出數(shù)據(jù)并將其下發(fā)給下游observer)。