響應(yīng)式編程開源庫(kù) RxJava2——起源

1.RxJava來(lái)源

從github上我們可以看到RxJava是隸屬于ReactiveX(Rx)倉(cāng)庫(kù),Rx是從微軟的函數(shù)響應(yīng)式編程庫(kù)(Reactive Extensions)發(fā)展而來(lái)的, 提供了一種新的組織和協(xié)調(diào)異步事件的方式。
官方對(duì)于該庫(kù)的介紹是,

  1. An API for asynchronous programming with observable streams
  2. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
    ReactiveX提供可觀測(cè)流的異步編程API,它的靈感來(lái)源于觀察者模式、迭代器模式和函數(shù)編程這些優(yōu)秀思想。
    ReactiveX作為一個(gè)通用庫(kù), 現(xiàn)在已經(jīng)有多種語(yǔ)言的實(shí)現(xiàn)版本(都是開源的), 包含RxJava, RxCpp, RxSwift, RxKotlin, RxGroovy, RxJavaScript等具體可參照所有支持語(yǔ)言

2.什么是RxJava

RxJava 是 ReactiveX 在 Java 上的開源的實(shí)現(xiàn)。
網(wǎng)上大多概況為它是一個(gè)實(shí)現(xiàn)了異步操作的庫(kù)。這種概括來(lái)源于官方但是太過(guò)片面,它不僅僅異步一個(gè)功能,在使用中發(fā)現(xiàn)它對(duì)于數(shù)據(jù)流的各種處理非常方便。

下面是Github上RxJava庫(kù)的官方簡(jiǎn)介。

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

RXJava——JVM的響應(yīng)性擴(kuò)展——在Java VM上使用可觀察序列(即ReactiveX中提到的可觀測(cè)流)編寫異步和基于事件的程序的庫(kù)。
簡(jiǎn)而言之,就是在使用Java編程時(shí),該庫(kù)能使用觀察者模式編寫異步和基于事件的程序。

3.關(guān)鍵詞解析

既然是學(xué)習(xí)就要搞清楚細(xì)節(jié),所以對(duì)于這些平時(shí)聽到的名詞,還是值得我們基于極客和刨根問(wèn)底的精神來(lái)細(xì)細(xì)品味的。

觀察者模式

觀察者模式(有時(shí)又被稱為模型(Model)-視圖(View)模式、源-收聽者(Listener)模式或從屬者模式)是軟件設(shè)計(jì)模式的一種。在此種模式中,一個(gè)目標(biāo)物件管理所有相依于它的觀察者物件,并且在它本身的狀態(tài)改變時(shí)主動(dòng)發(fā)出通知。這通常透過(guò)呼叫各觀察者所提供的方法來(lái)實(shí)現(xiàn)。此種模式通常被用來(lái)實(shí)現(xiàn)事件處理系統(tǒng)。

迭代器模式

提供一種方法順序訪問(wèn)一個(gè)聚合對(duì)象中各個(gè)元素,而又不需暴露該對(duì)象的內(nèi)部表示。比如Java中的LinkedList內(nèi)部實(shí)現(xiàn)是雙向鏈表,比較復(fù)雜,這時(shí)候通過(guò)迭代器Iterator遍歷元素就很簡(jiǎn)單,我們不必關(guān)注內(nèi)部實(shí)現(xiàn)。

函數(shù)式編程

函數(shù)式編程是種編程方式,它將電腦運(yùn)算視為函數(shù)的計(jì)算。函數(shù)編程語(yǔ)言最重要的基礎(chǔ)是λ演算(lambda calculus),而且λ演算的函數(shù)可以接受函數(shù)當(dāng)作輸入(參數(shù))和輸出(返回值)。
函數(shù)式編程中的函數(shù),是指數(shù)學(xué)中的函數(shù),即自變量的映射(一種東西和另一種東西之間的對(duì)應(yīng)關(guān)系)。也就是說(shuō),一個(gè)函數(shù)的值僅決定于函數(shù)參數(shù)的值,不依賴其他狀態(tài)。

響應(yīng)式編程

簡(jiǎn)稱RP(Reactive Programming)
在計(jì)算機(jī)領(lǐng)域,響應(yīng)式編程是一個(gè)專注于數(shù)據(jù)流和變化傳遞的異步編程范式。這意味著可以使用編程語(yǔ)言很容易地表示靜態(tài)(例如數(shù)組)或動(dòng)態(tài)(例如事件發(fā)射器)數(shù)據(jù)流,并且在關(guān)聯(lián)的執(zhí)行模型中,存在著可推斷的依賴關(guān)系,這個(gè)關(guān)系的存在有利于自動(dòng)傳播與數(shù)據(jù)流有關(guān)的更改。
簡(jiǎn)單概括,本質(zhì)上是程序會(huì)對(duì)數(shù)據(jù)流或某種變化所作出相應(yīng)的反應(yīng)。

4.從基本使用RxJava分析

