RxJava2

1.??什么時候使用Flowable,什么時候使用Observable?

下面是官方文檔(原文)的直接翻譯:

一個小遺憾是,在RxJava 0.x引入背壓(backpressure)時,并沒有使用一個獨(dú)立的基礎(chǔ)reactive類,而是直接在Observable的基礎(chǔ)上進(jìn)行改進(jìn)了。

背壓的主要問題是,許多hot sources比如UI event,它們不能合理地被背壓,然后導(dǎo)致我們不想看到的MissingBackpressureException

注:hot sources直譯是熱來源,其真實(shí)表示的是hot Observable,也就是冷熱Observable中的熱Observable

對于這種情況,我們在2.x中這樣嘗試補(bǔ)救:將io.reactivex.Observable 設(shè)為非背壓(non-backpressured),同時增加一個新的基礎(chǔ)reactive類io.reactivex.Flowable,而這個類是可背壓的(backpressure-enabled)

好消息是2.x的操作符幾乎與之前保持一致。壞消息是大家在自動導(dǎo)包(organize imports)時需要注意,不要無意中就選擇了不支持背壓的io.reactivex.Observable

  • 注:項目如果使用了2.x之前的版本的RxJava,即使有些場景需要背壓,但當(dāng)時只能使用io.reactivex.Observable;所以當(dāng)遷移到2.x時,要注意將這部分代碼改成使用io.reactivex.Flowable,因?yàn)榍罢咴?2.x時不支持背壓。

什么時候使用Observable?

  • 當(dāng)你的數(shù)據(jù)流最多也不會超過1000個元素(element)時,也就是說一段時間內(nèi)只有很少的元素發(fā)射,所以你的應(yīng)用不大可能發(fā)生內(nèi)存溢出。
  • 當(dāng)你處理GUI事件,例如鼠標(biāo)或者觸摸事件時,這些事件很難被合理地背壓,并且不會頻繁的發(fā)生。你可以使用Observable去處理頻率小于等于1000赫茲的元素發(fā)送,并且盡量考慮使用sampling/debouncing 等操作符。
  • 本來你的數(shù)據(jù)流是異步的,但你的平臺不支持Java流或者你分不清該使用Observable還是Flowable時,使用ObservableFlowable有更小的開銷。

什么時候使用Flowable?

  • 當(dāng)處理超過10k的元素,這些元素生成自某處并且具備某些特性,因此數(shù)據(jù)鏈(Chain)可以告訴來源(Source)去限制生成量。
  • 讀取或者解析來自硬盤的文件自然而然地會產(chǎn)生阻塞(blocking),并且是基于拉取式(pull-based)的。在你的控制下,同樣可以很好地處理好背壓。比如:在特定的請求量下,你會讀取多少行的數(shù)據(jù)。
  • 通過JDBC讀取數(shù)據(jù)庫同樣是基于拉取式并且會產(chǎn)生阻塞,但是你可以通過調(diào)用ResultSet.next()得到很好的控制,并且通過它,幾乎可以應(yīng)對每一條下流的請求。
  • 網(wǎng)絡(luò)(流)IO:網(wǎng)絡(luò)請求或者是一些支持請求邏輯量的協(xié)議。
  • 一些阻塞和/或基于拉取式的數(shù)據(jù)源,但是未來也許會提供非阻塞響應(yīng)式的API或者驅(qū)動。

2.??Consumer,F(xiàn)unction

這里說的兩個類指的是io.reactivex.functions包下的:

io.reactivex.functions包

先說Consumer:

2.x的 Consumer ??等于?? 1.x的 Action1

2.x的 BiConsumer ??等于?? 1.x的 Action2

2.x的 Consumer<Object[]> ??等于?? 1.x的 ActionN

2.x的 Action ??等于?? 1.x的 Action0

2.x中 ??沒有?? 1.x中的Action3~Action9

再說Function:

2.x的 Function ??等于?? 1.x的 Func

2.x的 BiFunction ??等于?? 1.x的 Func2

2.x的 Function3~ Fucntion9 ??等于?? 1.x的 Func3~Func9

2.x的 Function<Object[], R> ??等于?? 1.x的 FuncN

很明顯1.x的命名不太規(guī)范,2.x中采用通用的相對合理的命名。
然而2.x命名規(guī)范并不是RxJava自己設(shè)計的,而是與Java1.8中相同功能的同名類的命名保持一致(不僅是類名,還有方法名)

Java1.8中新增一個包:java.util.function
來大體瀏覽下這個包(非所有類)

java.util.function包

