RxJava操作符Zip、Merge、Concat

鏈式調(diào)用

鏈式調(diào)用.png

1、Zip

Zip

1.1、zip將多個observable并行執(zhí)行,通過function,轉(zhuǎn)成一個value給下游。
1.2、當最短的ObservableSource執(zhí)行完成后,最長的ObservableSource剩余部分不再執(zhí)行,也就是說,較長的Source的onComplete調(diào)用不到。當長度相等時,也會發(fā)生這種情況,比如zip(new ObservableSource[]{range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)} action2可能調(diào)用不到,比如action1已經(jīng)完成,而action2將要完成,還沒有完成。上述圖片[3,null]不會執(zhí)行。
1.3、zip源碼分析

public final class ObservableZip extends <T, R> extends Observable<R>{

  public void subscribeActual(Observer<? super R> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        count = sources.length;
        ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    }

//ZipCoordinator
class ZipCoordinator{

    public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
            ZipObserver<T, R>[] s = observers;
            int len = s.length;
            for (int i = 0; i < len; i++) {
                s[i] = new ZipObserver<>(this, bufferSize);
            }
       
            this.lazySet(0);
            downstream.onSubscribe(this);
            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[I]);
            }
     }

     public void drain() {

            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row;
            final boolean delayError = this.delayError;

            for (;;) {  //為了能夠執(zhí)行多次,避免使用同步代碼塊

                for (;;) {  //遍歷observer
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {  //先用歷史的判斷
                            boolean d = z.done;
                            T v = z.queue.poll();
                            boolean empty = v == null;

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {
                                os[i] = v;
                            } else {
                                emptyCount++;
                            }
                        } else {
                            if (z.done && !delayError) {
                   
                            }
                        }
                        I++;
                    }
                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                        v = Objects.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                     
                    }
                    a.onNext(v);
                    Arrays.fill(os, null);
                }
               //避免同步代碼塊
                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }
   }

 static final class ZipObserver<T, R> implements Observer<T> {

        final ZipCoordinator<T, R> parent;
        final SpscLinkedArrayQueue<T> queue;

        @Override
        public void onNext(T t) {
            queue.offer(t);
            parent.drain();
        }
   }
}

上述代碼,首先 sources[i].subscribe(s[I]),訂閱后,上游執(zhí)行onNext,就會調(diào)用到ZipObserver中的onNext,此時把執(zhí)行的結(jié)果在onNext中,保存在queue中。因為Observer是有序的,遍歷Observer,拿到每一個observer對應(yīng)的queue中的值。如果為null,則跳出循環(huán),每一個observer都取出相同index的值,則向下執(zhí)行apply方法。
1.4、實現(xiàn)一個并發(fā)執(zhí)行,順序返回結(jié)果的功能

 Observable.zip(getSource1(true), getSource2(true), new BiFunction<Integer, Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer integer, Integer integer2) throws Throwable {
                Log.d(TAG, "apply:  完成====+++++");
                return Observable.fromArray(integer, integer2);
            }
        }).concatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Observable<Integer> integerObservable) throws Throwable {
                return integerObservable;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept:--------- " + integer);
            }
        });

上述concatMap還可以簡寫成concatMap(Functions.identity())。
1.5、簡寫Zip功能,去掉轉(zhuǎn)換函數(shù)mapper。

public class ZipRxJava {

    private static final String TAG = "RxJavaZip";
    private Observable[] mObservables;
    private MyObserver[] mObservers;
    private Object[] row;

    public void rxjavaZip(Observable... sources) {
        if (sources == null || sources.length == 0) {
            return;
        }
        mObservables = sources;
        mObservers = new MyObserver[sources.length];
        row = new Object[sources.length];
        for (int i = 0; i < sources.length; i++) {
            MyObserver observer = new MyObserver(128);
            mObservers[i] = observer;
            mObservables[i].subscribe(mObservers[i]);
        }
    }

    private synchronized void drain() {
        Log.d(TAG, "drain:  開始begin====== " + Thread.currentThread().getName());
        boolean hasAllRunComplete = true;
        Object[] objects = row;
        for (int i = 0; i < mObservers.length; i++) {
            MyObserver observer = mObservers[i];
            if (objects[i] == null) {
                Object poll = observer.queue.poll();
                if (poll == null) {
                    hasAllRunComplete = false;
                    break;
                } else {
                    objects[i] = poll;
                }
            }
            Log.d(TAG, "drain: 數(shù)組   " + Arrays.toString(objects));
        }
        Log.d(TAG, "drain: 數(shù)組f " + Arrays.toString(objects));
        if (hasAllRunComplete) {
            for (int i = 0; i < row.length; i++) {
                Log.d(TAG, "drain: " + row[i]);
            }
            Arrays.fill(objects, null);
        }
    }

    class MyObserver<T> implements Observer<T> {

        final SpscLinkedArrayQueue<T> queue;

        public MyObserver(int count) {
            queue = new SpscLinkedArrayQueue<>(count);
        }

        @Override
        public void onNext(@NonNull T t) {
            queue.offer(t);
            drain();
        }
    }

}

使用

public void testRxjavaZip() {
        ZipRxJava rxJavaZip = new ZipRxJava();
        rxJavaZip.rxjavaZip(Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()),
                Observable.just(4, 5).delay(1, TimeUnit.SECONDS).subscribeOn(Schedulers.newThread()));
    }
2、merge
merge.png
  public static <@NonNull T> Observable<T> merge(@NonNull ObservableSource<? extends T> source1, @NonNull ObservableSource<? extends T> source2) {
        return fromArray(source1, source2).flatMap((Function)Functions.identity(), false, 2);
    }

