用 RSocket 解決響應(yīng)式服務(wù)之間的的通訊-Part 3:基于 RSocket 進(jìn)行抽象

作者:Rafa? Kowalski ,原文:Reactive Service to Service Communication With RSocket (Part 3): Abstraction Over RSocket

RSocket 系列的第三篇

如果你看過(guò)本系列的前兩篇文章,應(yīng)該已經(jīng)已經(jīng)發(fā)現(xiàn) RSocket 提供了一些底層的 API。可以直接使用交互模型中的方法進(jìn)行操作,而且可以沒(méi)有任何限制來(lái)回發(fā)送幀。這些基礎(chǔ)的 API 為我們提供了許多自由和控制權(quán),但是它可能會(huì)引入額外的問(wèn)題,尤其是與微服務(wù)之間的契約相關(guān)的問(wèn)題。

為了解決這些問(wèn)題,我們可以使用 RSocket 作為通用抽象層。有兩種可用的解決方案:RSocket RPC 或者與 Spring Framework集成。在以下各節(jié)中,我們將簡(jiǎn)要討論它們。

基于 RSocket 的 RPC

保持微服務(wù)之間的契約干凈清晰是分布式系統(tǒng)的關(guān)鍵問(wèn)題之一。為了確保應(yīng)用程序可以交換數(shù)據(jù),我們可以利用 RPC(遠(yuǎn)程過(guò)程調(diào)用)。幸運(yùn)的是,RSocket 具有專用的 RPC 模塊,它使用 Protobuf 作為序列化工具,因此,我們可以從 RSocket 的性能中受益并且同時(shí)具有保持契約的能力。通過(guò)將生成的服務(wù)和對(duì)象與 RSocket 接受器結(jié)合在一起,我們可以啟動(dòng)完全可操作的 RPC 服務(wù)端,并使用 RPC 客戶端輕松使用它。

首先,我們需要定義服務(wù)和對(duì)象。在下面的示例中,我們創(chuàng)建了具有四個(gè)方法的簡(jiǎn)單的CustomerService服務(wù),它們每個(gè)表示交互模型相互不同的方法。

syntax = "proto3";
option java_multiple_files = true;
option java_outer_classname = "ServiceProto";
package com.rsocket.rpc;
import "google/protobuf/empty.proto";
message SingleCustomerRequest {
    string id = 1;
}
message MultipleCustomersRequest {
    repeated string ids = 1;
}
message CustomerResponse {
    string id = 1;
    string name = 2;
}
service CustomerService {
    rpc getCustomer(SingleCustomerRequest) returns (CustomerResponse) {} //request-response
    rpc getCustomers(MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-stream
    rpc deleteCustomer(SingleCustomerRequest) returns (google.protobuf.Empty) {} //fire'n'forget
    rpc customerChannel(stream MultipleCustomersRequest) returns (stream CustomerResponse) {} //request-channel
}

其次,我們必須使用上面顯示的 proto 文件來(lái)生成類。為此,我們可以創(chuàng)建一個(gè) Gradle 任務(wù),如下所示:

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.6.1'
    }
    generatedFilesBaseDir = "${projectDir}/build/generated-sources/"
    plugins {
        rsocketRpc {
            artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf:0.2.17'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            rsocketRpc {}
        }
    }
}

作為generateProto任務(wù)的結(jié)果,我們應(yīng)該能夠獲得服務(wù)接口、服務(wù)客戶端和服務(wù)服務(wù)端類

  • CustomerService
  • CustomerServiceClient
  • CustomerServiceServer

再次,我們必須實(shí)現(xiàn)服務(wù)接口(CustomerService)的相關(guān)業(yè)務(wù)邏輯:

public class DefaultCustomerService implements CustomerService {
        private static final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
        @Override
        public Mono getCustomer(SingleCustomerRequest message, ByteBuf metadata) {
            log.info("Received 'getCustomer' request [{}]", message);
            return Mono.just(CustomerResponse.newBuilder()
                    .setId(message.getId())
                    .setName(getRandomName())
                    .build());
        }
        @Override
        public Flux getCustomers(MultipleCustomersRequest message, ByteBuf metadata) {
            return Flux.interval(Duration.ofMillis(1000))
                    .map(time -> CustomerResponse.newBuilder()
                            .setId(UUID.randomUUID().toString())
                            .setName(getRandomName())
                            .build());
        }
        @Override
        public Mono deleteCustomer(SingleCustomerRequest message, ByteBuf metadata) {
            log.info("Received 'deleteCustomer' request [{}]", message);
            return Mono.just(Empty.newBuilder().build());
        }
        @Override
        public Flux customerChannel(Publisher messages, ByteBuf metadata) {
            return Flux.from(messages)
                    .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                    .map(message -> CustomerResponse.newBuilder()
                            .setId(UUID.randomUUID().toString())
                            .setName(getRandomName())
                            .build());
        }
        private String getRandomName() {
            return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
        }
}

最后,我們可以通過(guò) RSocket 暴露服務(wù)。為此,我們必須創(chuàng)建服務(wù)端的實(shí)例(CustomerServiceServer)并注入服務(wù)的實(shí)現(xiàn)(DefaultCustomerService)。然后,我們準(zhǔn)備創(chuàng)建一個(gè) RSocket 接受器實(shí)例。該 API 提供了RequestHandlingRSocket,該服務(wù)包裝服務(wù)端實(shí)例,并將契約中定義的端點(diǎn)轉(zhuǎn)換為 RSocket 交互模型中可用的方法。

public class Server {
    public static void main(String[] args) throws InterruptedException {
        CustomerServiceServer serviceServer = new CustomerServiceServer(new DefaultCustomerService(), Optional.empty(), Optional.empty());
        RSocketFactory
                .receive()
                .acceptor((setup, sendingSocket) -> Mono.just(
                        new RequestHandlingRSocket(serviceServer)
                ))
                .transport(TcpServerTransport.create(7000))
                .start()
                .block();
        Thread.currentThread().join();
    }
}

在客戶端,實(shí)現(xiàn)非常簡(jiǎn)單。我們需要做的就是創(chuàng)建 RSocket 實(shí)例,并通過(guò)構(gòu)造函數(shù)將其注入到客戶端實(shí)例中,然后就可以開始了。

@Slf4j
public class Client {
    public static void main(String[] args) {
        RSocket rSocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
        CustomerServiceClient customerServiceClient = new CustomerServiceClient(rSocket);
        customerServiceClient.deleteCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .block();
        customerServiceClient.getCustomer(SingleCustomerRequest.newBuilder()
                .setId(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomer': [{}]", response))
                .block();
        customerServiceClient.getCustomers(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString()).build())
                .doOnNext(response -> log.info("Received response for 'getCustomers': [{}]", response))
                .subscribe();
        customerServiceClient.customerChannel(s -> s.onNext(MultipleCustomersRequest.newBuilder()
                .addIds(UUID.randomUUID().toString())
                .build()))
                .doOnNext(customerResponse -> log.info("Received response for 'customerChannel' [{}]", customerResponse))
                .blockLast();
    }
}

將 RSocket 與 RPC 方法結(jié)合使用有助于維護(hù)微服務(wù)之間的契約,并改善日常開發(fā)人員的體驗(yàn)。它適用于不需要完全控制幀的典型場(chǎng)景,但是另一方面,它不限制協(xié)議的靈活性。我們?nèi)匀豢梢栽谕粦?yīng)用程序中暴露 RPC 端點(diǎn)以及普通的 RSocket 接受器,以便我們可以輕松地為給定用例選擇最佳的通信模式。 在 RSocket 上進(jìn)行 RPC 的情況下,可能會(huì)出現(xiàn)一個(gè)基本的問(wèn)題:它比 gRPC 好嗎?這個(gè)問(wèn)題沒(méi)有簡(jiǎn)單的答案。 RSocket 是一項(xiàng)新技術(shù),它需要一些時(shí)間才能達(dá)到與 gRPC 相同的成熟度。另一方面,它在兩個(gè)方面超過(guò)了 gRPC:性能(這里可以使用基準(zhǔn)測(cè)試)和靈活性——可以作為傳輸層用于 RPC 或作為普通消息傳遞解決方案。 在決定在生產(chǎn)環(huán)境中使用哪種軟件之前,應(yīng)該確定 RSocket 是否符合的“早期采用”策略,并且不會(huì)使軟件面臨風(fēng)險(xiǎn)。就個(gè)人而言,我建議在不太重要的區(qū)域引入 RSocket,然后再擴(kuò)展到系統(tǒng)的其余部分。

