Reactive Streams

Reactive Streams + RxJava

為了標(biāo)準(zhǔn)化JVM上的異步流協(xié)議,響應(yīng)流已經(jīng)是共同努力的成果,. RxJava團(tuán)隊(duì)從一開(kāi)始就參與了這項(xiàng)工作,并支持使用響應(yīng)流api,并最終支持Java 9流api,這些結(jié)果與團(tuán)隊(duì)的努力息息相關(guān)。

這與RxJava本身有什么關(guān)系?

RxJava1.x

目前,Rxjava 1.x 不直接實(shí)現(xiàn) 響應(yīng)流的接口。這是由于Rxjava 1.x 已經(jīng)存在這些api且不能破壞公共的api。 然而它確實(shí)在語(yǔ)義上遵守非阻塞近似于背壓和流控制的“響應(yīng)式牽引”。且因此可以用作兩種類(lèi)型之間的橋梁。為了響應(yīng)流實(shí)現(xiàn)之間的交互協(xié)作和通過(guò)響應(yīng)流TCK兼容性測(cè)試,Rxjava響應(yīng)流模型在RxJava1.x 類(lèi)型和 響應(yīng)流類(lèi)型建立了一個(gè)橋梁。它的API像下面這樣:

package rx;

import org.reactivestreams.Publisher;

public  abstract  class  RxReactiveStreams {

  public  static  <T>  Publisher<T>  toPublisher(Observable<T>  observable) { … }

  public  static  <T>  Observable<T>  toObservable(Publisher<T>  publisher) { … }

}

RxJava 2.x

RxJava2.x 將直接針對(duì)Java 8+ 的響應(yīng)流API。這個(gè)計(jì)劃通過(guò)利用新的多版本Java jar包來(lái)支持Java9 j.u.c.Flow 類(lèi)型

由于這里有一個(gè)接口去擴(kuò)展,Rxjava2 將真正是“響應(yīng)式擴(kuò)展”。RxJava 1.x 沒(méi)有一個(gè)基礎(chǔ)接口 或者 協(xié)議來(lái)擴(kuò)展 因此 必須通過(guò) 口頭來(lái)定義它。RxJava2 希望 基于一種 高性能,經(jīng)久沙場(chǎng)的,輕量級(jí)(響應(yīng)流單一依賴(lài)),非固執(zhí)的響應(yīng)流的實(shí)現(xiàn) 且 j.u.cFlow 提供一個(gè)帶參數(shù)化并發(fā)的高階函數(shù)的庫(kù)。

公共庫(kù)的API

響應(yīng)流的一個(gè)巨大的價(jià)值是它的庫(kù)公開(kāi)的公共API。當(dāng)解耦具體的實(shí)現(xiàn)時(shí),如下是一些指引和建議關(guān)于如何使用響應(yīng)流和RxJava 來(lái)創(chuàng)建響應(yīng)庫(kù)。

公開(kāi)響應(yīng)式API的優(yōu)點(diǎn)而不是Rxjava的。

輕量級(jí):接口上沒(méi)有任何具體的實(shí)現(xiàn),且非常輕量級(jí)的依賴(lài)。這能讓依賴(lài)圖和字節(jié)位很小。

前瞻性:由于響應(yīng)式API如此簡(jiǎn)單且正成為JDK9中的一部分。在公開(kāi)異步訪問(wèn)數(shù)據(jù)方面是具有前瞻性的。JDK9 的 j.u.c.Flow 的API 匹配 響應(yīng)流的API ,所以任何實(shí)現(xiàn) 響應(yīng)流的Publisher 將也能夠?qū)崿F(xiàn) Flow.Publisher 類(lèi)型。

交互性:一個(gè)以響應(yīng)流類(lèi)型公開(kāi)的API 能輕易的被任何實(shí)現(xiàn)消費(fèi),例如RxJava,Akka,Reactor.

公開(kāi)響應(yīng)流API的缺點(diǎn)而不是Rxjava的

