Remote procedure call (RPC,遠程過程調(diào)用)
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
在教程2里我們學習了如何使用工作隊列在多個工作者之間分發(fā)耗時任務。
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.
但如果我們需要在一個遠程電腦上運行一個函數(shù)并且等待運行結(jié)果的話要怎么辦呢?這就變成另一個問題了。這種模式通常被稱為遠程過程調(diào)用(Remote Procedure Call),或者簡稱RPC。
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
在本節(jié)教程里,我們將用RabbitMQ來構(gòu)建一個RPC系統(tǒng),這個系統(tǒng)包括一個客戶端和一個可伸縮的RPC服務端。由于我們沒有什么耗時任務值得分發(fā),所以我們準備創(chuàng)建一個假的RPC服務,這個服務返回斐波那契(Fibonacci)數(shù)值。
Client interface(客戶端接口)
To illustrate how an RPC service could be used we're going to change the names of our profiles from "Sender" and "Receiver” to "Client" and "Server". When we call the server we will get back the fibonacci of the argument we call with.
為了說明RPC服務可以如何被使用,我們準備修改我們的配置組,將名稱從“Sender”和“Receiver”換成“Client”和“Server”。當我們調(diào)用服務端時,我們將會獲得我們傳入的參數(shù)所對應的斐波那契數(shù)值。
Integer response = (Integer) template.convertSendAndReceive
(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");
A note on RPC(RPC的注意點)
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
雖然RPC在計算領(lǐng)域是很常見的模式,但它通常也是受爭議的。但程序員不知道一個函數(shù)調(diào)用是本地的還是慢速的RPC時就會出現(xiàn)一些問題。像這樣的混亂會導致不可預知的系統(tǒng),而且會給調(diào)試增加不必要的復雜性。不恰當?shù)厥褂肦PC不僅不會簡化程序,還會導致代碼變得很難維護。
Bearing that in mind, consider the following advice:
記住這一點,然后考慮一下幾點建議:
- Make sure it's obvious which function call is local and which is remote.(確保哪個函數(shù)調(diào)用是本地的,哪個是遠程的。)
- Document your system. Make the dependencies between components clear.(為你的系統(tǒng)做好文檔。清晰化組件間的依賴。)
- Handle error cases. How should the client react when the RPC server is down for a long time?(處理好會發(fā)生錯誤的場景。但RPC服務端長時間掛掉時,客戶端應該做出什么反應?)
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
當你無法對這些問題無法做出明確回答時,就不要使用RPC。如果可以的話,你應該使用異步pipeline,而不是類似于阻塞的RPC。使用異步pipeline,計算結(jié)果可以異步推入到下一個計算階段。
Callback queue(回調(diào)隊列)
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request. Spring-amqp's RabbitTemplate handles the callback queue for us when we use the above 'convertSendAndReceive()' method. There is no need to do any other setup when using the RabbitTemplate. For a thorough explanation please see Request/Reply Message.
一般情況下,在RabbitMQ上實現(xiàn)RPC挺簡單的??蛻舳税l(fā)送請求消息然后服務端返回一個響應消息。為了接收響應消息,我們必須傳送一個用于處理請求的回調(diào)隊列。在我們使用“convertSendAndReceive()”方法時,Spring-amqp框架的RabbitTemplate類為我們做好了回調(diào)隊列的處理工作。使用RabbitTemplate類時無需在做其它配置。若想看完整的文檔,請參閱請求/發(fā)送消息。
Message properties(消息屬性)
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
AMQP 0-9-1協(xié)議預定義了14個消息屬性。大部分的屬性都很少用到,除了以下幾個:
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- deliveryMode:將消息標記為要持久化(此時屬性值為2)或者瞬態(tài)(此時屬性值為2以外的其它數(shù)字)。教程2里提到過這
個屬性,你應該還記得。- contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
- contentType:用來描述編碼的mime類型。例如,對于常用的JSON格式,最好將這個屬性值設為application/json。
- replyTo: Commonly used to name a callback queue.
- replayTo:通常用來命名一個回調(diào)隊列。
- correlationId: Useful to correlate RPC responses with requests.
- correlationId:該屬性用來將RPC響應與請求進行關(guān)聯(lián)。
Correlation Id(關(guān)聯(lián)Id)
Spring-amqp allows you to focus on the message style you're working with and hide the details of message plumbing required to support this style. For example, typically the native client would create a callback queue for every RPC request. That's pretty inefficient so an alternative is to create a single callback queue per client.
Spring-amqp能讓你專注于正在處理的消息類型,并隱藏了支持該類型的消息所需的消息管道的實現(xiàn)細節(jié)。例如,通常情況下,本地客戶端會為每個RPC請求都創(chuàng)建一個回調(diào)隊列。這種做法效率很低,所以替換方案是每個客戶端只創(chuàng)建一個回調(diào)隊列。
That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlationId property is used. Spring-amqp automatically sets a unique value for every request. In addition it handles the details of matching the response with the correct correlationId.
但這會導致一個新的問題,那就是,對于從這個隊列里接收的響應,我們無法知道它對應的是哪個請求。這時候,correlationId就派上用場了。spring-amqp自動幫我們?yōu)槊恳粋€請求設好了唯一的correlationId值。而且,它還幫我們做好了將響應與correlationId進行匹配的細節(jié)。
One reason that spring-amqp makes rpc style easier is that sometimes you may want to ignore unknown messages in the callback queue, rather than failing with an error. It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. The spring-amqp client handles the duplicate responses gracefully, and the RPC should ideally be idempotent.
spring-amqp使得rpc模式變得簡單的一個原因是,有時你可能會想忽略回調(diào)隊列里的一些未知消息,而不是拋出錯誤。這是因為服務端可能會出現(xiàn)競爭的情況。有可能RPC服務端在給我們發(fā)送完響應但卻還沒來得及發(fā)送確認消息時,它就掛了,雖然看起來不大像會這樣。如果發(fā)生了這種情況,重啟RPC服務端會繼續(xù)再去處理這條請求。spring-amqp客戶端會優(yōu)雅地處理重復的響應,這種情況下,RPC應該是完美冪等的。
Summary(總結(jié))

