用 RSocket 解決響應(yīng)式服務(wù)之間的通訊-Part 1
作者:Rafa? Kowalski,原文 Reactive Service-to-Service Communication With RSocket

本文是《用 RSocket 解決響應(yīng)式服務(wù)之間的通訊》微型系列的第一篇文章,它將幫助你熟悉 RSocket——一種可能會徹底改變機(jī)器之間通訊的新二進(jìn)制協(xié)議。在以下各段中,我們首先討論當(dāng)前分布式系統(tǒng)的問題,然后說明如何使用 RSocket 解決這些問題。本文聚焦于微服務(wù)之間的通信與 RSocket 交互模型。
分布式系統(tǒng)中的通訊問題
確實(shí),微服務(wù)無處不在。從部署和維護(hù)非常麻煩的單體應(yīng)用程序到完全分布式、微型、可擴(kuò)展的微服務(wù),我們經(jīng)歷了漫長的過程。微服務(wù)架構(gòu)設(shè)計(jì)有很多好處。但是,它也有缺點(diǎn)。首先,為了向客戶交付最終產(chǎn)品,服務(wù)之間必須交換大量數(shù)據(jù)。在單體應(yīng)用程序中這不是問題,因?yàn)樗麄€(gè)通信都在單個(gè) JVM 進(jìn)程中進(jìn)行。而在“微服務(wù)架構(gòu)”中,部署在單獨(dú)的容器中服務(wù)需要通過內(nèi)部或外部網(wǎng)絡(luò)進(jìn)行通信。此時(shí),“網(wǎng)絡(luò)”是一等公民。如果在云上運(yùn)行應(yīng)用程序,事情將變得更加復(fù)雜。在這種情況下,網(wǎng)絡(luò)問題和延遲增加將是不可避免的事情。與其嘗試解決網(wǎng)絡(luò)問題,不如設(shè)計(jì)具有彈性的體系結(jié)構(gòu),讓其即使在網(wǎng)絡(luò)抖動的情況下也能完全正常運(yùn)行,這樣豈不是更好。
我們來更深入地研究下微服務(wù)、數(shù)據(jù)、通信和云的概念。試想一下,對于一般的企業(yè)級系統(tǒng),外部可以通過網(wǎng)站和移動 App 訪問,或者通過小型外部設(shè)備(如家用加熱控制器)與其進(jìn)行交互。這些系統(tǒng)都是由多個(gè)微服務(wù)組成,這些微服務(wù)大多數(shù)是用 Java 編寫的,其中一小部分是 Python 和 node.js 實(shí)現(xiàn)的組件,另外,為了確保整個(gè)系統(tǒng)高度可用,所有服務(wù)之間的傳輸數(shù)據(jù)都需要跨多個(gè)可用區(qū)進(jìn)行復(fù)制備份。
IaaS 層是不可控的,為了改善開發(fā)人員體驗(yàn),一般需要將應(yīng)用程序運(yùn)行在 PaaS 平臺之上。對于 PaaS 平臺,我們可以選擇:Cloud Foundry,Kubernetes 或兩者結(jié)合使用都是可行的。在服務(wù)之間的通信方面,設(shè)計(jì)比較簡單,每個(gè)組件都暴露普通的 REST API,如下圖所示。

