響應式編程開源庫 RxJava2——操作符

在上一篇響應式編程開源庫 RxJava2——Stream API中主要介紹了Java 8的Stream API,理解了什么是流,以及為什么要用流。它的實現(xiàn)基本上運用了前面part1、part2中學習過的重要概念。我們準備了這么久,下面將真正的進入RxJava的學習。
在前面已經(jīng)介紹過RxJava的官方定義,在Java VM上使用可觀察序列(即ReactiveX中提到的可觀測流)編寫異步和基于事件的程序的庫。前面我們也對RxJava的基本使用進行了簡單解析。RxJava可以使用Observable把某個對象轉(zhuǎn)變?yōu)橐粋€可觀測的序列(可觀測流),對其進行相應操作后作出相應的響應。
下面就從操作符入手,來學習使用RxJava。

操作符

Observable類里提供了很多操作符(intermediate operators)我們從里面不難發(fā)現(xiàn)有的操作符在前面介紹過的Stream API中也是有的。所以對于編程來說,用萬變不離其宗,殊途同歸來說的話一點不夸張。

1.create()

在前面part1我們已經(jīng)初步學習過該操作符。它可以把某對象轉(zhuǎn)化為可觀測的序列,簡單來說就是把某個對象轉(zhuǎn)變?yōu)楸挥^察者,并具有流的特性。

2.just()單個參數(shù)

源碼中共有十個just重載方法。他們能接收不同數(shù)量的參數(shù)。目前最大支持10個參數(shù)。



可以從下面這種圖看出他的含義,在前面已經(jīng)說明了這種圖所代表的意義。它能將多個參數(shù)轉(zhuǎn)變到同一個可觀測源中并且一次性發(fā)送。


@SuppressLint("CheckResult")
    public static void main(String[] args){
        Observable.just("s",2,true)
                .subscribe(s->System.out.println(s));
    }

上段代碼運行結(jié)果如下。


還是按照慣例從源碼再一次進行分析。先看看just(T item)只有一個參數(shù)的方法。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

其中onAssembly方法返回的就是ObservableJust對象。那么just方法實際上返回的對象就是ObservableJust對象。從源碼中看到它是Observable的子類,public final class ObservableJust<T> extends Observable<T>。

@NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

下面我們看一看在ObservableJust中到底進行了什么操作。

        /**
         * Represents a constant scalar value.
         * @param <T> the value type
         */
        public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
            private final T value;
            public ObservableJust(final T value) {
                this.value = value;
            }
            @Override
            protected void subscribeActual(Observer<? super T> s) {
                ObservableScalarXMap.ScalarDisposable<T> sd = new ObservableScalarXMap.ScalarDisposable<T>(s, value);
                s.onSubscribe(sd);
                sd.run();
            }
            @Override
            public T call() {
                return value;
            }
        }

構(gòu)造方法中對傳入的數(shù)據(jù)進行了從新賦值,這就使得我們控制了對象的不可變性,我們進行的 Intermediate中間操作只是對 ObservableJust類中的變量value進行操作,并不會影響傳入的參數(shù)值。從下圖 可以看出,ObservableJust中subscribeActual(Observer<? super T> s)方法是基類 Observable中的抽象方法實現(xiàn)。Observable在重寫了ObservableSource接口中的方法subscribe(Observer<? super T> observer)時調(diào)用subscribeActual(observer);在ObservableJust中我們具體實現(xiàn)了subscribeActual(Observer<? super T> s)方法。s.onSubscribe(sd);給了觀察者一個中斷器 ScalarDisposable sd.run();方法將調(diào)用Observer接口中的onNext方法回調(diào)數(shù)據(jù)。

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }


我們再來梳理一次Just()操作符的整個代碼內(nèi)部調(diào)用過程,Just(T item)會將數(shù)據(jù)賦值給ObservableJust類的value變量,嚴格控制了源數(shù)據(jù)的不可變性,之后調(diào)用Observable中的 subscribe(Observer<? super T> observer)方法將被觀察者和觀察者建立訂閱關系(RxJavaPlugins.onSubscribe(this, observer) ),之后通過ObservableJust中實現(xiàn)父類的抽象方法,void subscribeActual(Observer<? super T> observer)來一次性將數(shù)據(jù)發(fā)送。

  • Just()多個參數(shù)
    當Just()方法有多個參數(shù)時,其內(nèi)部和單個參數(shù)是有一定區(qū)別的。從以下源碼就可以看出,當Just有多個參數(shù)時,它實際上是利用了 fromArray操作符。
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item1, T item2) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");
        return fromArray(item1, item2);
    }

那我們就來研究下fromArray操作符內(nèi)部具體是怎么實現(xiàn)的。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }

可以看出,當只有一個參數(shù)時,實際上還是運用的Just(T item)操作符。當多個參數(shù)時會將數(shù)據(jù)丟給 ObservableFromArray類。

    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }

可以看到任然是重新賦值,保證源數(shù)據(jù)的不可變性。之后通過d.run();方法發(fā)送數(shù)據(jù)。

 void run() {
            T[] a = array;
            int n = a.length;
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }

可以看到這里使用了for循環(huán)遍歷,將元素一個個發(fā)送。通過上面的分析不難發(fā)現(xiàn)之前提到過的,當observable被訂閱后才會開始發(fā)送數(shù)據(jù)。因為只有subscribe后才會執(zhí)行Observable的各個子類中的方法subscribeActual(Observer<? super T> s)從而執(zhí)行各子類的run方法進行數(shù)據(jù)的發(fā)送。并且可以發(fā)現(xiàn),和create方式并不相同的是這里并不需要ObservableEmitter數(shù)據(jù)發(fā)射器的協(xié)助。這里的發(fā)送數(shù)據(jù)操作實際上就是回調(diào)。

3.以from開頭的操作符

  • fromArray(T... items)
    可以將一組數(shù)據(jù)轉(zhuǎn)為可觀測流,并且逐個發(fā)送參數(shù)中的每個元素。在上面的Just學習中,我們已經(jīng)大概了解了fromArray這個操作符。
    fromArray與just的主要區(qū)別主要取決于他們的參數(shù)個數(shù)。
  1. 單個參數(shù) 相當于都是Just,將參數(shù)的映射value一次發(fā)送。
  2. 多個參數(shù) 相當于fromArray 將參數(shù)的映射value遍歷后發(fā)送。
  • fromIterable(Iterable<? extends T> source)
    可以看到它只接收繼承Iterable接口的數(shù)據(jù),而在Java中集合類基本都繼承了該接口。所以這里只接收集合參數(shù)??梢钥吹酱笾逻^程和前面學習的兩個操作符差不多,只是內(nèi)部遍歷用上了集合特有的迭代器遍歷。它會遍歷集合,并將每個元素發(fā)送。
void run() {
            boolean hasNext;
            do {
                if (isDisposed()) {
                    return;
                }
                T v;
                try {
                    v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    actual.onError(e);
                    return;
                }
                actual.onNext(v);
                if (isDisposed()) {
                    return;
                }
                try {
                    hasNext = it.hasNext();
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    actual.onError(e);
                    return;
                }
            } while (hasNext);
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
  • fromCallable(Callable<? extends T> supplier)
    它將會發(fā)送Callable返回的參數(shù)。
  • fromFuture(Future<? extends T> future)
    它將會發(fā)送Future中get函數(shù)的返回值。其中timeout參數(shù)是調(diào)用get()方法之前最多等待時間。
  • fromPublisher(Publisher<? extends T> publisher)
    通過官方介紹,可以看出該操作符并不推薦,而是希望盡可能用create代替。所以這里不過多學習。

If possible, use create(ObservableOnSubscribe) to create a source-like Observable instead.

4.range()

它會發(fā)送指定區(qū)間的int數(shù),每次發(fā)送比前一次數(shù)據(jù)大1,相當于for循環(huán)。


5.interval()

從源碼可以看出該操作符的作用是每隔一段時間發(fā)送一個從0開始的Long型數(shù)據(jù),可以說相當于一個定時器。在java中相當于TimerTimerTask的應用。



這里每次發(fā)送的是long型的count,發(fā)送一次后count+1。

6.timer()

延遲指定時間后發(fā)送數(shù)據(jù)0。其實也就是延時操作,相當于handler的postDelay。


7.empty()

不會發(fā)送任何數(shù)據(jù),并且立即調(diào)用onComplete終止操作。這在前面很多操作符里都有調(diào)用。比如當數(shù)組為空就會首先調(diào)用empty終止操作。

8.map()

首先看看map()在ReactiveX中的解釋。

transform the items emitted by an Observable by applying a function to each item

應用一個函數(shù)對每個發(fā)出的元素進行轉(zhuǎn)換。
所以對每個發(fā)出的元素應用指定函數(shù),并發(fā)送函數(shù)的返回值。從下圖也可以看出,它可以進行數(shù)據(jù)的轉(zhuǎn)換,比如將int轉(zhuǎn)為String。map()接收一個函數(shù)式接口Function作為參數(shù),而在前面介紹過它能接收一個參數(shù),返回另一個參數(shù)。所以決定它有轉(zhuǎn)換功能。


我們在結(jié)合之前的 just()或者說fromArray()來看看map是怎么進行數(shù)據(jù)的轉(zhuǎn)換的。

Observable.just(1,2,3,4).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "map"+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log(s);
            }
        });
    }

Observable.just(1,2,3,4)我們已經(jīng)清楚,當被觀察者訂閱后,就會遍歷依次發(fā)送每個元素。發(fā)送出去后被 map接收。那么map是怎么接收的呢?還是從源碼入手。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

它最終返回的是ObservableMap對象,注意看它的參數(shù)。它接收了當前的Observable對象,和Function函數(shù)。我們在來看ObservableMap內(nèi)部。從下面看出ObservableMap的繼承關系。

ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>
在ObservableMap中,注意構(gòu)造方法的 super(source);在父類AbstractObservableWithUpstream中通過它的構(gòu)造方法將會獲得Observable.just(1,2,3,4)創(chuàng)建的上游可觀測源,當訂閱后會調(diào)用 subscribeActual。新的可觀測源的映射source會和MapObserver建立新的訂閱關系。

 public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
 @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

