響應(yīng)式編程入門之 Project Reactor

本文目標(biāo)

  • 理解響應(yīng)式編程

前言

之前的《聊聊 IO 多路復(fù)用》中,我們理解了非阻塞 IO 的意義。但是 Spring MVC 并不能完美的應(yīng)用非阻塞編程,于是 Spring 團(tuán)隊(duì)開發(fā)了 WebFlux,而 WebFlux 的基礎(chǔ)正是本文要講到的 Project Reactor(下文簡(jiǎn)稱為 Reactor)

本文以 Reactor 為例帶大家入門響應(yīng)式編程

版本

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.6</version>
    </dependency>

什么是 Reactor

Reactor 是 JVM 的非阻塞響應(yīng)式編程基礎(chǔ),支持背壓。 它直接與 Java 8 函數(shù)式 API 集成,特別是 CompletableFuture、Stream 和 Duration。 它提供了可組合的異步序列 API — Flux(用于 [N] 個(gè)元素)和 Mono(用于 [0|1] 個(gè)元素),并實(shí)現(xiàn)了 Reactive Streams 規(guī)范。
在 Reactor 的基礎(chǔ)上還演化出了適合微服務(wù)架構(gòu)的 Reactor Netty 。為 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背壓和響應(yīng)式的網(wǎng)絡(luò)引擎。

上面是對(duì)于官方文檔的翻譯。下面來說說我自己對(duì) Reactor 和響應(yīng)式編程的理解。

回想一下之前的非阻塞 IO 編程,例如我們現(xiàn)在要用非阻塞的方式調(diào)用一個(gè)遠(yuǎn)程服務(wù),當(dāng)遠(yuǎn)程接口數(shù)據(jù)可用時(shí)去做一些業(yè)務(wù)處理。這時(shí)候代碼怎么寫呢?我們需要提供一個(gè)回調(diào)函數(shù),然后在響應(yīng)就緒的時(shí)候,去調(diào)用我們的回調(diào)函數(shù)。

從邏輯上來看,這完全沒有問題。但是如果我們的回調(diào)很復(fù)雜,代碼看起來會(huì)是什么樣呢?

// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

這個(gè)代碼說實(shí)話已經(jīng)有點(diǎn)回調(diào)地獄那味兒了,讓一段不是很復(fù)雜的邏輯變得很難讀了。但是如果用 Reactor 寫呢?

// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

可以看到,代碼變得非常的簡(jiǎn)潔。唯一帶來的困擾就是,我們不知道這些函數(shù)到底是啥意思 ??

響應(yīng)式編程雖然有非常多的特性,但是它并不是什么神奇的技術(shù),它也是建立在傳統(tǒng)命令式編程的基礎(chǔ)上。只不過它所提供的 API 以及規(guī)范更適合在非阻塞 IO 中使用。雖然在非阻塞 IO 框架中幾乎只使用響應(yīng)式編程(Vertx,WebFlux),只是因?yàn)檫@樣做更合適,并不是說沒了響應(yīng)式編程,就玩不了非阻塞 IO 了。

響應(yīng)式編程內(nèi)幕

Reactor 實(shí)現(xiàn)了 org.reactivestreams 提供的 Java 響應(yīng)式編程規(guī)范,我們只要了解 reactivestreams 中代碼是如何運(yùn)轉(zhuǎn)的,再看 Reactor 相關(guān)的代碼就容易多了。

下圖展示了 reactivestreams 中的核心接口

reactivestreams 核心接口
  • Publisher:發(fā)布者

  • Subscriber:訂閱者

  • Subscription:這個(gè)單詞中文翻譯為名詞的訂閱,在代碼中它是發(fā)布者和訂閱者之間的媒介

  • Processor:該接口繼承了發(fā)布者和訂閱者,可以理解為發(fā)布者和訂閱者的中間操作(但是 Reactor 的中間操作并沒有實(shí)現(xiàn) Processor,在最新版本的 Reactor 中,Processor 的相關(guān)實(shí)現(xiàn)接口已經(jīng)被棄用)

在了解了響應(yīng)式編程的核心接口之后,我們來看下響應(yīng)式編程是如何運(yùn)作的

響應(yīng)式編程執(zhí)行邏輯

