【一起學習Reactor】響應式編程簡介

IMG_3558.JPG

Reactor是響應式編程規(guī)范的一個實現(xiàn),維基百科對響應式編程的總結如下:

響應式編程是一種異步編程范例,主要關注數(shù)據(jù)流和數(shù)據(jù)變化通知。這意味著可以使用編程語言輕松表達靜態(tài)(數(shù)組)或動態(tài)(事件發(fā)射器)數(shù)據(jù)流。更多有關響應式編程的描述可以參考Reactive programming

響應式編程邁出的第一步是微軟在.NET系統(tǒng)中創(chuàng)建了響應式擴展庫(Rx)。在微軟創(chuàng)建Rx之后,RxJava在JVM上實現(xiàn)了響應式編程。隨著時間推移,經(jīng)過Reactive Streams的不斷努力制定了Java實現(xiàn)響應式編程的規(guī)范,規(guī)范為JVM上的響應式庫定義了一組接口和交互規(guī)則。Java 9的Flow類已經(jīng)實現(xiàn)了規(guī)范定義的接口(從Java 9 開始,java開始默認支持響應式編程,有條件的小伙伴該考慮升級Java版本了)。

在面向對象的編程語言中,響應式編程通常作為觀察者模式的一種擴展。如果對比迭代器設計模式和主流的響應式流模式對比,會發(fā)現(xiàn)在幾乎所有的庫中Iterable-Iterator 都有雙重性(可以互相轉換)。兩者主要的區(qū)別是:迭代器設計模式基于,響應式流基于。

迭代器是命令式編程模式,盡管訪問數(shù)據(jù)的方法僅由Iterable負責。實際上在使用迭代器時由開發(fā)者決定何時選擇序列中的next()元素。在響應式流中,和上面Iterable-Iterator對應的是Publisher-Subscriber,新值出現(xiàn)時Publisher 會通知Subscriber ,推送是響應的關鍵。同樣,對推送值的操作是聲明式而不是命令式,代碼表達計算的邏輯,而不是描述其精確的控制流。

響應式流除了推送值之外,同樣以良好的方式定義了錯誤處理和操作完成。一個Publisher可以向其Subscriber推送新的值,也可以發(fā)送錯誤信號或者完成信號。錯誤信號和完成信號都會終止序列,下面的表達式準確簡練的描述了這個邏輯:

onNext x 0..N [onError | onComplete]

這種模式非常的靈活,可以支持沒有值,一個值或n個值(包括無限序列,比如時間)。但是為什么我們首先需要這樣一個異步響應式庫呢?

阻塞是一種浪費

現(xiàn)在應用有大量的并發(fā)用戶,盡管現(xiàn)代化硬件的能力在不斷提升,但是軟件性能依然是一個關鍵問題。有兩種方法可以提高軟件的性能:

  • 并行使用更多的線程和更多的硬件資源,
  • 提高現(xiàn)有資源的使用率。

通常,Java 開發(fā)者使用阻塞代碼開發(fā)程序,這種方法在沒有性能瓶頸之前非常的完美,因為阻塞代碼更容易理解也更容易編寫。當程序出現(xiàn)性能瓶頸時,引入另外的線程來運行相同的阻塞代碼(活多了需要加人)。但是,資源的這種擴展會迅速引入競爭和并發(fā)問題。更糟糕的是,阻塞會浪費資源。如果程序遇到延遲(特別是I/O操作,比如數(shù)據(jù)庫請求或網(wǎng)絡調用),因為線程需要等待數(shù)據(jù)而處于空閑狀態(tài)進而導致資源的浪費。

因此,并行不是靈丹妙藥。充分使用硬件的能力十分必要,但是推理過程十分復雜而且更加容易造成浪費。

異步是一副良藥嗎?

通過編寫異步、非阻塞代碼,可以切換到另一個活動的使用相同基礎資源的任務并在異步處理完成后返回到當前的任務。通過異步代碼可以提高資源的使用率,減少資源浪費。

Java提供了下面兩種異步編程模型:

  • Callbacks: 異步方法沒有返回值,但是需要一個額外的callback參數(shù)(可以是lambda表達式或匿名類),當結果可用時會被調用。
  • Futures::方法立即返回一個Future<T> 。異步計算T的值,但是Future 封裝了對T值的訪問。T值可能不是立即可用,而且Future對象支持輪詢直到值T可用。Java的ExecutorService 執(zhí)行Callable<T> 時返回一個Future 對象。