Our RPC will work like this:
我們的RPC系統(tǒng)
1.The Tut6Config will setup a new DirectExchange and a client
在Tut6Config文件里將建立一個新的DirectExchange和一個客戶端。
2.The client will leverage the convertSendAndReceive passing the exchange name, the routingKey, and the message.
客戶端將使用convertSendAndReceive,并傳入交換器名字,路由鍵和消息。
3.The request is sent to an rpc_queue("tut.rpc") queue.
請求被發(fā)送到用于rpc的隊列里(“tut.rpc”)。
4.The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it performs the task and sends a message with the result back to the Client, using the queue from the replyTo field.
RPC工作者(也就是服務器)等待發(fā)送到隊列里的請求。但一個請求出現(xiàn)時,它就執(zhí)行任務,然后通過使用replyTo域里配置的隊列將帶有結(jié)果的消息發(fā)回給客戶端。
5.The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application. Again, this is done automagically via the RabbitTemplate.
客戶端等待回調(diào)隊列里的數(shù)據(jù)。當一條消息出現(xiàn)時,它會校驗correlationId屬性。如果屬性值與請求匹配,它就將響應返回給應用。這個工作RabbitTemplate自動幫我們完成了。
Putting it all together(代碼整合)
The Fibonacci task is a @RabbitListener and is defined as:
計算斐波那契的任務用@RabbitListener進行標注,任務內(nèi)容的定義如下:
public int fib(int n) {
return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
}
We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).
我們聲明了斐波那契函數(shù)。它假定輸入的參數(shù)是有效的正整數(shù)。(不要期望它能用于大數(shù)的場景,而且這種方式是最低效的遞歸實現(xiàn))。
The code for our Tut6Config looks like this:
Tut6Config的代碼看起來是如下這樣子的:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile({"tut6","rpc"})
@Configuration
public class Tut6Config {
@Profile("client")
private static class ClientConfig {
@Bean
public DirectExchange exchange() {
return new DirectExchange("tut.rpc");
}
@Bean
public Tut6Client client() {
return new Tut6Client();
}
}
@Profile("server")
private static class ServerConfig {
@Bean
public Queue queue() {
return new Queue("tut.rpc.requests");
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("tut.rpc");
}
@Bean
public Binding binding(DirectExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("rpc");
}
@Bean
public Tut6Server server() {
return new Tut6Server();
}
}
}
It setups up our profiles as "tut6" or "rpc". It also setups a "client" profile with two beans; 1) the DirectExchange we are using and 2) the Tut6Client itself. We also configure the "server" profile with three beans, the "tut.rpc.requests" queue, the DirectExchange, which matches the client's exchange, and the binding from the queue to the exchange with the "rpc" routing-key.
它建立了我們的配置組,叫“tut6”或者“rpc”。同時,還建立了一個“client”配置組,這個組里配置了兩個bean:一個是我們將要用到的DirectExchange類型的交換器,一個是Tut6Client本身。我們還建立了一個“server”配置組,這個組里配置了三個bean:一個名為“tut.rpc.requests”的隊列,一個與客戶端交換器相匹配的DirectExchange類型的交換器,以及用名為“rpc”的路由鍵將隊列和交換器的綁定器。
The server code is rather straightforward:
服務端代碼更直觀點:
1.As usual we start annotating our receiver method with a @RabbitListener and defining the queue its listening on.
像之前那樣,我們先用@RabbitListener來注解我們的接收者方法,然后定義它要監(jiān)聽的隊列。
2.Our fibanacci method calls fib() with the payload parameter and returns the result.
我們的斐波那契方法被命名為fib(),接收有效參數(shù)并返回結(jié)果。
The code for our RPC server Tut6Server.java:
以下為我們的RPC服務端代碼Tut6Server.java:
package org.springframework.amqp.tutorials.tut6;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
public class Tut6Server {
@RabbitListener(queues = "tut.rpc.requests")
// @SendTo("tut.rpc.replies") used when the
// client doesn't set replyTo.
public int fibonacci(int n) {
System.out.println(" [x] Received request for " + n);
int result = fib(n);
System.out.println(" [.] Returned " + result);
return result;
}
public int fib(int n) {
return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
}
}
The client code Tut6Client is as easy as the server:
客戶端代碼Tut6Client與服務端代碼一樣簡單:
1.We autowire the RabbitTemplate and the DirectExchange bean as defined in the Tut6Config.
我們自動注入Tut6Config里定義的類型為RabbitTemplate和DirectExchange的bean。
2.We invoke template.convertSendAndReceive with the parameters exchange name, routing key and message.
我們調(diào)用template.convertSendAndReceive,傳入的參數(shù)為交換器名字,路由鍵以及消息。
3.We print the result.
打印出結(jié)果。
Making the Client request is simply:
發(fā)起客戶端請求也很簡單:
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
public class Tut6Client {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange exchange;
int start = 0;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
System.out.println(" [x] Requesting fib(" + start + ")");
Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");
}
}
Using the project setup as defined in (see tutorial one) with start.spring.io and SpringInitialzr the preparing the runtime is the same as the other tutorials:
mvn clean package
We can start the server with:
我們可以用以下命令來啟動服務端:
java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar
--spring.profiles.active=rpc,server
--tutorial.client.duration=6000
To request a fibonacci number run the client:
若要請求斐波那契數(shù)值,可以用以下命令來啟動客戶端:
java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar
--spring.profiles.active=rpc,server
java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar
--spring.profiles.active=rpc,client
The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:
以上展示的設計不僅僅是實現(xiàn)RPC服務的一種方式,而且還有一些重要的優(yōu)點:
1.If the RPC server is too slow, you can scale up by just running another one. Try running a second RPC Server in a new console.
如果RPC服務端太慢,你可以僅僅通過額外再運行一個服務端來增大其規(guī)模??梢試L試著在新的控制臺里運行第二個RPC服務端。
2.On the client side, the RPC requires sending and receiving only one message with one method. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.
在客戶端這一邊,RPC要求只能一個方法里發(fā)送和接收一條消息。不需要像queueDelare那樣的同步調(diào)用。于是,對于一個RPC請求,RPC客戶端只需一個網(wǎng)絡回路。
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
我們的代碼仍然很簡單,而且沒有處理很多復雜(但重要)的問題,像:
1.How should the client react if there are no servers running?
如果沒有服務端正在運行,客戶端應該做出什么反應?
2.Should a client have some kind of timeout for the RPC?
客戶端需要為RPC做超時處理嗎?
3.If the server malfunctions and raises an exception, should it be forwarded to the client?
如果服務端出現(xiàn)故障并引起了異常,它是否應該告知客戶端?
4.Protecting against invalid incoming messages (eg checking bounds, type) before processing.
在處理進來的消息之前先處理掉無效的消息(如校驗邊界,類型)。
If you want to experiment, you may find the management UI useful for viewing the queues.
如果你想做些實驗,可以通過管理界面來查看隊列,你會發(fā)現(xiàn)它很有用的。
There is one other nice feature of RabbitMQ. It is featured as a supported tile on Pivotal Cloud Foundry (PCF) as a service.
RabbitMQ還有另一個很棒的特性。它作為服務在Pivotal Cloud Foundry(PCF)上被支持。
轉(zhuǎn)自我的博客:https://jiapengcai.github.io/posts/40514/