在 Reactor 中大部分實(shí)現(xiàn)都是按照上圖的邏輯來執(zhí)行的

  1. 首先是Subscriber(訂閱者)主動(dòng)訂閱 Publisher(發(fā)布者),通過調(diào)用 Publisher 的 subscribe 方法
  2. Publisher 在向下游發(fā)送數(shù)據(jù)之前,會(huì)先調(diào)用 Subscriber 的 onSubscribe 方法,傳遞的參數(shù)為 Subscription(訂閱媒介)
  3. Subscriber 通過 Subscription#request 來請(qǐng)求數(shù)據(jù),或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布(這就是響應(yīng)式編程中的背壓,訂閱者可以控制數(shù)據(jù)發(fā)布)
  4. Subscription 在接收到訂閱者的調(diào)用后,通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)。
  5. 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流,如果數(shù)據(jù)發(fā)布或者處理遇到錯(cuò)誤會(huì)調(diào)用 Subscriber#onError

調(diào)用 Subscriber#onNext,onComplete,onError 這三個(gè)方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根據(jù)不同的場(chǎng)景有不同的實(shí)現(xiàn)方式,并沒有什么嚴(yán)格的要求??梢哉J(rèn)為 Publisher 和 Subscription 共同配合完成了數(shù)據(jù)發(fā)布

其實(shí) Reactor 中 API 實(shí)現(xiàn)原理也都是這個(gè)套路,我這邊也自己寫了個(gè)例子便于讓讀者加深對(duì)響應(yīng)式編程的理解

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * @author tianwen.yin
 */
public class SimpleReactiveStream {

    /**
     * 實(shí)現(xiàn)一個(gè)簡(jiǎn)單的響應(yīng)式編程發(fā)布者
     * 邏輯:當(dāng)訂閱者發(fā)起訂閱時(shí),像下游發(fā)送一個(gè) HelloWorld,發(fā)布邏輯由 SimpleSubscription 完成
     */
    static class SimplePublisher implements Publisher {
        @Override
        public void subscribe(Subscriber s) {
            // 2. Publisher 發(fā)布數(shù)據(jù)之前,調(diào)用 Subscriber 的 onSubscribe
            s.onSubscribe(new SimpleSubscription(data(), s));
        }

        private String data() {
            return "Hello World";
        }
    }