Spring Boot 集成

第二個(gè)可用的解決方案是通過(guò)與 Spring Boot 的集成提供對(duì) RSocket 的抽象,我們將 RSocket 用作反應(yīng)式消息傳遞解決方案,并利用 Spring 注解輕松地將方法與路由連接起來(lái)。在下面的示例中,我們實(shí)現(xiàn)了兩個(gè) Spring Boot 應(yīng)用程序(請(qǐng)求者和響應(yīng)者)。響應(yīng)者通過(guò)CustomerController暴露 RSocket 接口,并映射到三個(gè)路徑:customer、customer-streamcustomer-channel。這些映射中的每一個(gè)都反映了來(lái)自 RSocket 交互模型的不同方法(分別是請(qǐng)求-響應(yīng),請(qǐng)求流和通道)。CustomerController還實(shí)現(xiàn)了簡(jiǎn)單的業(yè)務(wù)邏輯,并返回帶有隨機(jī)名稱的CustomerResponse對(duì)象,如下例所示:

@Slf4j
@SpringBootApplication
public class RSocketResponderApplication {
    public static void main(String[] args) {
        SpringApplication.run(RSocketResponderApplication.class);
    }
    @Controller
    public class CustomerController {
        private final List RANDOM_NAMES = Arrays.asList("Andrew", "Joe", "Matt", "Rachel", "Robin", "Jack");
        @MessageMapping("customer")
        CustomerResponse getCustomer(CustomerRequest customerRequest) {
            return new CustomerResponse(customerRequest.getId(), getRandomName());
        }
        @MessageMapping("customer-stream")
        Flux getCustomers(MultipleCustomersRequest multipleCustomersRequest) {
            return Flux.range(0, multipleCustomersRequest.getIds().size())
                    .delayElements(Duration.ofMillis(500))
                    .map(i -> new CustomerResponse(multipleCustomersRequest.getIds().get(i), getRandomName()));
        }
        @MessageMapping("customer-channel")
        Flux getCustomersChannel(Flux requests) {
            return Flux.from(requests)
                    .doOnNext(message -> log.info("Received 'customerChannel' request [{}]", message))
                    .map(message -> new CustomerResponse(message.getId(), getRandomName()));
        }
        private String getRandomName() {
            return RANDOM_NAMES.get(new Random().nextInt(RANDOM_NAMES.size() - 1));
        }
    }
}

請(qǐng)注意,下面提供的示例基于 Spring Boot RSocket starter 2.2.0.M4 版本,這意味著它不是正式版本,API 可能會(huì)更改。

值得注意的是,Spring Boot 會(huì)自動(dòng)檢測(cè)類路徑上的 RSocket 庫(kù)并啟動(dòng)服務(wù)端。我們需要做的就是指定端口:

spring:
  rsocket:
    server:
      port: 7000

這幾行代碼和配置設(shè)置了完全可操作的響應(yīng)者程序。 讓我們?cè)倏匆幌抡?qǐng)求方。在這里,我們實(shí)現(xiàn)了CustomerServiceAdapter,它負(fù)責(zé)與響應(yīng)者進(jìn)行通信。它使用RSocketRequester bean 封裝 RSocket 實(shí)例,該 bean 中還包含數(shù)據(jù)類型以及封裝在 RSocketStrategies 對(duì)象中編碼/解碼的詳細(xì)信息。 再用反應(yīng)式的方式給 RSocketRequester 配置路由消息以及處理數(shù)據(jù)的序列化/反序列化信息??偨Y(jié)下來(lái),我們需要做的就是提供路由、數(shù)據(jù)以及消費(fèi)響應(yīng)者的消息的方式——作為單個(gè)對(duì)象(Mono)或作為流(Flux)。