2.1、fromArray、fromIterable
從上面看出fromArray是最上游的Observable,調(diào)用OnNext 會直接調(diào)用MergeObserver的onNext。source中的onNext會調(diào)用到InnerObserver中的onNext()

  void run() {
            boolean hasNext;
            do {
                T v;
                try {
                    v = Objects.requireNonNull(it.next(), "The iterator returned a null value");
                } catch (Throwable e) {
                }
                downstream.onNext(v);
                try {
                    hasNext = it.hasNext();
                } catch (Throwable e) {
                }
            } while (hasNext);
        }

2.2、flatMap
mapper : 轉(zhuǎn)換函數(shù),返回一個ObservableSource對象。
delayErrors : 延遲錯誤。
maxConcurrency :最大并發(fā)數(shù),同時可執(zhí)行多少個ObservableSource
bufferSize :緩存多少個ObservableSource對象

flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize)

看下FlatMap源碼
2.2.1、上游訂閱當前的MergeObserver,傳遞OnNext會傳遞到MegeObser的onNext中。

public final class ObservableFlatMap  extends Observable{

     @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}

2.2.2、在onNext中使用mapper函數(shù)轉(zhuǎn)化,轉(zhuǎn)化后的是Observable對象,因此必須給emit出去。所以將改Observable訂閱內(nèi)部的InnerObserver,執(zhí)行內(nèi)部的onNext,然后調(diào)用下游真正的Observer的OnNext()

  @Override
        public void onNext(T t) {
            ObservableSource<? extends U> p;
            try {
                p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
            }
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
            subscribeInner(p);
        }

maxConcurrency 最大并發(fā)數(shù),如果超過了最大并發(fā)數(shù),就存在sources隊列中,當執(zhí)行完一個Observable之后,再從sources隊列中取。


merge.png

從上圖也可以看出,merge結(jié)果是無序的,但是每一個ObservableSource的結(jié)果是有序的。 當超過超過最大并發(fā)數(shù),就會等待前面的source執(zhí)行完,再執(zhí)行。

class InnerObserver{
        public void onNext(U t) {
            parent.tryEmit(t, this);
       }
}

class MergeObserver {

     void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                downstream.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }


    void drainLoop() {
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {

                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;

                if (n != 0) {
                    int j = Math.min(n - 1, lastIndex);

                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        if (checkTerminate()) {
                            return;
                        }

                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    o = q.poll();
                                } catch (Throwable ex) {
                          
                                }
                                if (o == null) {
                                    break;
                                }

                                child.onNext(o);

                                if (checkTerminate()) {
                                    return;
                                }
                            }
                        }

                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                }

                if (innerCompleted != 0) {
                    if (maxConcurrency != Integer.MAX_VALUE) {
                        subscribeMore(innerCompleted);
                        innerCompleted = 0;
                    }
                    continue;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
}

上面代碼有這幾點
1)、 外部調(diào)用onNext 直接調(diào)用到了InnerObserver的onNext,將next值直接調(diào)用downstream.onNext(),或者存入到queue中。
2)、遍歷InnerObserver 從其中的queue中取出里面的值,調(diào)用downStream.onNext(),observer沒有值就遍歷下一個observer。
3)、是執(zhí)行完成一個source后,從sources隊列中取出新source,訂閱InnerObserver。
可以看出,外部的source誰先調(diào)用OnNext,就先調(diào)用誰的Observer, 然后執(zhí)行下游downStream.onNext()

3、concat

concat.png
 public static <@NonNull T> Observable<T> concat(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
   fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize());
    }

3.1 fromIterable這里和2.1是一樣的,也是上游發(fā)送數(shù)據(jù)的地方。
3.2 訂閱

class ObservableConcatMap {

 @Override
    public void subscribeActual(Observer<? super U> observer) {
            SerializedObserver<U> serial = new SerializedObserver<>(observer);
            source.subscribe(new SourceObserver<>(serial, mapper, bufferSize));
    }
}

3.3 將Observerable存到queue中,遍歷queue,轉(zhuǎn)化observable對象,訂閱到
InnerObserver。

static final class SourceObserver<T, U> {

        @Override
        public void onNext(T t) {
            if (fusionMode == QueueDisposable.NONE) {
                queue.offer(t);
            }
            drain();
        }

        void drain() {

            for (; ; ) {
                if (!active) {
                    boolean d = done;
                    T t;
                    try {
                        t = queue.poll();
                    } catch (Throwable ex) {
                    }
                    boolean empty = t == null;
                    if (!empty) {
                        ObservableSource<? extends U> o;
                        try {
                            o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                        } catch (Throwable ex) {
                        }

                        active = true;
                        o.subscribe(inner);
                    }
                }
            }
        }
     
      void innerComplete() {
            active = false;
            drain();
        }

}

 static final class InnerObserver<U>  {
        @Override
        public void onNext(U t) {
            downstream.onNext(t);
        }

            @Override
            public void onComplete() {
                parent.innerComplete();
            }

  }

從上面代碼看出,當一個InnerObserver執(zhí)行完成之后,將active 設(shè)置為false,然后for循環(huán)中再取下一個Observable對象訂閱。

4、總結(jié)

merge是上游調(diào)用OnNext之后,只要任何一個InnerObserver中有數(shù)據(jù),就調(diào)用downstream.onNext(),結(jié)果是無序的。
concat 只有一個source執(zhí)行完成之后,才會執(zhí)行下一個source,結(jié)果是有序的。
zip通過1.4可以實現(xiàn)有序,但是必須得等待source都執(zhí)行完成才能執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容