Java提供的這兩種編寫異步代碼的技術足夠好了嗎?這些技術很好,但并不適用于每一種場景,而且都有各自的局限性。Callbacks的缺點是很難組合在一起使用,而且多個回調組合在一起使用時,代碼很快就會變的難以閱讀和維護(通常稱為回調地獄)。

下面以在用戶界面顯示用戶前五個收藏夾為樣例說明Callbacks的局限性。業(yè)務場景為:顯時用戶前五個收藏夾,如果用戶沒有收藏夾則顯示建議。

userService.getFavorites(userId, new Callback<List<String>>() { // 1
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { // 4
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6
            });
        }

        public void onError(Throwable error) { // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() //8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

基于callback的實現(xiàn)有很多的代碼,這些代碼難以理解,要想一步一步弄懂邏輯也比較困難,而且代碼還有部分重復。下面是基于Reactor的等價實現(xiàn):

userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

基于callback的代碼實現(xiàn)如果要增加超時邏輯會十分的困難,但是基于Reactor的實現(xiàn)只要使用timeout方法即可輕松完成:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800))
           .onErrorResume(cacheService.cachedFavoritesFor(userId))
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

Future對象比回調好一點,但它們在組合方面仍然做得不好,盡管CompletableFuture在Java 8中帶來了改進。編排多個Future 對象可行但是并不容易。除此之外,Future 還有其他問題:

  • 調用get() 方法很容易導致Future 對象阻塞,
  • 不支持懶加載/懶計算,
  • 它們缺乏對多值和高級錯誤處理的支持。

考慮另一個例子:我們獲得一個id列表,我們想從中獲取一個名稱和一個統(tǒng)計信息,并將它們成對組合,所有這些都是異步的.。下面的代碼使用一個 CompletableFuture列表來實現(xiàn)該功能:

CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> {
                CompletableFuture<String> nameTask = ifhName(i);
                CompletableFuture<Integer> statTask = ifhStat(i);

                return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
            });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

由于Reactor有更多的可開箱的組合操作符,上面的過程可以簡化如下:

Flux<String> ids = ifhrIds();