@Slf4j
@SpringBootApplication
public class RSocketRequesterApplication {
    public static void main(String[] args) {
        SpringApplication.run(RSocketRequesterApplication.class);
    }
    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                .transport(TcpClientTransport.create(7000))
                .start()
                .block();
    }
    @Bean
    RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
                rSocketStrategies);
    }
    @Component
    class CustomerServiceAdapter {
        private final RSocketRequester rSocketRequester;
        CustomerServiceAdapter(RSocketRequester rSocketRequester) {
            this.rSocketRequester = rSocketRequester;
        }
        Mono getCustomer(String id) {
            return rSocketRequester
                    .route("customer")
                    .data(new CustomerRequest(id))
                    .retrieveMono(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as mono [{}]", customerResponse));
        }
        Flux getCustomers(List ids) {
            return rSocketRequester
                    .route("customer-stream")
                    .data(new MultipleCustomersRequest(ids))
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }
        Flux getCustomerChannel(Flux customerRequestFlux) {
            return rSocketRequester
                    .route("customer-channel")
                    .data(customerRequestFlux, CustomerRequest.class)
                    .retrieveFlux(CustomerResponse.class)
                    .doOnNext(customerResponse -> log.info("Received customer as flux [{}]", customerResponse));
        }
    }
}

除了與響應(yīng)者進(jìn)行通信之外,請(qǐng)求者還通過(guò)三種路徑暴露 RESTful API:/customers/{id},/customers,/customers-channel。在這里,我們使用 Spring WebFlux,基于 HTTP2 協(xié)議。請(qǐng)注意,最后兩個(gè)映射會(huì)生成文本事件流,這意味著數(shù)據(jù)可用時(shí)將被流式傳輸?shù)?Web 瀏覽器。

@RestController
class CustomerController {
private final CustomerServiceAdapter customerServiceAdapter;
CustomerController(CustomerServiceAdapter customerServiceAdapter) {
    this.customerServiceAdapter = customerServiceAdapter;
}
@GetMapping("/customers/{id}")
Mono getCustomer(@PathVariable String id) {
    return customerServiceAdapter.getCustomer(id);
}
@GetMapping(value = "/customers", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomers() {
    return customerServiceAdapter.getCustomers(getRandomIds(10));
}
@GetMapping(value = "/customers-channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Publisher getCustomersChannel() {
    return customerServiceAdapter.getCustomerChannel(Flux.interval(Duration.ofMillis(1000))
            .map(id -> new CustomerRequest(UUID.randomUUID().toString())));
}
private List getRandomIds(int amount) {
    return IntStream.range(0, amount)
            .mapToObj(n -> UUID.randomUUID().toString())
            .collect(toList());
}
}

要使用上述 REST 接口,可以使用以下curl命令:

curl http://localhost:8080/customers/1
curl http://localhost:8080/customers
curl http://localhost:8080/customers-channel

請(qǐng)注意,請(qǐng)求者應(yīng)用程序有效的代碼在here

Spring Boot 集成和 RPC 模塊的是 RSocket 之上的補(bǔ)充解決方案。第一個(gè)是面向消息傳遞的,并提供了方便的消息路由 API,而 RPC 模塊使開發(fā)人員可以輕松控制暴露的端口并維護(hù)微服務(wù)之間的契約。這兩種解決方案都有應(yīng)用場(chǎng)景,可以輕松地與 RSocket 底層 API 結(jié)合使用單一協(xié)議以一致的方式滿足最復(fù)雜的要求。

系列總結(jié)

本文是與 RSocket 有關(guān)的微型系列的最后一部分,RSocket 是一種新的二進(jìn)制協(xié)議,可以徹底改變?cè)浦蟹?wù)之間的通信。其豐富的交互模型,性能和其他功能,例如,客戶端負(fù)載平衡和可恢復(fù)性使其成為幾乎所有可能的業(yè)務(wù)案例的理想選擇。RSocket 的使用還可以通過(guò)可用的抽象層進(jìn)行簡(jiǎn)化:Spring Boot 集成和 RPC 模塊——它們可以解決最典型的日常場(chǎng)景。 請(qǐng)注意,RSocket 處于候選版本(1.0.0-RC2)中,因此不建議在生產(chǎn)環(huán)境中使用該協(xié)議。盡管如此,還是應(yīng)當(dāng)保持關(guān)注的,因?yàn)椴粩嘣鲩L(zhǎng)的社區(qū)和大型科技公司(例如 Netflix,F(xiàn)acebook,阿里巴巴,Netifi)的支持可能會(huì)使 RSocket 成為云中的主要通信協(xié)議。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容