RxJava2操作符(FlatMap學習)

FlatMap是RxJava2變換操作符中比較重要的一個,本文我們來學習一下它的內(nèi)部變換過程。
使用FlatMap變換方法如下:

private void init() {
    Observer<String> observer = new Observer<String>() {
        //.....代碼省略
    };

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            //FlatMap變換
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) {
                    //將int類型參數(shù)轉(zhuǎn)換為string類型參數(shù),然后用just操作符將其重新發(fā)射出去
                    return Observable.just(String.valueOf(integer));
                }
            })
            .subscribe(observer);
}

點進這個flatMap方法看下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    //調(diào)用兩個參數(shù)的FlatMap
    return flatMap(mapper, false);
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    //調(diào)用三個參數(shù)的FlatMap
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    //調(diào)用四個參數(shù)的FlatMap
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //如果上游的Observable類型是ScalarCallable類型的(比如上游的observable是通過Observable.just創(chuàng)建的等等,這種情況比較少見)
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    //上游類型不是ScalarCallable類型,返回ObservableFlatMap(一般情況下都是返回這個)
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

FlatMap操作符通過一系列調(diào)用,最終生成了一個ObservableFlatMap對象,ObservableFlatMap類的狗造方法接收五個參數(shù),簡要介紹下:

  • ObservableSource<T> source:保存上游的observable對象。
  • Function<? super T, ? extends ObservableSource<? extends U>> mapper:調(diào)用flatMap操作符時傳入的Function接口實現(xiàn)類對象。
  • boolean delayErrors:當訂閱出現(xiàn)異常時,是否立即發(fā)送錯誤(備注:如果delayErrors為true,則第一個出現(xiàn)異常的序列將直接終止整個序列;如果delayErrors為false,則該異常將被推遲,直到整個任務(wù)序列被異常終止)。
  • int maxConcurrency:可以同時訂閱的ObservableSource的最大數(shù)量(由于FlatMap是一對多變換,因此可能需要多個臨時的Observable來輔助變換,最后再將這多個臨時的Observable合并為一個將數(shù)據(jù)發(fā)射出去。這里的最大數(shù)量就是這多個臨時的Observable數(shù)量)。
  • int bufferSize:數(shù)據(jù)緩沖區(qū)的緩存大小(默認為128)。

看下ObservableFlatMap這個類:

public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //如果上游的Observable是ScalarCallable(ScalarCallable接口繼承了Callable接口)類型的,
        //則委托ObservableScalarXMap執(zhí)行數(shù)據(jù)下發(fā),程序返回
        //(備注:這里幾乎不會執(zhí)行,在調(diào)用FlatMap操作符創(chuàng)建新的Observable對象時就已經(jīng)經(jīng)過該類型的判斷,參見上面的代碼,
        //因此若最終返回ObservableFlatMap對象,上游observable對象必定不是ScalarCallable類型)
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        //如果上游Observable對象不是ScalarCallable類型,通過MergeObserver來實現(xiàn)具體的數(shù)據(jù)變換以及下發(fā)
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    //......代碼省略
}

在subscribeActual方法內(nèi)通過MergeObserver來包裝下游的observer,并將其他參數(shù)傳遞進去,輔助變換。在RxJava中這些包裝類的設(shè)計思路基本都是類似的,因此就不做過多描述了,我們看下MergeObserver內(nèi)部的onNext方法:

public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        //保存調(diào)用mapper.apply(t)方法生成的Observable,接收上游數(shù)據(jù),用于接下來的數(shù)據(jù)變換
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }
    //調(diào)用方法
    subscribeInner(p);
}

由于RxJava默認傳入的maxConcurrency是Integer.MAX_VALUE(這也是我們通常使用的方式),因此直接調(diào)用 subscribeInner(p),這個p就是保存我們調(diào)用FlatMap操作符時實現(xiàn)的Function接口生成的Observable(p在初始化是調(diào)用了mapper.apply(t)方法,t為上游Observable發(fā)射的數(shù)據(jù)),用來進行數(shù)據(jù)變換??聪逻@個方法:

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            //1、如果傳入的ObservableSource是Callable類型的(比如上面的示例代碼生成的ObservableJust對象,但這屬于特例)程序走這里,
            //(ScalarCallable接口繼承了Callable接口),
            //這里tryEmitScalar執(zhí)行的過程和下面的InnerObserver執(zhí)行過程極為類似,這里我們就只介紹InnerObserver的執(zhí)行過程
            tryEmitScalar(((Callable<? extends U>)p));

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        break;
                    }
                }
            } else {
                break;
            }
        } else {
            //2、Function對象內(nèi)部返回的observable不是Callable類型的,則為每個observable創(chuàng)建一個InnerObserver,
            //本文中我們只關(guān)心這里,上面的數(shù)據(jù)下發(fā)過程以此類似,因此我們只要分析一個方面就行了
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            //MergeObserver內(nèi)部有一個保存InnerObserver的數(shù)組observers,
            //因此這個addInner方法就是將每次新建的InnerObserver保存到這個數(shù)組中
            if (addInner(inner)) {
                //對每次創(chuàng)建的InnerObserver執(zhí)行訂閱
                p.subscribe(inner);
            }
            break;
        }
    }
}