掘金翻譯計(jì)劃中看到的有關(guān)RxJava的介紹講的很仔細(xì),解決了很多困惑。還是從最基本的使用入手,下面是最基本的最簡(jiǎn)單的用法。

/****************創(chuàng)建被觀察者******************/
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //執(zhí)行耗時(shí)操作
                Thread.sleep(2000);
                emitter.onNext("耗時(shí)操作完畢,發(fā)送可觀測(cè)的對(duì)象");
            }
        })
                /****************線程調(diào)度,被觀察者在IO線程,觀察者在主線程訂閱******************/
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())


                /****************被觀察者被觀察者訂閱******************/
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {

                        Log(s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

我們從類名就能大概簡(jiǎn)單理解到整個(gè)代碼的意思,創(chuàng)建一個(gè)被觀察者Observable,如果它被觀察者Observer訂閱了,那么事件發(fā)射器ObservableEmitter就會(huì)將事件推到每個(gè)觀察者。其中ObservableOnSubscribe在源碼中的解釋如下

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

它是一個(gè)擁有 subscribe方法的功能性接口,能接收一個(gè) ObservableEmitter對(duì)象,該對(duì)象能安全的推送事件,并能以安全方式取消。
注意這里的 subscribe方法的描述,每個(gè)訂閱該事件的觀察者都會(huì)調(diào)用這個(gè)方法,擁有這個(gè)發(fā)射器。

舉個(gè)例子來(lái)說(shuō)明,比如現(xiàn)在有很多學(xué)生訂閱21世紀(jì)英語(yǔ)報(bào),那么報(bào)社就相當(dāng)于被觀察者Observable,我們相當(dāng)于觀察者Observer,送報(bào)人就相當(dāng)于這里的事件發(fā)射器ObservableEmitter。我們想要看報(bào)紙不需要去詢問(wèn)報(bào)社什么時(shí)候有新的報(bào)紙,只需要訂閱,如果有新的報(bào)紙,送報(bào)人就會(huì)給我們送來(lái)新的報(bào)紙。

5.Observable類

下面就Observable這個(gè)類,結(jié)合ReactiveX中的關(guān)于Observable的解釋和RxJava中的文檔解釋來(lái)分析一下這個(gè)類。
ReactiveX中的介紹:在ReactiveX中,觀察者訂閱一個(gè)被觀察者。然后,觀察者可對(duì)可觀察到的項(xiàng)目或項(xiàng)目序列做出反應(yīng)。這種模式有利于并發(fā)操作。因?yàn)樗粫?huì)在等待被觀察者發(fā)射對(duì)象時(shí)阻塞,而是以觀察者的形式,隨時(shí)準(zhǔn)備對(duì)被觀察者在未來(lái)的任何時(shí)間做出適當(dāng)?shù)姆磻?yīng)。
下面這種圖表示了在ReactiveX文檔中被觀察者是如何轉(zhuǎn)換的,文檔中會(huì)多次出現(xiàn)這樣類似的圖,對(duì)于我們理解很有幫助。

RxJava中的介紹:Observable是非背壓的,它是一個(gè)提供工廠方法、中間操作符來(lái)同步或異步的處理數(shù)據(jù)流的基類。該類中的許多運(yùn)算符接受ObservableSource(s),這是非背壓流的基本接口,Observable本身也實(shí)現(xiàn)了這種接口。ObservableSource是可被觀察者處理的非背壓被觀察者源基本接口。它里面的subscribe方法是為了將觀察者和被觀察者聯(lián)系起來(lái),也就是產(chǎn)生訂閱關(guān)系。

/**
 * Represents a basic, non-backpressured {@link Observable} source base interface,
 * consumable via an {@link Observer}.
 *
 * @param <T> the element type
 * @since 2.0
 */
public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

6.各方法作用

create方法在上面提到過(guò)就是創(chuàng)建被觀察者,調(diào)用ObservableEmitter發(fā)射器的方法如下圖,onNext會(huì)以推的方式發(fā)送數(shù)據(jù),onComplete表示已經(jīng)完成。


observeOn方法就是對(duì)被觀察者進(jìn)行線程調(diào)度,讓它在指定的線程發(fā)送。這里要注意的是,如果調(diào)度器真的是異步,那么在發(fā)送的線程中onError可能先于onNext執(zhí)行,如果需要嚴(yán)格的事件順序,請(qǐng)考慮使用 observeOn(Scheduler scheduler, boolean delayError)重載。


subscribeOn在指定線程異步訂閱被觀察者。訂閱過(guò)程是異步的,結(jié)果是在指定線程接收。


Observer就是觀察者基類接口,其中的方法和發(fā)射器基類接口的方法基本一致,所以發(fā)送的數(shù)據(jù)會(huì)在onNext中接收到,onError、onComplete同理,唯一多了的方法就是onSubscribe它會(huì)接收一個(gè)Disposable對(duì)象,以便隨時(shí)打斷被觀察者和觀察者之間的聯(lián)系。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容