所以這里just發(fā)送的數(shù)據(jù),會被 MapObserver再次訂閱而且只一次,這樣數(shù)據(jù)就完美的流向了map操作符。在MapObserver中實現(xiàn)了基類ObserveronNext方法。這時候just通過遍歷發(fā)送的數(shù)據(jù)就會一次被下面的onNext接收到。當接收到數(shù)據(jù),就會執(zhí)行mapper.apply(t)將數(shù)據(jù)轉(zhuǎn)化,最后發(fā)送轉(zhuǎn)化后的數(shù)據(jù)v
通過上面的分析就可以印證之前的學習,每一次的中間操作都會有一個新的數(shù)據(jù)源映射,并且其中很多構(gòu)造方法完美的解決了對象的不可變性。當訂閱后,所有的操作都是一次性完成,減少了時間復雜度。

      @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

9.flatMap()

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
可以將可觀測數(shù)據(jù)源發(fā)送的數(shù)據(jù)轉(zhuǎn)化成多個可觀測源,最后把他們在融合大一個可觀測數(shù)據(jù)源中發(fā)送。但是不保證轉(zhuǎn)化后數(shù)據(jù)的順序。


Observable.just(1,2,3,4).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.just("flatmap"+integer);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log(s);
            }
        });

結(jié)合之前的just和map的學習,從代碼很容易看出just依次發(fā)送元素,flatMap利用Function接口將每個元素轉(zhuǎn)化為新的數(shù)據(jù)源,然后合并到一個可觀測源中發(fā)送。那么是怎么合并發(fā)送的呢?這里看下源碼。flatMap最終的可觀測源是ObservableFlatMapsubscribeActual方法中進行了數(shù)據(jù)源的映射重新訂閱source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));重新訂閱后在onNext中會將發(fā)送的數(shù)據(jù)轉(zhuǎn)化成新的數(shù)據(jù)源。

@Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
                //將發(fā)送的數(shù)據(jù)轉(zhuǎn)化為新的可觀測源
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }

            subscribeInner(p);
        }

最后執(zhí)行subscribeInner(p);一般我們的可觀測源不會是Callable類型,所以最終到了else里面,在這里面就可以看出每次發(fā)送來的數(shù)據(jù)轉(zhuǎn)化為新的數(shù)據(jù)源 p后,每次會創(chuàng)建一個InnerObserver(注意它的構(gòu)造方法參數(shù),是將MergeObserver對象傳入的)。通過addInner方法將多個InnerObserver添加到AtomicReference<InnerObserver<?, ?>[]> observers;中。每次轉(zhuǎn)化的數(shù)據(jù)源p都有一個對應InnerObserver。我們這里的p實際就是Observable.just("flatmap"+integer)所產(chǎn)生的可觀測源,對它調(diào)用p.subscribe(inner);就會把當前可觀測源中的數(shù)據(jù)讓對應的InnerObserver的onNext接收。這時候就會通過MergeObserver的tryEmit方法給下游下發(fā)數(shù)據(jù)。

void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                    if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                        boolean empty = false;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                empty = true;
                            }
                        }
                        if (empty) {
                            drain();
                            break;
                        }
                    } else {
                        break;
                    }
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    //每次轉(zhuǎn)化的數(shù)據(jù)源p都有一個對應InnerObserver
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }

        boolean addInner(InnerObserver<T, U> inner) {
            for (;;) {
                InnerObserver<?, ?>[] a = observers.get();
                if (a == CANCELLED) {
                    inner.dispose();
                    return false;
                }
                int n = a.length;
                InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = inner;
                if (observers.compareAndSet(a, b)) {
                    return true;
                }
            }
        }

最后我們在看InnerObserver的onNext方法和tryEmit方法,最終通過MergeObserver單一可觀測源調(diào)用tryEmit方法,通過最終觀察者的回調(diào)來發(fā)送數(shù)據(jù)。MergeObserver繼承了AtomicInteger,所以這里的tryEmit方法就利用了AtomicInteger的同步機制。所以同時只會有一個value被最終觀察者actual發(fā)送,由于AtomicInteger CAS鎖只能保證操作的原子性,并不保證鎖的獲取順序,是搶占式的,所以最終數(shù)據(jù)的發(fā)射順序并不是固定的。

 if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

大概的過程就是,通過MergeObserver來將發(fā)送的每個元素轉(zhuǎn)化為可觀測源,每個新的可觀察源p中的數(shù)據(jù)都會通過p.subscribe(inner);下發(fā)到對應的 InnerObserver。然后通過MergeObserver中的方法下發(fā)數(shù)據(jù)到最終訂閱者。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    BrotherChen閱讀 1,779評論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,487評論 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 683評論 0 1
  • 響應式編程簡介 響應式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測,被過濾,被操作,或者...
    說碼解字閱讀 3,549評論 0 5
  • 本文基于 RxJava1.x 版本,閱讀本文前請先了解 RxJava[https://github.com/Rea...
    StoneHui閱讀 3,114評論 2 18

友情鏈接更多精彩內(nèi)容