一個(gè)響應(yīng)流 Publisher本身不是非常有用,如果沒(méi)有像flatMap那樣的高階函數(shù),那么 它僅僅是一個(gè)更好的回調(diào)。這意味著 消費(fèi)Publsher 幾乎將總是需要去轉(zhuǎn)化或者封裝到一個(gè)響應(yīng)流的實(shí)現(xiàn)當(dāng)中。總是將具體的實(shí)現(xiàn)包裝到Publisher中是啰嗦和笨拙的。如果JVM支持?jǐn)U展函數(shù)將會(huì)變的優(yōu)雅,因?yàn)樗皇蔷唧w的和啰嗦的。

具體來(lái)說(shuō),響應(yīng)流和 Flow Publisher 接口不提供任何操作符的實(shí)現(xiàn),例如 flatmap,merge,filter,take,zip 和 許多其他的 用于組合 和轉(zhuǎn)換異步流的操作符 。 一個(gè)Publisher 僅僅用于被訂閱。

響應(yīng)流規(guī)范和二進(jìn)制構(gòu)件不提供具體的Publisher實(shí)現(xiàn)。一般而言,一個(gè)庫(kù)需要被RxJava 或者 Akka 等流提供能力為了它的內(nèi)部使用或者僅僅提供一個(gè)可用的支持背壓語(yǔ)義(這對(duì)正確實(shí)現(xiàn)是意義非凡的)的Publisher。

推薦的方法:

既然 響應(yīng)流 已經(jīng)實(shí)現(xiàn)了1.0 我們建議使用它作為核心api,這些api是用于交互的。這將會(huì)允許采用不帶強(qiáng)依賴(lài)的異步流語(yǔ)義 在任何單一的實(shí)現(xiàn)上。這也意味著這些API的消費(fèi)者能夠選用RxJava1.x ,RxJava2.x Akka流,Reactor 或者最合適這些api的其他流組合庫(kù)。 它也有更好的前瞻性,例如 從RxJava 1.x 移到 2.x 的例子。

然而,為了避免以上所列的缺點(diǎn),我們也建議讓消費(fèi)它們變的容易,即開(kāi)發(fā)者不需要明確的封裝他們選擇的組合庫(kù)的API. 因此我們建議在最核心的api上為流行的響應(yīng)流實(shí)現(xiàn)提供封裝的模塊,否則你的消費(fèi)者將需要自己做這些事情。

注意,如果java提供了擴(kuò)展函數(shù),那么這個(gè)方法不是必要的,但是,在Java提供這一點(diǎn)之前(如果不是很快的話),以下是一種利用優(yōu)點(diǎn)和彌補(bǔ)缺點(diǎn)的方法。

例如:一個(gè)數(shù)據(jù)庫(kù)驅(qū)動(dòng)可能有如下一些模塊:

//公開(kāi)響應(yīng)流 Publisher APIs的核心庫(kù)

  • async-database-driver

// 封裝具體實(shí)現(xiàn)的集成jar包

  • async-database-driver-rxjava1
  • async-database-driver-rxjava2
  • async-database-driver-akka-stream

核心庫(kù)可能公開(kāi)像下面這樣的API:

package com.database.driver;

public  class  Database {

  public org.reactivestreams.Publisher getValue(String key);

}

RxJava 1.x包裝器可以是一個(gè)單獨(dú)的模塊,它提供像這樣的RxJava特定api

package com.database.driver.rxjava1;

public  class  Database {

  public rx.Observable getValue(String key);

}

核心Publisher API能被封裝的像這樣簡(jiǎn)單:

public rx.Observable getValue(String key) {

  return RxReactiveStreams.toObservable(coreDatabase.getValue(key));

}

RxJava2.x 封裝將與這不同(一單2.x是可用的)

package com.database.driver.rxjava2;

public  class  Database {

  public io.reactivex.Observable getValue(String key);

}

Akka 流封裝將變成這樣:

package com.database.driver.akkastream;

public  class  Database {

  public akka.stream.javadsl.Source getValue(String key);

}

開(kāi)發(fā)者可以選擇直接依賴(lài) async-database-driver 的API ,但是大多數(shù)將使用其中一個(gè)包裝器,這個(gè)封裝器支持他們選擇的組合庫(kù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容