本文我們只分析使用InnerObserver這種方式,InnerObserver又包裝了當前的MergeObserver,并將新創(chuàng)建的InnerObserver保存到InnerObserver數(shù)組中。看下addInner方法:

boolean addInner(InnerObserver<T, U> inner) {
    //這里通過死循環(huán)來保存新建的InnerObserver,確保保存成功
    for (;;) {
        //從observers獲取InnerObserver數(shù)組,這個observers是一個AtomicReference類型,
        //確保在多線程環(huán)境下只有一個InnerObserver數(shù)組對象
        InnerObserver<?, ?>[] a = observers.get();
        //保存失敗的唯一條件是外部取消訂閱
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length;
        //保存新建的InnerObserver的目標數(shù)組
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        //源數(shù)組到目標數(shù)組的遷移
        System.arraycopy(a, 0, b, 0, n);
        //保存新建的InnerObserver
        b[n] = inner;
        //將目標數(shù)組設(shè)置回observers中,完成保存InnerObserver的數(shù)組的更新
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}

新建的InnerObserver保存成功后,調(diào)用p.subscribe(inner)開始下發(fā)數(shù)據(jù)(這里的p就是Function返回的Observable),依次調(diào)用InnerObserver的onSubscribe,onNext,onComplete/onError方法,我們看下這幾個方法:

//建立訂閱關(guān)系,獲取disposable訂閱狀態(tài)管理對象
public void onSubscribe(Disposable s) {
    if (DisposableHelper.setOnce(this, s)) {
        //如果s是QueueDisposable類型
        if (s instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<U> qd = (QueueDisposable<U>) s;
            //獲取合并標記
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            //這里根據(jù)訂閱類型是同步的還是異步的執(zhí)行相應的操作
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                //同步情況下,直接發(fā)射數(shù)據(jù)
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}

接著看onNext方法:

public void onNext(U t) {
    //由于在onSubscribe中條件(s instanceof QueueDisposable)為false,因此fusionMode值還是0,走第一個條件
    if (fusionMode == QueueDisposable.NONE) {
        parent.tryEmit(t, this);
    } else {
        parent.drain();
    }
}

看下tryEmit(t, this)方法:

void tryEmit(U value, InnerObserver<T, U> inner) {
    //1、判斷VALUE==0是否成立,如果成立則將其設(shè)置為1,并執(zhí)行條件內(nèi)的方法
    if (get() == 0 && compareAndSet(0, 1)) {
        //2、調(diào)用下游observer的onNext方法,下發(fā)數(shù)據(jù)
        actual.onNext(value);
        //3、每次下發(fā)數(shù)據(jù)后將VALUE減一,并判斷減一后的VALUE是否為0,如果為0,則表示數(shù)據(jù)都已下發(fā)完畢,方法結(jié)束
        if (decrementAndGet() == 0) {
            return;
        }
    } else {
        SimpleQueue<U> q = inner.queue;
        if (q == null) {
            q = new SpscLinkedArrayQueue<U>(bufferSize);
            inner.queue = q;
        }
        //4、將接收到的上游數(shù)據(jù)緩存到隊列中
        q.offer(value);
        //5、每次將一個數(shù)據(jù)緩存到緩存隊列中后,將VALUE加一,
        //然后判斷VALUE加一之前的值,若不等于0,直接返回,結(jié)束方法;若等于0,執(zhí)行步驟6
        if (getAndIncrement() != 0) {
            return;
        }
    }
    //6、循環(huán)獲取緩存隊列中的數(shù)據(jù)
    drainLoop();
}

MergeObserver繼承了AtomicInteger,主要是保證數(shù)據(jù)的下發(fā)是一個一個進行的。
簡要介紹下tryEmit方法的運行流程:

  • tryEmit方法第一次調(diào)用時,get() == 0成立,調(diào)用compareAndSet(0, 1)將VALUE設(shè)置為1,此時執(zhí)行條件內(nèi)部語句,開始下發(fā)數(shù)據(jù)。數(shù)據(jù)下發(fā)執(zhí)行完后調(diào)用decrementAndGet()==0,這里是先將VALUE減一,VALUE值變?yōu)?,因此VALUE==0成立,方法結(jié)束。
  • 如果在下發(fā)數(shù)據(jù)執(zhí)行期間,上游有新的數(shù)據(jù)傳遞過來,此時get() == 0不成立,進入步驟4,將上游發(fā)送過來的數(shù)據(jù)保存到緩存隊列中。
  • 執(zhí)行完步驟4后,調(diào)用getAndIncrement()將VALUE值加一,并判斷VALUE加一之前的值,若不等于0,結(jié)束方法;若等于0,執(zhí)行步驟6。
  • 步驟6是從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)給下游observer。其執(zhí)行條件是步驟3或步驟5的條件判斷不成立,什么時候不成立呢?
    假設(shè)有兩個線程在執(zhí)行,分別稱為線程A和線程B,假設(shè)線程A先搶到了CPU權(quán)限,步驟1條件成立,此時VALUE值為1,但是在執(zhí)行步驟3之前,線程B搶占了CPU執(zhí)行權(quán)限,線程A處于休眠狀態(tài),由于此時VALUE值為1,get() == 0不成立,此時線程B就會將數(shù)據(jù)緩存到隊列中。
    一種情況是:線程B繼續(xù)執(zhí)行,并在步驟5處將VALUE加一(VALUE值變?yōu)?),由于VALUE原先的值為1,因此方法結(jié)束,線程B讓出CPU權(quán)限。線程A獲取CPU權(quán)限,原先的步驟3繼續(xù)執(zhí)行(VALUE的值變?yōu)?),條件不成立,執(zhí)行步驟6,從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)。
    另一種情況是:線程B在執(zhí)行步驟5之前,失去了CPU的權(quán)限。原線程A搶到了CPU權(quán)限,執(zhí)行步驟3(VALUE值變?yōu)?),條件3判斷成立,線程A中這個方法結(jié)束,線程A讓出CPU權(quán)限。線程B獲取CPU權(quán)限,繼續(xù)執(zhí)行步驟5(VALUE值變?yōu)?),但getAndIncrement()方法獲取的是VALUE原來的值,原來的值為0,因此條件不成立,執(zhí)行步驟6,從隊列中循環(huán)取出數(shù)據(jù)并下發(fā)。