乍一看,組件都被分散在云中運(yùn)行,這樣的體系結(jié)構(gòu)看起來還不錯(cuò)。額,它可能出什么問題?主要有兩個(gè)問題:它們都與通訊有關(guān)。
第一個(gè)問題是 HTTP 的請求/響應(yīng)交互模型。盡管使用 HTTP 的案例有很多,但它并不是為機(jī)器之間的通信而設(shè)計(jì)的。微服務(wù)在不關(guān)心操作結(jié)果的情況下將某些數(shù)據(jù)發(fā)送到另一個(gè)組件是很常見的(即發(fā)即棄),或者在數(shù)據(jù)可用時(shí)自動流傳輸數(shù)據(jù)(數(shù)據(jù)流)。使用 HTTP 請求/響應(yīng)交互模型難以用優(yōu)雅、有效的方式實(shí)現(xiàn)這些交互模式。例如,在使用請求/響應(yīng)交互模型時(shí),執(zhí)行簡單的即發(fā)即棄操作也會產(chǎn)生副作用,會出現(xiàn)即使客戶端對處理響應(yīng)不感興趣,服務(wù)器也必須將響應(yīng)發(fā)送回客戶端的問題。
第二個(gè)問題是性能。假設(shè)我們的系統(tǒng)被客戶大量使用,流量增加了,并且我們注意到我們正在努力處理每秒數(shù)百個(gè)請求。借助容器和云,我們可以輕松擴(kuò)展我們的服務(wù);但是,如果我們關(guān)注下資源消耗的情況,則會發(fā)現(xiàn)一些問題。例如,當(dāng)機(jī)器內(nèi)存會出現(xiàn)不足時(shí),可能 VM 的 CPU 還幾乎處于空閑狀態(tài)。這個(gè)問題主要來自于使用 HTTP 1.x 協(xié)議通常處理每個(gè)請求需要一個(gè)線程,致使每個(gè)請求都存在堆棧內(nèi)存。在這種情況下,我們可以利用反應(yīng)式編程模型和非阻塞 IO。它將在在不增加延遲的情況下大大減少內(nèi)存使用量。 HTTP 1.x 是基于文本的協(xié)議,因此與二進(jìn)制協(xié)議相比,需要傳輸?shù)臄?shù)據(jù)大小顯著增大。
在機(jī)器之間的通信中,我們不應(yīng)將自己局限于 HTTP(尤其是 1.x 版本,請求/響應(yīng)交互模型以及性能低下)。在市場上還有許多更合適、更強(qiáng)大的解決方案。例如,基于 RabbitMQ、gRPC 或者 HTTP 2(支持多路復(fù)用和二進(jìn)制化負(fù)載)進(jìn)行信息傳輸在性能和效率方面會比純 HTTP 1.x 更好。

在給定場景下,使用多種協(xié)議可以使我們最有效、最合適地連接微服務(wù);但是,采用多種協(xié)議迫使我們一次又一次地重新發(fā)明輪子,另外,為了保證保證通訊的安全性,我們不得不用安全性相關(guān)的額外信息來豐富我們的數(shù)據(jù);并且還需要創(chuàng)建多個(gè)適配器來處理協(xié)議之間的轉(zhuǎn)換。在某些情況下,數(shù)據(jù)傳輸可能需要依賴外部資源(代理、服務(wù)等),這些服務(wù)必須高度可用。因此,盡管我們所需要的只是基于消息的簡單“即發(fā)即棄”操作,但 HTTP 請求/響應(yīng)交互模型由于其性能比較差,產(chǎn)生額外的資源會帶來額外的成本。此外,多種不同的協(xié)議可能會引入與應(yīng)用程序治理相關(guān)的嚴(yán)重問題,尤其是如果我們的系統(tǒng)包含數(shù)百個(gè)微服務(wù)時(shí)。
上面提到的兩個(gè)核心問題是推出 RSocket 的原因,同時(shí)也是它可能徹底改變云通訊的原因。通過其反應(yīng)式和內(nèi)置的強(qiáng)大交互模型,RSocket可以應(yīng)用于各種業(yè)務(wù)場景中,并可能最終統(tǒng)一我們在分布式系統(tǒng)中使用的通信模式。
RSocket 如何解決
RSocket 是一種新的、消息驅(qū)動的二進(jìn)制協(xié)議,它規(guī)范了云中的通訊方式。它有助于以一致的方式解決常見的應(yīng)用程序問題,并且它支持多種語言(例如 Java、JS、Python)和傳輸協(xié)議(TCP、WebSocket、Aeron)。在下面的部分中,我們將深入探討協(xié)議內(nèi)部實(shí)現(xiàn)并討論交互模型。
基于幀和消息驅(qū)動
RSocket 中的傳輸?shù)男畔⒖梢苑纸鉃橐粋€(gè)個(gè)的幀。每個(gè)幀都包含一個(gè)幀頭,其中包含流 ID、幀類型定義和特定于該幀類型的其他數(shù)據(jù)。幀頭部后緊跟著元數(shù)據(jù)和有效負(fù)載(這些部分承載用戶指定的數(shù)據(jù))。