可以看出:Java1.8中沒有Action,Function3~Function9。

Consumer的作用

Consumer描述了這樣的一種操作(operation):接收一個傳入的參數(shù)(argument),并且不返回結(jié)果(result)。
使用Consumer的目的是,根據(jù)傳入的值,來做相應(yīng)的事。所以重點(diǎn)是accept方法:

 * @since 1.8
 */
@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);

同理:BiConsumer是接收兩個傳入的參數(shù),并且不返回結(jié)果

 * @since 1.8
 */
@FunctionalInterface
public interface BiConsumer<T, U> {

    /**
     * Performs this operation on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     */
    void accept(T t, U u);

上面2個源碼是Java1.8的,因?yàn)樽⑨尡容^詳細(xì)。。。
下面看下RxJava2.x的:

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}
/**
 * A functional interface (callback) that accepts two values (of possibly different types).
 * @param <T1> the first value type
 * @param <T2> the second value type
 */
public interface BiConsumer<T1, T2> {

    /**
     * Performs an operation on the given values.
     * @param t1 the first value
     * @param t2 the second value
     * @throws Exception on error
     */
    void accept(T1 t1, T2 t2) throws Exception;
}
/**
 * A functional interface similar to Runnable but allows throwing a checked exception.
 */
public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

對于Consumer,RxJava2.x與Java1.8最大的區(qū)別是:
accept方法默認(rèn)都會拋出Exception,這也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一個andThen方法。)

Function的作用

Function描述了這樣的一種功能(function):接收一個參數(shù)(argument),并且產(chǎn)生一個結(jié)果(result)。
使用Function的目的是,根據(jù)傳入的值,來輸出一個值。所以重點(diǎn)是apply方法:

 * @since 1.8
 */
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);

同理:BiFunction是接收兩個參數(shù),并且產(chǎn)生一個結(jié)果

* @see Function
 * @since 1.8
 */
@FunctionalInterface
public interface BiFunction<T, U, R> {

    /**
     * Applies this function to the given arguments.
     *
     * @param t the first function argument
     * @param u the second function argument
     * @return the function result
     */
    R apply(T t, U u);

同樣也比較下Rxjava2.x的:

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <R> the result type
 */
public interface BiFunction<T1, T2, R> {

    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
/**
 * A functional interface (callback) that computes a value based on multiple input values.
 * @param <T1> the first value type
 * @param <T2> the second value type
 * @param <T3> the third value type
 * @param <R> the result type
 */
public interface Function3<T1, T2, T3, R> {
    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @param t3 the third value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}

同樣,對于Function,RxJava2.x與Java1.8最大的區(qū)別是:
apply方法默認(rèn)都會拋出Exception,這也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一個andThen方法。)

最后,大家可能有疑問:BiConsumerBiFcuntionBi是什么意思?
答案就是:Binary 。當(dāng)然這里不是"二進(jìn)制"的意思,而是"兩"個參數(shù)的意思。

3.??觀察者和被觀察者

來看段經(jīng)典的設(shè)計模式書籍---Head First怎么說?

出版者 + 訂閱者 = 觀察者模式

如果你了解報紙的訂閱是怎么回事,其實(shí)就知道觀察者模式是怎么回事,只是名稱不太一樣:出版者改稱為“主題”(Subject),訂閱者改稱為“觀察者”(Observer)

所以上面出現(xiàn)四個關(guān)鍵字:

Publisher(出版者) -> Subject(主題)

Subscriber(訂閱者) -> Observer(觀察者)

比較:Observable,ObservableSource,F(xiàn)lowable,Publisher

2.x 和 1.x中Observable的父類都是Object,
但前者額外實(shí)現(xiàn)了ObservableSource接口,里面只有一個方法:

void subscribe(@NonNull Observer<? super T> observer)

兩者在2.x 和 1.x 的包名分別為:io.reactivexrx 。

與2.x的Observable相似,Flowbale也實(shí)現(xiàn)了一個接口,
該接口就是Publisher!
同樣只有一個方法:

void subscribe(Subscriber<? super T> s);

需要注意的是,Publisher并沒有在io.reactivex包內(nèi),而是在org.reactivestreams內(nèi),該包內(nèi)只有4個類,但每個都非常重要:

org.reactivestreams包

比較:Observer,Subscriber,Subscription,Disposable

先看下1.x的Observablesubscribe()方法:

subscribe()方法

如上圖所示:每個方法subscribe()的方法都返回Subscription對象
Subscription是一個接口,就2個方法:unsubscribe()isUnsubscribed()

然后看下Observer,同樣是一個接口,就那3個最常用的方法:onNext(),onComplete(),onError()

Subscriber剛好同時實(shí)現(xiàn)了ObserverSubscription

public abstract class Subscriber<T> implements Observer<T>, Subscription

雖然Observablesubscribe()時可以傳入Observer,但實(shí)際處理時,會先把Observer轉(zhuǎn)為Subscriber