tryEmit方法介紹完了,再來看下drainLoop()這個方法:

void drainLoop() {
    final Observer<? super U> child = this.actual;
    int missed = 1;
    for (;;) {
        //檢查訂閱是否被終止
        if (checkTerminate()) {
            return;
        }
        //獲取MergeObserver內(nèi)的緩存隊列
        SimplePlainQueue<U> svq = queue;
        //通常情況下,MergeObserver內(nèi)的緩存隊列都是空的
        if (svq != null) {
            //如果緩存隊列里面有數(shù)據(jù),開始循環(huán)
            for (;;) {
                U o;
                for (;;) {
                    //再次檢查訂閱是否被終止
                    if (checkTerminate()) {
                        return;
                    }
                    //從緩存隊列中取一個數(shù)據(jù)
                    o = svq.poll();

                    if (o == null) {
                        break;
                    }
                    //調(diào)用下游observer的onNext方法,下發(fā)數(shù)據(jù)
                    child.onNext(o);
                }
                if (o == null) {
                    break;
                }
            }
        }

        boolean d = done;
        svq = queue;
        InnerObserver<?, ?>[] inner = observers.get();
        int n = inner.length;
        //調(diào)用下游observer的onComplete或onError方法
        if (d && (svq == null || svq.isEmpty()) && n == 0) {
            Throwable ex = errors.terminate();
            if (ex != ExceptionHelper.TERMINATED) {
                if (ex == null) {
                    child.onComplete();
                } else {
                    child.onError(ex);
                }
            }
            return;
        }

        //處理MergeObserver內(nèi)的InnerObserver數(shù)組
        boolean innerCompleted = false;
        if (n != 0) {
            long startId = lastId;
            int index = lastIndex;

            if (n <= index || inner[index].id != startId) {
                if (n <= index) {
                    index = 0;
                }
                int j = index;
                for (int i = 0; i < n; i++) {
                    if (inner[j].id == startId) {
                        break;
                    }
                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                index = j;
                lastIndex = j;
                lastId = inner[j].id;
            }

            int j = index;
            sourceLoop:
            //循環(huán)處理InnerObserver數(shù)組內(nèi)的每個InnerObserver對象
            for (int i = 0; i < n; i++) {
                if (checkTerminate()) {
                    return;
                }
                @SuppressWarnings("unchecked")
                InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                //處理InnerObserver內(nèi)的緩存隊列,如果有緩存數(shù)據(jù),則將其發(fā)射出去
                for (;;) {
                    if (checkTerminate()) {
                        return;
                    }
                    SimpleQueue<U> q = is.queue;
                    if (q == null) {
                        break;
                    }
                    U o;
                    for (;;) {
                        try {
                            o = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            is.dispose();
                            errors.addThrowable(ex);
                            if (checkTerminate()) {
                                return;
                            }
                            removeInner(is);
                            innerCompleted = true;
                            i++;
                            continue sourceLoop;
                        }
                        if (o == null) {
                            break;
                        }
                        //發(fā)射InnerObserver內(nèi)緩存隊列緩存的數(shù)據(jù)
                        child.onNext(o);

                        if (checkTerminate()) {
                            return;
                        }
                    }
                    if (o == null) {
                        break;
                    }
                }
                boolean innerDone = is.done;
                SimpleQueue<U> innerQueue = is.queue;
                //如果InnerObserver處理完畢,并且其內(nèi)部緩存隊列的數(shù)據(jù)都已發(fā)射出去
                if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                    //將InnerObserver從數(shù)組中移除
                    removeInner(is);
                    if (checkTerminate()) {
                        return;
                    }
                    //設(shè)置InnerObserver狀態(tài)為complete
                    innerCompleted = true;
                }

                j++;
                if (j == n) {
                    j = 0;
                }
            }
            lastIndex = j;
            lastId = inner[j].id;
        }
        //當前InnerObserver已經(jīng)處理完畢,繼續(xù)循環(huán)處理數(shù)組中下一個InnerObserver
        if (innerCompleted) {
            //傳入的默認maxConcurrency值是Integer.MAX_VALUE,因此if條件內(nèi)的代碼不會執(zhí)行
            if (maxConcurrency != Integer.MAX_VALUE) {
                ObservableSource<? extends U> p;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        continue;
                    }
                }
                subscribeInner(p);
            }
            //結(jié)束當前循環(huán),開始下一循環(huán)處理下一個InnerObserver
            continue;
        }
        //每次數(shù)據(jù)下發(fā)完畢,將VALUE值減一
        missed = addAndGet(-missed);
        //如果VALUE值變?yōu)?,表示緩存隊列的數(shù)據(jù)已全部下發(fā)完畢,退出循環(huán),方法結(jié)束
        if (missed == 0) {
            break;
        }
    }
}

