響應式編程之Reactive streams

前言

上文簡單介紹了響應式編程和Reactor的使用,今天開始深入了解一下響應式編程的規(guī)范,為開始學習Reactor源碼做準備

Reactive streams

上文也簡單提到過,響應式的擴展庫有很多比如RxJava、Reactor,這就給上層應用帶來困擾,比如Spring中如果想支持響應式,那么到底是基于那個擴展庫開發(fā)吶?如果選擇了Reactor,今后想切換Reactor怎么辦?

導致這種問題的根本原因在于沒有同一的接口規(guī)范,就像jdk中有數(shù)據(jù)庫驅動的接口 Driver,各種各樣的數(shù)據(jù)庫只要去實現(xiàn)它我們的程序就可以自由切換,寫代碼時也不用關心底層實現(xiàn)

所以一些業(yè)界大佬共同定制了一個響應式規(guī)范即 Reactive Stream,maven可以通過以下方式引入

<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>

在java9中,響應式規(guī)范已被加入JDK中

接下來就結合Reactive streams接口代碼看看到底規(guī)范了什么內容

概念

從概念上講,Reactive streams所描述的響應式編程就是“發(fā)布者發(fā)布數(shù)據(jù),訂閱者根據(jù)發(fā)布結果作出對應響應

所以響應式編程的兩個角色:發(fā)布者 and 訂閱者

當然發(fā)布者和訂閱者之間可以編排一些中間處理流程,這些中間過程對于上游來說是訂閱者,對于下游來說是發(fā)布者,所以既是發(fā)布者又是訂閱者

最后是背壓的支持,所謂背壓,并沒有多么深奧,簡單來說就是訂閱者能控制發(fā)布者的發(fā)布速度,此時發(fā)布的主動權在訂閱者手中,訂閱者要多少,發(fā)布者就發(fā)布多少

Backpressure

這張圖比較貼切:

  • 圖1中,訂閱者是人,發(fā)布者是水瓶,訂閱者通過回給發(fā)布者壓力讓其發(fā)布(出水),再做出響應(喝水),發(fā)布的主動權在訂閱者,這就是Backpressure(背壓)
  • 圖2中,訂閱者是人,發(fā)布者是噴頭,噴頭完全忽略訂閱者的響應快慢,一個勁的噴水,導致訂閱者響應不過來,發(fā)布的主動權在發(fā)布者
接口

在看Reactive streams源碼,發(fā)布者抽象: Publisher

public interface Publisher<T> {
    // 訂閱
    public void subscribe(Subscriber<? super T> s);
}

沒錯,作為一個發(fā)布者,最終要的方法是可以接受訂閱,至于如何發(fā)布等留給實現(xiàn)者自己去實現(xiàn)

再看一下訂閱者: Subscriber

public interface Subscriber<T> {
    // 訂閱成功事件
    public void onSubscribe(Subscription s);
    // 接收到新消息事件
    public void onNext(T t);
    // 異常處理
    public void onError(Throwable t);
    // 訂閱完成、結束事件
    public void onComplete();
}

定義了訂閱者要處理的四種事件,其實也間接的限定了訂閱發(fā)布的模式

  • 訂閱成功時,會執(zhí)行onSubscribe回調
  • 發(fā)布新消息時,會執(zhí)行onNext回調
  • 發(fā)布出錯時,會執(zhí)行onError回調
  • 訂閱結束時,會執(zhí)行onComplete回調

再看一下訂閱成功回調的返回:Subscription,代表“本次訂閱”,相當于一次成功訂閱的訂單,通過它訂閱者可以向發(fā)布者請求n個數(shù)據(jù)或主動取消訂閱,這就是對“背壓”的支持

public interface Subscription {
    // 請求n個數(shù)據(jù)
    public void request(long n);
    // 取消訂閱
    public void cancel();
}

再看一下中間處理,上文說到它即是訂閱者又是發(fā)布者:Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

其實就是繼承了訂閱者和發(fā)布者

Reactor

回到Reactor,它是Reactive streams的一個實現(xiàn),看看它是如何實現(xiàn)響應式規(guī)范的

Publisher

首先Reactor的Flux和Mono其實就是Reactive streams中的Publisher,只不過一個會發(fā)布0-N個,一個會發(fā)布0-1個

Flux

那么我們完全可以實現(xiàn)一個Reactive streams中的訂閱者去訂閱數(shù)據(jù)

Flux.just("a", "b", "c").subscribe(new Subscriber<String>() {
    Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        subscription.request(1); // 請求1個
    }

    @Override
    public void onNext(String s) {
        System.out.println(s); // 響應
        subscription.request(1); // 再請求1個
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("completed"); // 完成
    }
});

最終輸出

a
b
c
completed
Consumer

再看一下下面這種一般訂閱寫法

Flux.just("a", "b", "c", "d").subscribe(System.out::println);

此時subscribe訂閱的訂閱者是Reactor自己封裝的訂閱者: Consumer,在subscribe方法中最終會被轉換為LambdaSubscriber

Consumer

而LambdaSubscriber就是Reactive streams中的Subscriber

LambdaSubscriber

之所以使用Consumer不需要去執(zhí)行request(n)是因為LambdaSubscriber在訂閱成功時就request(Long.MAX_VALUE)(一次性訂閱所有)

request

所以說Reactor可以支持背壓,但大部分常規(guī)寫法是不考慮背壓的,這主要因為一般場景真用不到

Processor

最后看一下reactor中的中間操作

Flux.just("a", "b", "c", "d").take(3).subscribe(System.out::println);

Reactor的中間操作如take、map、flatMap等并沒有實現(xiàn)Processor來做中間操作,而是自己定義了Operator中間操作

take:

take

map:

map

可能是Processor這個確實實現(xiàn)起來比較麻煩,而且其實主要符合發(fā)布訂閱規(guī)范就基本可以了,Reactor還是支持使用Processor的,但最新版本的Reactor中Processor已被無情棄用

最后

下一篇開始研究Reactor是如何實現(xiàn)Reactive streams規(guī)范的并提供響應式編程支持的,從源碼角度分析,并嘗試自己寫一個Reactive streams實現(xiàn)來對照Reactor源碼

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容