RxJava源碼分析(五)變換操作符Map

引言

前面我們分析了RxJava的線程調(diào)度,今天我們研究下RxJava的另外一塊強(qiáng)大的功能-事件變換操作符。

map操作符

官方定義:transform the items emitted by an Observable by applying a function to each item

翻譯過(guò)來(lái)就是就是轉(zhuǎn)換發(fā)射數(shù)據(jù)的操作符,說(shuō)白了就是起到事件變換的作用,下面是圖示:


map操作符

map操作符示例:

        /**
         * 變換操作符
         */
        //1.map將事件通過(guò)一個(gè)函數(shù)做變換,一般用作數(shù)據(jù)類(lèi)型轉(zhuǎn)換
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
            //Integer轉(zhuǎn)換成String
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map變換操作符 將事件" + integer + "的參數(shù)從 整型" + integer + " 變換成 字符串類(lèi)型" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
        //2.FlatMap/concatMap 將每個(gè)事件進(jìn)行拆分和轉(zhuǎn)換,再合并成一個(gè)新的事件序列,最后再發(fā)送,前者無(wú)序發(fā)送,后者有序發(fā)送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                //事件拆分
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通過(guò)flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
                    // 最終合并,再發(fā)送給被觀察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

在這個(gè)例子中map操作符將整形數(shù)據(jù)變換成了String數(shù)據(jù),交給Consumer<String>對(duì)象接收。
我們看一下map函數(shù)的源碼:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判空略過(guò)
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

返回ObservableMap對(duì)象:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //調(diào)用層傳入的Function,用于數(shù)據(jù)轉(zhuǎn)換
    final Function<? super T, ? extends U> function;
    //source為上游,支持類(lèi)型為T(mén),U為下游的支持類(lèi)型
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //上游source訂閱MapObserver,根據(jù)function和傳入的觀察者t構(gòu)造MapObserver對(duì)象
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
       //MapObserver通過(guò)function對(duì)象mapper,將T數(shù)據(jù)轉(zhuǎn)換U數(shù)據(jù)類(lèi)型后,再轉(zhuǎn)換后的數(shù)據(jù)交給真正的接受者actual
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            //轉(zhuǎn)換后的數(shù)據(jù)
            U v;

            try {
               //調(diào)用mapper.apply(t)執(zhí)行數(shù)據(jù)轉(zhuǎn)換,交給actual
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
  1. 它繼承自AbstractObservableWithUpstream,前面我們提到過(guò),它封裝了上游的ObservableSource,是支持?jǐn)?shù)據(jù)類(lèi)型轉(zhuǎn)換的Observable。

  2. subscribeActual訂閱方法是調(diào)用上游(也就是用戶(hù)定義的Observable對(duì)象)的subscribe方法,入?yún)镺bserver對(duì)象(下游)的裝飾類(lèi)MapObserver。

  3. ObservableMap持有Function對(duì)象和真正的觀察者對(duì)象,在被訂閱的onXXX方法中,通過(guò)Function將上游發(fā)送的T類(lèi)型數(shù)據(jù)轉(zhuǎn)換成U類(lèi)型數(shù)據(jù),然后將交給觀察者對(duì)象處理。

FlatMap操作符

還是先看看官方定義:

官方定義:transform the items emitted by an Observable by applying a function to each item

大致意思是將每一個(gè)上游發(fā)射的數(shù)據(jù)從一個(gè)Observable轉(zhuǎn)成為多個(gè)Observable,并將所有要發(fā)射的數(shù)據(jù)平鋪為一個(gè)Observable。
下面是FlatMap的圖解:


FlatMap

到這里我們總結(jié)一下:

  1. flatmap 轉(zhuǎn)換是一對(duì)多的(一對(duì)一當(dāng)然也支持),原來(lái)發(fā)射了幾個(gè)數(shù)據(jù),轉(zhuǎn)換之后可以是更多個(gè);
  2. flatMap 轉(zhuǎn)換同樣可以改變發(fā)射的數(shù)據(jù)類(lèi)型;
  3. flatMap 轉(zhuǎn)換后的數(shù)據(jù),還是會(huì)逐個(gè)發(fā)射給我們的Observer來(lái)接收(就像這些數(shù)據(jù)是由一個(gè)Observable發(fā)射的一樣,其實(shí)是多個(gè)Observable發(fā)射然后合并的);
    4.注意:轉(zhuǎn)換后的數(shù)據(jù)發(fā)射順序可能和上游的發(fā)射順序不一致,為什么會(huì)這樣,我們后面看源碼分析。
    樣例:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                //事件拆分
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通過(guò)flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件
                    // 最終合并,再發(fā)送給被觀察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