 public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}

上面是對于1.x的Observer,Subscription,Subscriber的簡單理解,前兩者是接口,最后一個是抽象類,并且都在rx包內(nèi)。


來看下2.x,先是Observablesubscribe()方法:

Observable 的 subscribe()方法

與1.x的不同點(diǎn):

返回值是Disposable而不再是Subscription。

當(dāng)subscribe的是Observer時,并沒有返回值。

并且不能1.x那樣subscribe一個Subscriber。

先看下Disposable,只有兩個方法:dispose()isDisposed()??吹竭@里應(yīng)該會有下意識的反應(yīng):Disposable與1.x的Subscription如出一轍!很明顯,它確實(shí)是這個作用。

然后我們來看下Observer,它仍然是一個接口,但比1.x多出一個方法:

void onSubscribe(@NonNull Disposable d);

到此,就可以解釋為什么當(dāng)Observable subscribe的是Observer時沒有返回值,因?yàn)?code>Observer內(nèi)部的方法已經(jīng)提供了Disposable的引用。
同時,還由于Disposable與1.x的Subscription的作用相同,而1.x的Subscriber實(shí)現(xiàn)了ObserverSubscription,所以實(shí)際上,2.x的Observer扮演了1.x中Subscriber的角色!

下面再看下Flowable

Flowable 的 subscribe()方法

如上圖所示:

Flowable的前5個方法同Observable的一致,返回值都Disposable。

但是Flowable只能subscribe的是Subscriber,而非Observer。
(FlowableSubscriberSubscriber的子類,也是個接口,兩者區(qū)別是前者的onSubscribe方法不可以傳空對象,后者可以),

所以我們重點(diǎn)看下2.x的Subscriber

2.x的Subscriber是個接口(1.x是抽象類),與2.x的Observer非常相似,除了onNext,onComplete,onError外,還有:

void onSubscribe(Subscription s);

這里又出現(xiàn)了Subscription,同樣有2個方法,但是不同于1.x的unsubscribe()isUnsubscribed(),以及2.xDisposabledispose()isDisposed(),它的兩個方法是:

/**
 * No events will be sent by a {@link Publisher} until demand is signaled via this method.
 *
 * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
 * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
 *
 * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
 * 
 * A {@link Publisher} can send less than is requested if the stream ends but
 * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
 *
 * @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
 */
void request(long n);

/**
 * Request the {@link Publisher} to stop sending data and clean up resources.
 * 
 * Data may still be sent to meet previously signalled demand after calling cancel.
 */
void cancel();

request()的重要性及作用就不多說了,說下cancel()
Disposabledispose()和1.x Subscriptionunsubscribe()僅僅是不再接收上流數(shù)據(jù),并不影響上流數(shù)據(jù)的發(fā)送。
而2.x Subscriptioncancel()所做的是,直接讓上流來源停止發(fā)送數(shù)據(jù),并且清空數(shù)據(jù)。

最后還有小注意點(diǎn):

2.x的ObservableFlowable當(dāng)subscribe的都是Consumer的時候,兩者的返回值都是Disposable

而2.x的ObserverSubscriber的區(qū)別只有onSubscribe方法(都有onNext,onCompleteonError三個方法),且兩者onSubscribe方法的區(qū)別只是:前者的接收的參數(shù)是Disposable,而后者是Subscription。也就是說Subscription只有在使用Subscriber時才會用到,而Subscriber只有在使用Flowable時才會用到。
所以Subscription只有在使用Flowable時才會用到。

另外:2.xSubscriberSubscription并不在io.reactivex內(nèi),
而是在org.reactivestreams中(就是介紹Publisher時貼出的4個類)。

比較:Subject

這個就簡單了,1.x 中的Subject 與 2.x 中的Subject所用是一致的:

Represents an Observer and an Observable at the same time

但由于2.x中加入了Flowable,也就意味著2.x中的Subject的覆蓋范圍沒有1.x中那么廣。兩者分別位于2.x 和 1.x 的包:io.reactivex.subjectsrx.subjects 。

4. 操作符的決策樹

源自:A Decision Tree of Observable Operators

我想創(chuàng)建一個Observable

只需要發(fā)射一個item:Just

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容