有多種類型的幀,它們表示不同的行為和交互模型的可用方法。我們將不討論相關(guān)所有內(nèi)容,因?yàn)樗鼈兊脑敿?xì)內(nèi)容在官方文檔中已有描述。不過,值得關(guān)注的信息可能不多,其中比較重要的有:客戶端在通信開始時(shí)需要給服務(wù)器發(fā)送“設(shè)置幀”——該“設(shè)置幀”在連接初始化期間可以自定義,自定義的內(nèi)容包括添加自己的安全規(guī)則或所需的其他信息。應(yīng)當(dāng)注意,在建立連接之后,RSocket 不會區(qū)分客戶端和服務(wù)端。每一側(cè)都可以開始將數(shù)據(jù)發(fā)送到另一側(cè)(這使協(xié)議幾乎完全對稱)。
性能
幀作為“字節(jié)流”發(fā)送。它使 RSocket 方式比典型的基于文本的協(xié)議更有效。從開發(fā)人員的角度來看,通過 JSON 格式在網(wǎng)絡(luò)中傳輸數(shù)據(jù)時(shí),調(diào)試系統(tǒng)更容易,但是它對性能是有影響的。RSocket 的協(xié)議不強(qiáng)加任何特定的序列化/反序列化機(jī)制,而是將幀視為可以轉(zhuǎn)換為任何東西的一串比特。這樣就可以使用 JSON 序列化或更高效的其他方案,如 Protobuf 或 AVRO。
影響 RSocket 性能的第二個(gè)因素是“多路復(fù)用”。該協(xié)議在單個(gè)物理連接上創(chuàng)建“邏輯流”(通道)。每個(gè)流都有其唯一的 ID,在某種程度上,可以將其理解為類似消息系統(tǒng)的消息隊(duì)列。這種設(shè)計(jì)解決了 HTTP 1.x 版本中已知的主要問題(每個(gè)請求模型獨(dú)占連接以及“流水線”的性能較弱)。此外,RSocket 原生支持大型有效負(fù)載的傳輸。在這種情況下,“有效載荷的幀”會被分成帶有額外標(biāo)志的多個(gè)幀(給定片段的序號)。
反應(yīng)式和流量控制
RSocket 協(xié)議完全包含《反應(yīng)式宣言》中所述的原則。它在異步特性和從某種意義上資源節(jié)約幫助用戶減少了所經(jīng)歷的延遲以及基礎(chǔ)設(shè)施的成本。由于流式傳輸,我們無需將數(shù)據(jù)從一個(gè)服務(wù)拉到另一個(gè)服務(wù),而是在數(shù)據(jù)可用時(shí)將其推送到相關(guān)服務(wù)。這是一個(gè)非常強(qiáng)大的機(jī)制,但它也可能具有風(fēng)險(xiǎn)。讓我們考慮一個(gè)簡單的場景:在我們的系統(tǒng)中,我們將事件從服務(wù) A 傳輸?shù)椒?wù) B。在接收方 B 執(zhí)行的操作很簡單,但需要一定的計(jì)算時(shí)間。如果服務(wù) A 推送事件的速度快于 B 處理事件的速度,則最終 B 將耗盡資源(發(fā)送方將終止接收方)。RSocket 是 Reactor 模式的,它內(nèi)置了對“流控制”的支持,這有助于避免這種情況。
通過“背壓機(jī)制”的實(shí)現(xiàn),我們可以輕松根據(jù)需要進(jìn)行調(diào)整。接收方可以指定要消費(fèi)多少數(shù)據(jù),而不會收到更多數(shù)據(jù),除非它通知發(fā)送方準(zhǔn)備處理更多數(shù)據(jù)。另一方面,為了限制來自請求者的傳入幀數(shù),RSocket 實(shí)現(xiàn)了一種“租約機(jī)制”。響應(yīng)者可以指定請求者可以在定義的時(shí)間范圍內(nèi)發(fā)送多少個(gè)請求。
RSocket 接口
如上所述,RSocket 是 Reactor 模式的,因此在 API 級別上,我們主要在Mono和Flux對象上進(jìn)行操作。它也完全支持反應(yīng)性信號–我們可以輕松地對不同事件(onNext,onError,onClose等)執(zhí)行“反應(yīng)”。
以下各段將介紹 RSocket 的 API 可用的每個(gè)交互選項(xiàng)。下面介紹將以代碼片段和所有示例的描述為根本。在講解交互模型之前,有必要介紹一下API基礎(chǔ)知識,因?yàn)樗鼘⒃诙鄠€(gè)代碼示例中提出。
RSocketFactory.connect()
.transport(TcpClientTransport.create(HOST, PORT))
.start()
.subscribe();
RSocketFactory.receive()
.acceptor(new HelloWorldSocketAcceptor())
.transport(TcpServerTransport.create(HOST, PORT))
.start()
.subscribe();
對于響應(yīng)方,我們必須創(chuàng)建一個(gè)套接字接受器實(shí)例。 SocketAcceptor 是提供兩方之間契約的接口。它只有一個(gè)方法,該方法接受 RSocket 發(fā)送請求,并返回一個(gè) RSocket 實(shí)例,該實(shí)例將用于處理來自對方的請求。除了提供契約外,SocketAcceptor 還使我們能夠訪問配置幀的內(nèi)容。在 API 級別,它由 ConnectionSetupPayload 類型的參數(shù)來設(shè)置。
public interface SocketAcceptor {
Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}
如上所示,在雙方之間建立連接是相對非常容易,特別是對于之前使用過 WebSocket 的同學(xué)來說,就 API 而言,兩種解決方案都非常相似。
交互模型
建立連接后,我們可以繼續(xù)了解其交互模型。 RSocket 支持以下操作:

“即發(fā)即忘(fire and forget)”,或者“元數(shù)據(jù)推送(metadata push)”,旨在將數(shù)據(jù)從發(fā)送方推送到接收方。在這兩種場景下,發(fā)送者都不在乎操作的結(jié)果(在 API 上它的返回類型是Mono)。這兩者之間的區(qū)別在于幀的處理。“即發(fā)即忘”,將完整的幀發(fā)送到接收方,而對于元數(shù)據(jù)推送操作,該幀不具有有效負(fù)載-它僅含有頭部和元數(shù)據(jù)。此類輕量級消息可用于將通知發(fā)送到點(diǎn)對點(diǎn)通信的 IoT 設(shè)備或者移動設(shè)備。 RSocket 還能夠模仿 HTTP 行為。它支持請求 / 響應(yīng)(request-response)場景,這可能是你使用 RSocket 主要交互類型。在流式場景中,此類操作可以表示為由單個(gè)對象組成的流。在這種情況下,客戶端正在等待響應(yīng)幀,但是它是以完全“非阻塞”的方式進(jìn)行響應(yīng)的。
更有趣的是,云平臺上應(yīng)用程序通常通過“請求流(request stream)”和“請求通道(request channel)”的方式對數(shù)據(jù)流(通常是無限的)進(jìn)行操作。在請求流方式下,請求方將單個(gè)幀發(fā)送到響應(yīng)方,并獲取數(shù)據(jù)流。這種交互方式使服務(wù)能夠從“拉數(shù)據(jù)”切換為“推數(shù)據(jù)”策略。無需向響應(yīng)者發(fā)送定期請求,請求方可以訂閱流并對收到的數(shù)據(jù)做出反應(yīng)(當(dāng)數(shù)據(jù)可用時(shí),它將自動到達(dá))。
由于多路復(fù)用和雙向數(shù)據(jù)傳輸?shù)闹С?,我們未來可以使?strong>“請求通道(request channel)”方式。RSocket 可以使用單個(gè)物理連接將數(shù)據(jù)從請求方傳輸?shù)巾憫?yīng)方,反之亦然。當(dāng)請求方更新訂閱時(shí)(如,更改訂閱規(guī)則),這種交互方式可能很有用。如果沒有雙向通道,客戶端將不得不取消流并使用新參數(shù)重新請求它。
在 API 中,交互模型的所有操作均由下面顯示的 RSocket 接口的方法表示。
public interface RSocket extends Availability, Closeable {
Mono<Void> fireAndForget(Payload payload);
Mono<Payload> requestResponse(Payload payload);
Flux<Payload> requestStream(Payload payload);
Flux<Payload> requestChannel(Publisher<Payload> payloads);
Mono<Void> metadataPush(Payload payload);
}
為了改善開發(fā)人員的體驗(yàn)并避免實(shí)現(xiàn) RSocket 接口的每個(gè)接口方法,API 提供了我們可以擴(kuò)展的抽象接口類AbstractRSocket。通過將 SocketAcceptor 和 AbstractRSocket 放在一起,我們可以得到了服務(wù)器端的基本實(shí)現(xiàn),如下:
@Slf4j
public class HelloWorldSocketAcceptor implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
log.info("Received connection with setup payload: [{}] and meta-data: [{}]", setup.getDataUtf8(), setup.getMetadataUtf8());
return Mono.just(new AbstractRSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
log.info("Received 'fire-and-forget' request with payload: [{}]", payload.getDataUtf8());
return Mono.empty();
@Override
public Mono<Payload> requestResponse(Payload payload) {
log.info("Received 'request response' request with payload: [{}] ", payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Hello " + payload.getDataUtf8()));
@Override
public Flux<Payload> requestStream(Payload payload) {
log.info("Received 'request stream' request with payload: [{}] ", payload.getDataUtf8());
return Flux.interval(Duration.ofMillis(1000))
.map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads)
.doOnNext(payload -> {
log.info("Received payload: [{}]", payload.getDataUtf8());
})
.map(payload -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()))
.subscribeOn(Schedulers.parallel());
@Override
public Mono<Void> metadataPush(Payload payload) {
log.info("Received 'metadata push' request with metadata: [{}]", payload.getMetadataUtf8());
return Mono.empty();
}
});
}
}
在發(fā)送方,使用交互模型非常簡單,我們需要做的就是在我們使用 RSocketFactory 創(chuàng)建的 RSocket 實(shí)例上調(diào)用特定方法,例如
socket.fireAndForget(DefaultPayload.create("Hello world!"));</pre>
有關(guān) RSocket 交互模型中可用方法的更多示例,請?jiān)L問 GitHub
發(fā)送方更有趣的是背壓機(jī)制的實(shí)現(xiàn)。我們來看下發(fā)送方實(shí)現(xiàn)示例:
public class RequestStream {
public static void main(String[] args) {
RSocket socket = RSocketFactory.connect()
.transport(TcpClientTransport.create(HOST, PORT))
.start()
.block();
socket.requestStream(DefaultPayload.create("Jenny", "example-metadata"))
.limitRequest(100)
.subscribe(new BackPressureSubscriber());
socket.dispose();
}
@Slf4j
private static class BackPressureSubscriber implements Subscriber<Payload> {
private static final Integer NUMBER_OF_REQUESTS_TO_PROCESS = 5;
private Subscription subscription;
int receivedItems;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(NUMBER_OF_REQUESTS_TO_PROCESS);
}
@Override
public void onNext(Payload payload) {
receivedItems++;
if (receivedItems % NUMBER_OF_REQUESTS_TO_PROCESS == 0) {
log.info("Requesting next [{}] elements");
subscription.request(NUMBER_OF_REQUESTS_TO_PROCESS);
}
}
@Override
public void onError(Throwable t) {
log.error("Stream subscription error [{}]", t);
}
@Override
public void onComplete() {
log.info("Completing subscription");
}
}
}
在此示例中,我們正在請求數(shù)據(jù)流,但是為了確保返回的幀數(shù)據(jù)不會壓垮請求方,我們采用了背壓機(jī)制。為了實(shí)現(xiàn)這種機(jī)制,我們使用指定請求數(shù)量的方式,它在API級別上由subscription.request(n)方法反映出來。在訂閱 onSubscribe(Subscription s) 方法的開始處,我們請求 5 個(gè)數(shù)據(jù),然后在onNext(Payload payload)中統(tǒng)計(jì)接收到的數(shù)量。當(dāng)所有預(yù)期幀都到達(dá)請求方時(shí),我們再請求接下來的 5 個(gè)數(shù)據(jù)(再次使用subscription.request(n)方法)。下圖顯示了該訂戶的流程:

本段介紹的背壓機(jī)制的實(shí)現(xiàn)非?;A(chǔ)。在生產(chǎn)中,我們應(yīng)提供一個(gè)基于更準(zhǔn)確的統(tǒng)計(jì)指標(biāo)的完善解決方案。例如,預(yù)測/平均消費(fèi)時(shí)間等。畢竟,背壓機(jī)制不會讓響應(yīng)方生產(chǎn)過剩的問題消失。它只是將問題轉(zhuǎn)移到響應(yīng)方,來更好地解決問題。有關(guān)背壓的更多信息,請參見這里。
總結(jié)
在本文中,我們討論了微服務(wù)體系結(jié)構(gòu)中的通信問題,以及如何通過 RSocket 解決這些問題。我們以一個(gè)簡單的“ hello world”示例和基本的背壓機(jī)制實(shí)現(xiàn)為背景,介紹了其 API 和交互模型。
在本系列的后續(xù)文章中,我們將介紹 RSocket 的更高級的特性,包括負(fù)載均衡和可恢復(fù)性,以及我們將討論基于 RSocket 進(jìn)行抽象,實(shí)現(xiàn) RPC and Spring Reactor。
個(gè)人微信公共號,感興趣的關(guān)注下,獲取更多技術(shù)文章
