作者:Rafa? Kowalski ,原文:Reactive Service to Service Communication With RSocket (Part 3): Abstraction Over RSocket
RSocket 系列的第三篇

如果你看過(guò)本系列的前兩篇文章,應(yīng)該已經(jīng)已經(jīng)發(fā)現(xiàn) RSocket 提供了一些底層的 API。可以直接使用交互模型中的方法進(jìn)行操作,而且可以沒(méi)有任何限制來(lái)回發(fā)送幀。這些基礎(chǔ)的 API 為我們提供了許多自由和控制權(quán),但是它可能會(huì)引入額外的問(wèn)題,尤其是與微服務(wù)之間的契約相關(guān)的問(wèn)題。
為了解決這些問(wèn)題,我們可以使用 RSocket 作為通用抽象層。有兩種可用的解決方案:RSocket RPC 或者與 Spring Framework集成。在以下各節(jié)中,我們將簡(jiǎn)要討論它們。
基于 RSocket 的 RPC
保持微服務(wù)之間的契約干凈清晰是分布式系統(tǒng)的關(guān)鍵問(wèn)題之一。為了確保應(yīng)用程序可以交換數(shù)據(jù),我們可以利用 RPC(遠(yuǎn)程過(guò)程調(diào)用)。幸運(yùn)的是,RSocket 具有專用的 RPC 模塊,它使用 Protobuf 作為序列化工具,因此,我們可以從 RSocket 的性能中受益并且同時(shí)具有保持契約的能力。通過(guò)將生成的服務(wù)和對(duì)象與 RSocket 接受器結(jié)合在一起,我們可以啟動(dòng)完全可操作的 RPC 服務(wù)端,并使用 RPC 客戶端輕松使用它。
首先,我們需要定義服務(wù)和對(duì)象。在下面的示例中,我們創(chuàng)建了具有四個(gè)方法的簡(jiǎn)單的CustomerService服務(wù),它們每個(gè)表示交互模型相互不同的方法。
syntax = "proto3";
option java_multiple_files = true;
option java_outer_classname = "ServiceProto";
package com.rsocket.rpc;
import "google/protobuf/empty.proto";
message SingleCustomerRequest {
string id = 1;
}
message MultipleCustomersRequest {
repeated string ids = 1;
}
message CustomerResponse {
string id = 1;
string name = 2;
}
service CustomerService {
rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} //request-response
rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-stream
rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} //fire'n'forget
rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-channel
}
其次,我們必須使用上面顯示的 proto 文件來(lái)生成類。為此,我們可以創(chuàng)建一個(gè) Gradle 任務(wù),如下所示:
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.6.1'
}
generatedFilesBaseDir = "${projectDir}/build/generated-sources/"
plugins {
rsocketRpc {
artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'
}
}
generateProtoTasks {
all()*.plugins {
rsocketRpc {}
}
}
}
作為generateProto任務(wù)的結(jié)果,我們應(yīng)該能夠獲得服務(wù)接口、服務(wù)客戶端和服務(wù)服務(wù)端類
CustomerServiceCustomerServiceClientCustomerServiceServer
再次,我們必須實(shí)現(xiàn)服務(wù)接口(CustomerService)的相關(guān)業(yè)務(wù)邏輯:
public class DefaultCustomerService implements CustomerService {
private static final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
@Override
public Mono getCustomer(SingleCustomerRequest message, ByteBuf metadata) {
log.info("Received 'getCustomer' request [{}]", message);
return Mono.just(CustomerResponse.newBuilder()
.setId(message.getId())
.setName(getRandomName())
.build());
}
@Override
public Flux getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {
return Flux.interval(Duration.ofMillis(1000))
.map(time -> CustomerResponse.newBuilder()
.setId(UUID.randomUUID().toString())
.setName(getRandomName())
.build());
}
@Override
public Mono deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {
log.info("Received 'deleteCustomer' request [{}]", message);
return Mono.just(Empty.newBuilder().build());
}
@Override
public Flux customerChannel(Publisher messages, ByteBuf metadata) {
return Flux.from(messages)
.doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
.map(message -> CustomerResponse.newBuilder()
.setId(UUID.randomUUID().toString())
.setName(getRandomName())
.build());
}
private String getRandomName() {
return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
}
}
最后,我們可以通過(guò) RSocket 暴露服務(wù)。為此,我們必須創(chuàng)建服務(wù)端的實(shí)例(CustomerServiceServer)并注入服務(wù)的實(shí)現(xiàn)(DefaultCustomerService)。然后,我們準(zhǔn)備創(chuàng)建一個(gè) RSocket 接受器實(shí)例。該 API 提供了RequestHandlingRSocket,該服務(wù)包裝服務(wù)端實(shí)例,并將契約中定義的端點(diǎn)轉(zhuǎn)換為 RSocket 交互模型中可用的方法。
public class Server {
public static void main(String[] args) throws InterruptedException {
CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());
RSocketFactory
.receive()
.acceptor((setup, sendingSocket) -> Mono.just(
new RequestHandlingRSocket(serviceServer)
))
.transport(TcpServerTransport.create(7000))
.start()
.block();
Thread.currentThread().join();
}
}
在客戶端,實(shí)現(xiàn)非常簡(jiǎn)單。我們需要做的就是創(chuàng)建 RSocket 實(shí)例,并通過(guò)構(gòu)造函數(shù)將其注入到客戶端實(shí)例中,然后就可以開始了。
@Slf4j
public class Client {
public static void main(String[] args) {
RSocket rSocket = RSocketFactory
.connect()
.transport(TcpClientTransport.create(7000))
.start()
.block();
CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);
customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()
.setId(UUID.randomUUID().toString()).build())
.block();
customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()
.setId(UUID.randomUUID().toString()).build())
.doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))
.block();
customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()
.addIds(UUID.randomUUID().toString()).build())
.doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))
.subscribe();
customerServiceClient.customerChannel(s -> s.onNext(MultipleCustomersRequest.newBuilder()
.addIds(UUID.randomUUID().toString())
.build()))
.doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))
.blockLast();
}
}
將 RSocket 與 RPC 方法結(jié)合使用有助于維護(hù)微服務(wù)之間的契約,并改善日常開發(fā)人員的體驗(yàn)。它適用于不需要完全控制幀的典型場(chǎng)景,但是另一方面,它不限制協(xié)議的靈活性。我們?nèi)匀豢梢栽谕粦?yīng)用程序中暴露 RPC 端點(diǎn)以及普通的 RSocket 接受器,以便我們可以輕松地為給定用例選擇最佳的通信模式。 在 RSocket 上進(jìn)行 RPC 的情況下,可能會(huì)出現(xiàn)一個(gè)基本的問(wèn)題:它比 gRPC 好嗎?這個(gè)問(wèn)題沒(méi)有簡(jiǎn)單的答案。 RSocket 是一項(xiàng)新技術(shù),它需要一些時(shí)間才能達(dá)到與 gRPC 相同的成熟度。另一方面,它在兩個(gè)方面超過(guò)了 gRPC:性能(這里可以使用基準(zhǔn)測(cè)試)和靈活性——可以作為傳輸層用于 RPC 或作為普通消息傳遞解決方案。 在決定在生產(chǎn)環(huán)境中使用哪種軟件之前,應(yīng)該確定 RSocket 是否符合的“早期采用”策略,并且不會(huì)使軟件面臨風(fēng)險(xiǎn)。就個(gè)人而言,我建議在不太重要的區(qū)域引入 RSocket,然后再擴(kuò)展到系統(tǒng)的其余部分。
Spring Boot 集成
第二個(gè)可用的解決方案是通過(guò)與 Spring Boot 的集成提供對(duì) RSocket 的抽象,我們將 RSocket 用作反應(yīng)式消息傳遞解決方案,并利用 Spring 注解輕松地將方法與路由連接起來(lái)。在下面的示例中,我們實(shí)現(xiàn)了兩個(gè) Spring Boot 應(yīng)用程序(請(qǐng)求者和響應(yīng)者)。響應(yīng)者通過(guò)CustomerController暴露 RSocket 接口,并映射到三個(gè)路徑:customer、customer-stream和customer-channel。這些映射中的每一個(gè)都反映了來(lái)自 RSocket 交互模型的不同方法(分別是請(qǐng)求-響應(yīng),請(qǐng)求流和通道)。CustomerController還實(shí)現(xiàn)了簡(jiǎn)單的業(yè)務(wù)邏輯,并返回帶有隨機(jī)名稱的CustomerResponse對(duì)象,如下例所示:
@Slf4j
@SpringBootApplication
public class RSocketResponderApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketResponderApplication.class);
}
@Controller
public class CustomerController {
private final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
@MessageMapping("customer")
CustomerResponse getCustomer(CustomerRequest customerRequest) {
return new CustomerResponse(customerRequest.getId(), getRandomName());
}
@MessageMapping("customer-stream")
Flux getCustomers(MultipleCustomersRequest multipleCustomersRequest) {
return Flux.range(0, multipleCustomersRequest.getIds().size())
.delayElements(Duration.ofMillis(500))
.map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));
}
@MessageMapping("customer-channel")
Flux getCustomersChannel(Flux requests) {
return Flux.from(requests)
.doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
.map(message -> new CustomerResponse(message.getId(), getRandomName()));
}
private String getRandomName() {
return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
}
}
}
請(qǐng)注意,下面提供的示例基于 Spring Boot RSocket starter 2.2.0.M4 版本,這意味著它不是正式版本,API 可能會(huì)更改。
值得注意的是,Spring Boot 會(huì)自動(dòng)檢測(cè)類路徑上的 RSocket 庫(kù)并啟動(dòng)服務(wù)端。我們需要做的就是指定端口:
spring:
rsocket:
server:
port: 7000
這幾行代碼和配置設(shè)置了完全可操作的響應(yīng)者程序。 讓我們?cè)倏匆幌抡?qǐng)求方。在這里,我們實(shí)現(xiàn)了CustomerServiceAdapter,它負(fù)責(zé)與響應(yīng)者進(jìn)行通信。它使用RSocketRequester bean 封裝 RSocket 實(shí)例,該 bean 中還包含數(shù)據(jù)類型以及封裝在 RSocketStrategies 對(duì)象中編碼/解碼的詳細(xì)信息。 再用反應(yīng)式的方式給 RSocketRequester 配置路由消息以及處理數(shù)據(jù)的序列化/反序列化信息??偨Y(jié)下來(lái),我們需要做的就是提供路由、數(shù)據(jù)以及消費(fèi)響應(yīng)者的消息的方式——作為單個(gè)對(duì)象(Mono)或作為流(Flux)。
@Slf4j
@SpringBootApplication
public class RSocketRequesterApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketRequesterApplication.class);
}
@Bean
RSocket rSocket() {
return RSocketFactory
.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
rSocketStrategies);
}
@Component
class CustomerServiceAdapter {
private final RSocketRequester rSocketRequester;
CustomerServiceAdapter(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
Mono getCustomer(String id) {
return rSocketRequester
.route("customer")
.data(new CustomerRequest(id))
.retrieveMono(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
}
Flux getCustomers(List ids) {
return rSocketRequester
.route("customer-stream")
.data(new MultipleCustomersRequest(ids))
.retrieveFlux(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
}
Flux getCustomerChannel(Flux customerRequestFlux) {
return rSocketRequester
.route("customer-channel")
.data(customerRequestFlux, CustomerRequest.class)
.retrieveFlux(CustomerResponse.class)
.doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
}
}
}
除了與響應(yīng)者進(jìn)行通信之外,請(qǐng)求者還通過(guò)三種路徑暴露 RESTful API:/customers/{id},/customers,/customers-channel。在這里,我們使用 Spring WebFlux,基于 HTTP2 協(xié)議。請(qǐng)注意,最后兩個(gè)映射會(huì)生成文本事件流,這意味著數(shù)據(jù)可用時(shí)將被流式傳輸?shù)?Web 瀏覽器。
@RestController
class CustomerController {
private final CustomerServiceAdapter customerServiceAdapter;
CustomerController(CustomerServiceAdapter customerServiceAdapter) {
this.customerServiceAdapter = customerServiceAdapter;
}
@GetMapping("/customers/{id}")
Mono getCustomer(@PathVariable String id) {
return customerServiceAdapter.getCustomer(id);
}
@GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomers() {
return customerServiceAdapter.getCustomers(getRandomIds(10));
}
@GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomersChannel() {
return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))
.map(id -> new CustomerRequest(UUID.randomUUID().toString())));
}
private List getRandomIds(int amount) {
return IntStream.range(0, amount)
.mapToObj(n -> UUID.randomUUID().toString())
.collect(toList());
}
}
要使用上述 REST 接口,可以使用以下curl命令:
curl http://localhost:8080/customers/1
curl http://localhost:8080/customers
curl http://localhost:8080/customers-channel
請(qǐng)注意,請(qǐng)求者應(yīng)用程序有效的代碼在here。
Spring Boot 集成和 RPC 模塊的是 RSocket 之上的補(bǔ)充解決方案。第一個(gè)是面向消息傳遞的,并提供了方便的消息路由 API,而 RPC 模塊使開發(fā)人員可以輕松控制暴露的端口并維護(hù)微服務(wù)之間的契約。這兩種解決方案都有應(yīng)用場(chǎng)景,可以輕松地與 RSocket 底層 API 結(jié)合使用單一協(xié)議以一致的方式滿足最復(fù)雜的要求。
系列總結(jié)
本文是與 RSocket 有關(guān)的微型系列的最后一部分,RSocket 是一種新的二進(jìn)制協(xié)議,可以徹底改變?cè)浦蟹?wù)之間的通信。其豐富的交互模型,性能和其他功能,例如,客戶端負(fù)載平衡和可恢復(fù)性使其成為幾乎所有可能的業(yè)務(wù)案例的理想選擇。RSocket 的使用還可以通過(guò)可用的抽象層進(jìn)行簡(jiǎn)化:Spring Boot 集成和 RPC 模塊——它們可以解決最典型的日常場(chǎng)景。 請(qǐng)注意,RSocket 處于候選版本(1.0.0-RC2)中,因此不建議在生產(chǎn)環(huán)境中使用該協(xié)議。盡管如此,還是應(yīng)當(dāng)保持關(guān)注的,因?yàn)椴粩嘣鲩L(zhǎng)的社區(qū)和大型科技公司(例如 Netflix,F(xiàn)acebook,阿里巴巴,Netifi)的支持可能會(huì)使 RSocket 成為云中的主要通信協(xié)議。