下面我們看下源碼

FlatMap方法

flatMap有多個(gè)重載方法,最終調(diào)用下面的方法:

@SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
       //判空
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //參數(shù)為正校驗(yàn)
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        //返回一個(gè)ObservableFlatMap對(duì)象
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

ObservableFlatMap

還是和之前的套路一樣,看下ObservableFlatMap的訂閱方法:

 @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
       //返回包裝類(lèi)MergeObserver
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

最后返回MergeObserver,顯而易見(jiàn)這是實(shí)現(xiàn)flatMap操作的核心類(lèi)。

MergeObserver

看關(guān)鍵方法onNext:

  @Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
               //調(diào)用Function方法轉(zhuǎn)換成新的Observable
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
            //默認(rèn)情況會(huì)走到這里
            subscribeInner(p);
        }

由于我們默認(rèn)調(diào)用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 所以最終會(huì)調(diào)用 subscribeInner(p),注意這里我們的mapper方法以及被調(diào)用了,p就是跟我們傳入的Function生成的Observable,我們?cè)倮^續(xù)往下看

       @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                     //一般情況下,返回的Observable 都不是 Callable類(lèi)型的,走這里
                    //構(gòu)造最終的觀察者InnerObserver,它是真正接收數(shù)據(jù)的
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

終于找到最后的接受者InnerObserver,接著看它的onNext方法:

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                //MergeObserable的方法
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }

paren為上面的MergeObserable的tryEmit方法,看字面意思是嘗試發(fā)送,所以可能會(huì)引入同步機(jī)制,下面是tryEmit方法:

 void tryEmit(U value, InnerObserver<T, U> inner) {
            //引入同步機(jī)制,保證每個(gè)時(shí)刻只發(fā)射一個(gè)數(shù)據(jù)給最后的接受者
            //嘗試獲取cas鎖
            if (get() == 0 && compareAndSet(0, 1)) {
                //最后的接受者
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    //釋放鎖返回
                    return;
                }
            } else {//沒(méi)有拿到鎖,就把數(shù)據(jù)加入隊(duì)列
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            //輪詢(xún)隊(duì)列,取數(shù)據(jù)
            drainLoop();
        }

這里需要提一下:MergeObserver 繼承了 AtomicInteger,所以這里的tryEmit方法就利用了 AtomicInteger 的同步機(jī)制,同時(shí)只會(huì)有一個(gè) value 被 actual Observer 發(fā)射,而且這里 剛好 可以解答我們上面留下的 問(wèn)題,由于 AtomicInteger CAS鎖只能保證操作的原子性,并不保證鎖的獲取順序,是搶占式的,所以最終數(shù)據(jù)的發(fā)射順序并不是固定的(同一個(gè)Observable發(fā)出的數(shù)據(jù)是有序的)。
如果沒(méi)有獲取到鎖,就會(huì)將要發(fā)射的數(shù)據(jù)放入 隊(duì)列中,drainLoop 方法會(huì)循環(huán)去獲取隊(duì)列中的 數(shù)據(jù),然后發(fā)射。

void drainLoop() {
            //取最終的觀察者
            final Observer<? super U> child = this.actual;
            int missed = 1;
            for (;;) {
                //錯(cuò)誤檢查
                if (checkTerminate()) {
                    return;
                }
                SimplePlainQueue<U> svq = queue;

                if (svq != null) {
                    for (;;) {
                        U o;
                        for (;;) {
                            if (checkTerminate()) {
                                return;
                            }
                            //取數(shù)據(jù)
                            o = svq.poll();
                            if (o == null) {
                                break;
                            }
                            //交給最后的觀察者
                            child.onNext(o);
                        }
                        if (o == null) {
                            break;
                        }
                    }
                }
                .....
                //隊(duì)列數(shù)據(jù)輪詢(xún)完畢,做完成或者錯(cuò)誤處理
                if (d && (svq == null || svq.isEmpty()) && n == 0) {
                    Throwable ex = errors.get();
                    if (ex == null) {
                        child.onComplete();
                    } else {
                        child.onError(errors.terminate());
                    }
                    return;
                }
              ....
        }

這個(gè)方法就是從循環(huán)從數(shù)據(jù)隊(duì)列中取數(shù)據(jù)交給最后的觀察者接收。
FlatMap引入CAS機(jī)制,在onNext方法中嘗試拿到鎖,如果拿到則立即交給最終的觀察者,否則加入等待隊(duì)列。
關(guān)于map操作法我們先講到這里,水平有限,難免有紕漏,還望多多指正!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容