Flux<String> combinations =
        ids.flatMap(id -> {
            Mono<String> nameTask = ifhrName(id);
            Mono<Integer> statTask = ifhrStat(id);

            return nameTask.zipWith(statTask,
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

使用回調和Future對象的危險是相似的,這也是響應式編程通過Publisher-Subscriber對解決的問題。

3.3. 從命令式編程到響應式編程

諸如Reactor的響應式編程庫旨在解決JVM上“經(jīng)典”異步方法的缺點,同時也著重對以下方面進行改進:

  • 可組合性可讀性
  • 把數(shù)據(jù)當做流處理,同時提供豐富的操作方法,
  • 訂閱之前不會發(fā)生任何事情,
  • 背壓消費者向生產(chǎn)者發(fā)送信號通知數(shù)據(jù)生產(chǎn)速率過高或過低的能力,
  • 對并發(fā)不可知更高價值和更高級的抽象。

3.3.1. 可組合性和可讀性

“可組合性”指的是編排多個異步任務的能力,可以將前面任務的結果作為后續(xù)任務的輸入。當然也可以以fork-join的方式運行多個任務。此外,我們可以在更高級的系統(tǒng)中把異步任務作為離散組件重用。

編排任務的能力與代碼的可讀性和可維護性緊密相關。隨著異步處理層的數(shù)量和復雜性的增加,編寫和閱讀代碼變得越來越困難。正如我們所見,callback模型十分簡單,但是callback一個缺點就是處理變的復雜,一個callback需要在另外一個callback中執(zhí)行,這樣一層一層的嵌套。這就是“回調地獄”,這種代碼難以閱讀和分析邏輯。

Reactor提供了豐富的組合操作,代碼可以反應對處理過程抽象的組織,一切盡量保持在同一層(盡量減少嵌套,這也是和callback模式相比最大的改進之一)。

3.3.2. 類比工廠的生產(chǎn)線

數(shù)據(jù)在響應式程序中的處理過程,可以被看作是數(shù)據(jù)在組裝流水線中移動。Reactor既是傳送帶又是工作站。原材料從來源(第一個Publisher)傾瀉而出(中間經(jīng)過多道工序加工),最終成為可以推送給消費者(Subscriber)的成品。

原材料可以經(jīng)過各種轉換和其他中間步驟,也可以成為將中間零件組裝在一起的更大裝配線的一部分。如果某一點出現(xiàn)故障或堵塞(某到工序耗時長),那么出問題的工作站可以向上游發(fā)出信號,以限制原材料的流動(有問題及時向上游反饋,上游做出響應,避免進一步惡化)。

3.3.3. Operators

在Reactor中,operator就是流水線中的工作站。每個operator都會將行為添加到Publisher 中,并將上一步的Publisher 包裝到新實例中。這樣構建了一個完整的鏈接,數(shù)據(jù)從第一個Publisher 向下游移動并由每一個鏈接進行轉換,最后,由一個Subscriber 結束數(shù)據(jù)的數(shù)據(jù)處理過程。在Subscriber 訂閱之前數(shù)據(jù)不會被處理也不會向下游移動。

盡管響應式流規(guī)范根本沒有定義operator,但是像Reactor的響應式庫提供的最佳附加值之一就是提供了豐富的operator,從簡單的轉換和過濾到復雜的編排和錯誤處理,這些內容涉及很多的領域。

3.3.4. 不 subscribe()不會發(fā)生任何事情

在Reactor中,當你編寫了一個Publisher 鏈,默認數(shù)據(jù)不會注入。實際上只是創(chuàng)建了一個異步處理過程的抽象描述(這有助于重用和組合)。通過subscribing 動作,將Publisher 綁定到Subscriber ,這會觸發(fā)數(shù)據(jù)在整個鏈路中移動。內部實現(xiàn)是通過Subscriber 發(fā)送Request 信號,信號被傳播到上游一直到Publisherrequest 也是實現(xiàn)背壓的關鍵方法。

3.3.5. 背壓

向上游傳播信號通常用來實現(xiàn)背壓,在和流水線的類比中描述為當工作站處理比上游工作站的處理速度慢時,沿著流水線向上游反饋。背壓其實就是下游向上游發(fā)送信號,并影響上游數(shù)據(jù)處理的一種機制。

響應式流規(guī)范定義的實際機制可以簡單的概括為:一個subscriber可以以“無界” 模式工作,并讓數(shù)據(jù)源以最快的速度推送所有數(shù)據(jù),或者使用request 機制向數(shù)據(jù)源發(fā)送信息,向數(shù)據(jù)源反饋已經(jīng)準備好處理n個元素。

中間operator可以在中途改變request。想象一下一個buffer 以十個元素為一組將元素進行分組。如果subscriber請求一個buffer,數(shù)據(jù)源發(fā)送十個元素是可以被接受。一些operator也實現(xiàn)了預拉取策略 ,這避免了request(1) 不斷往返。如果在請求之前生成元素的成本很低,這種操作就非常的有幫助,可以顯著的提高處理效率。

這會將推模式轉換為推拉混合模式,如果上游已經(jīng)準備了數(shù)據(jù),下游則可以從上游獲取n個元素。但是如果數(shù)據(jù)還沒有準備好,那么當有數(shù)據(jù)時上游就會將數(shù)據(jù)推送到下游。

3.3.6. Hot vs Cold

Rx響應式庫家族將響應序列分為兩大類:“熱”和“冷”。這種區(qū)別主要與響應式流對subscriber的響應有關:

  • 對于每一個Subscriber,包括在數(shù)據(jù)源位置,冷序列都會重新開始。例如,如果源包裝了HTTP調用,則將為每個subscription發(fā)出一個新的HTTP請求。
  • 對于每一個Subscriber ,熱序列并非都會從頭開始。相反,后面的subscriber只能收到訂閱完成之后產(chǎn)生的數(shù)據(jù)。但是一些熱響應式流可以緩存或者對歷史數(shù)據(jù)全部或部分重放,也就是說遲來的subscriber可以收到在完成訂閱動作之前的數(shù)據(jù)。從一般的角度來看,即使沒有訂閱者在訂閱數(shù)據(jù),熱序列甚至會發(fā)出數(shù)據(jù)(“訂閱之前什么也沒有發(fā)生”規(guī)則的例外)。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容