至此整個FlatMap操作符的流程就分析完了,總結(jié)下:

  • 根據(jù)上游Observable對象的類型是不是ScalarCallable類型,F(xiàn)latMap決定返回相應的新的Observable對象,一般情況下返回的都是ObservableFlatMap對象(ObservableScalarXMap對象處理數(shù)據(jù)分發(fā)的方式與ObservableFlatMap類似,這里我們只分析ObservableFlatMap)。
  • 下游訂閱時,觸發(fā)ObservableFlatMap的subscribeActual方法,觸發(fā)上游subscribe --> subscribeActual --> 調(diào)用MergeObserver的onSubscribe建立訂閱關(guān)系,上游調(diào)用onNext下發(fā)數(shù)據(jù) --> 調(diào)用MergeObserver的onNext,數(shù)據(jù)下發(fā)完畢或者出錯調(diào)用MergeObserver的onComplete或onError。
  • 在MergeObserver的onNext方法中,獲取FlatMap操作符接收的Function對象返回的數(shù)據(jù)變換Observable(暫時命名為observableA),并為每個返回的observableA創(chuàng)建一個對應的InnerObserver對象(暫時命名為innerObserverA),然后直行訂閱observableA.subscribe(innerObserverA),最終將上游的數(shù)據(jù)經(jīng)過變換后重新發(fā)射出去。
  • InnerObserver內(nèi)部有一個緩存隊列,用于緩存變換后的數(shù)據(jù),其onNext方法內(nèi)部最終還是調(diào)用的MergeObserver的tryEmit方法,將變換后的數(shù)據(jù)重新發(fā)射到下游observer。MergeObserver實現(xiàn)了AtomicInteger類,采用CAS操作保證了數(shù)據(jù)下發(fā)操作的原子性(即每次只有一個數(shù)據(jù)下發(fā),在當前數(shù)據(jù)下發(fā)過程中,如果上游有新的數(shù)據(jù)到來,則將新的數(shù)據(jù)保存到InnerObserver的緩存隊列中。等當前數(shù)據(jù)下發(fā)完畢后,再從InnerObserver的緩存隊列中取出數(shù)據(jù)并將其下發(fā)給下游observer)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容