    static class SimpleSubscriber implements Subscriber {
        @Override
        public void onSubscribe(Subscription s) {
            // 3. Subscriber 通過 Subscription#request 來請(qǐng)求數(shù)據(jù)
            // 或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }

    static class SimpleSubscription implements Subscription {
        String data;
        Subscriber actual;
        boolean isCanceled;

        public SimpleSubscription(String data, Subscriber actual) {
            this.data = data;
            this.actual = actual;
        }

        @Override
        public void request(long n) {
            if (!isCanceled) {
                try {
                    // 4. Subscription 在接收到訂閱者的調(diào)用后
                    // 通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)
                    actual.onNext(data);
                    // 5. 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流
                    actual.onComplete();
                } catch (Exception e) {
                    // 如果數(shù)據(jù)發(fā)布或者處理遇到錯(cuò)誤會(huì)調(diào)用 Subscriber#onError
                    actual.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    public static void main(String[] args) {
        // 1. Subscriber ”訂閱“ Publisher
        new SimplePublisher().subscribe(new SimpleSubscriber());
    }

}

響應(yīng)式編程思想

響應(yīng)式編程,就像裝配一條流水線。Publisher 規(guī)定了數(shù)據(jù)如何生產(chǎn),中間會(huì)有 Operators(操作符)對(duì)流水線的數(shù)據(jù)進(jìn)行解析,校驗(yàn),轉(zhuǎn)換等等操作,最終處理好的數(shù)據(jù)流轉(zhuǎn)到 Subscriber。

image.png

這條流水線還有一個(gè)特點(diǎn)。大部分情況下當(dāng) Publisher 的 subscribe 方法被調(diào)用之前,什么都不會(huì)發(fā)生。在被訂閱之前我們只是在定義流水線該如何工作,直到真正有人需要的時(shí)候,流水線才會(huì)啟動(dòng)。

Reactor 中的 Operator

Operators 怎么理解呢?對(duì)于上游來說,Operators 像一個(gè)訂閱者,而對(duì)于它的下游來說,它像一個(gè)發(fā)布者(我們上文說過了 Reactor 中的中間操作并沒有實(shí)現(xiàn) Processor 接口)

    Mono.just("hello")
            .map(a -> a + "world")
            .subscribe(System.out::println);

舉個(gè)簡(jiǎn)單的例子,在上面的代碼中,map 就是一個(gè) Operator,它的實(shí)現(xiàn)思路是什么?來看下面的代碼

    // 注意,這是我基于 Reactor API 實(shí)現(xiàn)的偽代碼!
    public static class MonoMap implements Publisher {
        // 我們自定義的轉(zhuǎn)換邏輯
        private Function mapper;
        // source 代表當(dāng)前操作符的上游發(fā)布者
        private Publisher source;

        public MonoMap(Publisher source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Subscriber actual) {
            source.subscribe(new MonoMapSubscriber(mapper, actual));
        }
    }

    public static class MonoMapSubscriber implements Subscriber {
        // 我們自定義的轉(zhuǎn)換邏輯
        private Function mapper;
        // 真正的下游
        private Subscriber actual;

        public MonoMapSubscriber(Function mapper, Subscriber actual) {
            this.mapper = mapper;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(Object o) {
            // 當(dāng)上游數(shù)據(jù)發(fā)送過來時(shí),先進(jìn)行轉(zhuǎn)換再發(fā)送給下游
            Object result = mapper.apply(o);
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

上述代碼是我自己實(shí)現(xiàn)的一個(gè)偽代碼,用于讓大家理解操作符的實(shí)現(xiàn)思路,實(shí)際 Reactor 代碼也是這個(gè)思路,只不過實(shí)現(xiàn)的更加巧妙和嚴(yán)謹(jǐn)

我們首先來分析一下 Mono.just("hello").map(a -> a + "world") 這句話

  1. 當(dāng)執(zhí)行到 Mono.just 時(shí),會(huì)新建一個(gè) MonoJust 對(duì)象作為當(dāng)前的 Publisher。該發(fā)布者的邏輯是,當(dāng)訂閱時(shí),向下游發(fā)送數(shù)據(jù) "hello"

  2. 當(dāng)執(zhí)行到 map 方法時(shí),會(huì)新建一個(gè) MonoMap 對(duì)象替作為當(dāng)前的 Publisher,MonoJust 成為了 MonoMap 中的一個(gè)屬性 source(實(shí)際的上游)

    • 當(dāng) MonoMap 被訂閱時(shí),會(huì)先將它的下游 actual 做一層包裝,也就是我們上面的 MonoMapSubscriber。然后去調(diào)用 source 的 subscribe 方法。上游發(fā)布數(shù)據(jù)時(shí),MonoMapSubscriber 先對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換(我們上面的拼接字符串操作),然后再發(fā)送給 actual(它的下游)

    • 當(dāng) MonoMap 被再次轉(zhuǎn)換時(shí),MonoMap 就變成了下游操作符的 source...

最后通過一張圖來總結(jié)一下

Operator 實(shí)現(xiàn)原理

Reactor 該如何學(xué)習(xí)

本文并沒有介紹太多 Reactor 的細(xì)節(jié),因?yàn)檫@些東西實(shí)在是太多了。我想聊聊我自己是如何學(xué)習(xí) Reactor 的

如果你已經(jīng)通過本文理解了響應(yīng)式編程的核心接口是如何工作的了,那恭喜你已經(jīng)邁向了成功的第一步了。接下來就是閱讀官方文檔,不斷的練習(xí)和閱讀 Reactor 的源碼。源碼追蹤的方向已經(jīng)很明確了,當(dāng)我們想了解一個(gè)發(fā)布者的實(shí)現(xiàn)原理是什么,我就要去關(guān)注這個(gè)發(fā)布者的 subscribe 方法和 Subscription 都做了什么。想了解消費(fèi)者的邏輯,就看它的 onNext,onComplete,onError。

最后

如果覺得我的文章對(duì)你有幫助,動(dòng)動(dòng)小手點(diǎn)下關(guān)注,你的支持是對(duì)我最大的幫助

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容