RPC的處理流程
- 當客戶端啟動時,創建一個匿名的回調隊列。
- 客戶端為RPC請求設置2個屬性:replyTo,設置回調隊列名字;correlationId,標記request。
- 請求被發送到請求隊列中。
- RPC服務器端監聽請求隊列中的請求,當請求到來時,服務器端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
- 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,如果與request中匹配,則返回。

832799-20161224004437839-1074972304.png
代碼實現
// 客戶端
package com.shawntime.test.rabbitmq.rpc.rabbit;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.IRpcService;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import org.apache.commons.lang3.SerializationUtils;
/**
* Created by shma on 2017/5/8.
*/
public class Client implements IRpcService {
private Channel produceChannel;
private Channel consumeChannel;
private String callBackQueueName;
private final Map<String, BlockingQueue<byte[]>> completionQueueMap;
public Client(ConnectModel connectModel) throws IOException, TimeoutException {
connect(connectModel);
this.completionQueueMap = Maps.newConcurrentMap();
}
public byte[] call(RpcInvokeModel model) throws IOException, InterruptedException, ExecutionException {
model.setDid(UUID.randomUUID().toString());
model.setCallBackQueueName(callBackQueueName);
byte[] body = SerializationUtils.serialize(model);
BlockingQueue<byte[]> blockingQueue = new LinkedBlockingQueue<byte[]>(1);
completionQueueMap.put(model.getDid(), blockingQueue);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
.correlationId(model.getDid())
.replyTo(callBackQueueName)
.build();
produceChannel.basicPublish(Constant.REQUEST_EXCHANGE_NAME, Constant.REQUEST_ROUTING_NAME, basicProperties, body);
return blockingQueue.take();
}
private void connect(ConnectModel connectModel) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost(connectModel.getVirtualHost());
factory.setPort(connectModel.getPort());
factory.setUsername(connectModel.getUserName());
factory.setPassword(connectModel.getPassword());
factory.setHost(connectModel.getHost());
Connection connection = factory.newConnection();
produceChannel = connection.createChannel();
consumeChannel = connection.createChannel();
produceChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
produceChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
produceChannel.basicQos(1);
callBackQueueName = produceChannel.queueDeclare().getQueue();
consumeChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
consumeChannel.queueBind(callBackQueueName, Constant.REPLY_EXCHANGE_NAME, callBackQueueName);
consumeChannel.basicQos(1);
consumeChannel.basicConsume(callBackQueueName, true, new DefaultConsumer(consumeChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, final byte[] body) throws IOException {
BlockingQueue<byte[]> blockingQueue = completionQueueMap.get(properties.getCorrelationId());
blockingQueue.add(body);
}
});
}
}
// 服務端
package com.shawntime.test.rabbitmq.rpc.rabbit;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.shawntime.test.rabbitmq.rpc.JsonHelper;
import com.shawntime.test.rabbitmq.rpc.RpcInvokeModel;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import org.apache.commons.lang3.SerializationUtils;
/**
 * RabbitMQ-backed RPC server.
 *
 * <p>Consumes serialized {@link RpcInvokeModel} requests, invokes the named method reflectively
 * on a fresh instance of the named class, and publishes the JSON-encoded result to the reply
 * exchange using the request's {@code replyTo} value as routing key.
 *
 * Created by shma on 2017/5/8.
 */
public class Service {

    private Channel produceChannel;
    private Channel consumeChannel;
    private ConnectModel connectModel;

    /**
     * Stores the connection settings; no network activity happens until {@link #start()}.
     *
     * @param connectModel broker address and credentials
     * @throws IOException declared for callers; not thrown by this constructor
     * @throws TimeoutException declared for callers; not thrown by this constructor
     */
    public Service(ConnectModel connectModel) throws IOException, TimeoutException {
        this.connectModel = connectModel;
    }

    /**
     * Connects to the broker, declares the request/reply topology and starts consuming requests.
     *
     * @throws IOException if the connection or any declaration fails
     * @throws TimeoutException if the connection attempt times out
     */
    public void start() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost(connectModel.getVirtualHost());
        factory.setPort(connectModel.getPort());
        factory.setUsername(connectModel.getUserName());
        factory.setPassword(connectModel.getPassword());
        factory.setHost(connectModel.getHost());

        Connection connection = factory.newConnection();

        produceChannel = connection.createChannel();
        produceChannel.exchangeDeclare(Constant.REPLY_EXCHANGE_NAME, "direct");
        produceChannel.basicQos(1);

        consumeChannel = connection.createChannel();
        consumeChannel.queueDeclare(Constant.REQUEST_QUEUE_NAME, true, false, false, null);
        consumeChannel.exchangeDeclare(Constant.REQUEST_EXCHANGE_NAME, "direct");
        consumeChannel.basicQos(1);
        consumeChannel.queueBind(
                Constant.REQUEST_QUEUE_NAME, Constant.REQUEST_EXCHANGE_NAME, Constant.REQUEST_ROUTING_NAME);

        consumeChannel.basicConsume(Constant.REQUEST_QUEUE_NAME, true, new DefaultConsumer(consumeChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(security): the body is Java-native deserialized and then drives a reflective
                // call on a class/method named by the message. Only safe if every publisher to the
                // request queue is trusted; consider an allow-list of invocable classes.
                RpcInvokeModel model = SerializationUtils.deserialize(body);
                try {
                    Class<?> targetClass = Class.forName(model.getClassName());

                    // A no-arg invocation may arrive with a null argument array; treat it as empty
                    // instead of failing with a NullPointerException inside the consumer callback.
                    Object[] arguments = model.getArguments();
                    if (arguments == null) {
                        arguments = new Object[0];
                    }

                    // Parameter types are taken from the runtime classes of the arguments, so the
                    // lookup only matches methods declared with exactly those types (null elements
                    // are not supported).
                    Class<?>[] parameterTypes = new Class<?>[arguments.length];
                    for (int index = 0; index < parameterTypes.length; ++index) {
                        parameterTypes[index] = arguments[index].getClass();
                    }

                    Method method = targetClass.getDeclaredMethod(model.getMethodName(), parameterTypes);
                    Object result = method.invoke(targetClass.newInstance(), arguments);
                    byte[] resultData = JsonHelper.serialize(result).getBytes("UTF-8");

                    String queueName = properties.getReplyTo();
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();
                    produceChannel.basicPublish(Constant.REPLY_EXCHANGE_NAME, queueName, replyProps, resultData);
                } catch (ClassNotFoundException e) {
                    // On any reflective failure no reply is published for this request.
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                } catch (NoSuchMethodException e) {
                    e.printStackTrace();
                } catch (InstantiationException e) {
                    e.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
package com.shawntime.test.rabbitmq.rpc.rabbit;
/**
 * Mutable holder for the settings needed to open a broker connection:
 * host, port, virtual host, user name and password.
 *
 * Created by shma on 2017/5/8.
 */
public class ConnectModel {

    private String host;
    private int port;
    private String virtualHost;
    private String userName;
    private String password;

    /** @return the broker host, or {@code null} if not set */
    public String getHost() {
        return this.host;
    }

    /** @param host the broker host */
    public void setHost(String host) {
        this.host = host;
    }

    /** @return the broker port, or {@code 0} if not set */
    public int getPort() {
        return this.port;
    }

    /** @param port the broker port */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the virtual host, or {@code null} if not set */
    public String getVirtualHost() {
        return this.virtualHost;
    }

    /** @param virtualHost the virtual host */
    public void setVirtualHost(String virtualHost) {
        this.virtualHost = virtualHost;
    }

    /** @return the user name, or {@code null} if not set */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password, or {@code null} if not set */
    public String getPassword() {
        return this.password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
 * Contract for executing one remote invocation and obtaining its result.
 *
 * Created by shma on 2017/5/8.
 *
 * @param <T> the type in which the result of a call is handed back
 */
public interface IRpcService<T> {

    /**
     * Executes the invocation described by {@code model}.
     *
     * @param model the invocation to execute
     * @return the result of the invocation
     * @throws IOException if an I/O error occurs
     * @throws InterruptedException if the calling thread is interrupted
     * @throws ExecutionException if the invocation fails
     */
    T call(RpcInvokeModel model)
            throws IOException, InterruptedException, ExecutionException;
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
import com.shawntime.test.rabbitmq.rpc.rabbit.Service;
/**
 * Entry point that starts the RPC {@link Service} against a local broker.
 *
 * Created by shma on 2017/5/8.
 */
public class TestServerMain {

    /**
     * Builds the connection settings and starts the service.
     *
     * @param args unused
     * @throws IOException if connecting to the broker fails
     * @throws TimeoutException if the connection attempt times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectModel brokerSettings = new ConnectModel();
        brokerSettings.setHost("127.0.0.1");
        brokerSettings.setPort(5672);
        brokerSettings.setVirtualHost("Test");
        brokerSettings.setUserName("shawntime");
        brokerSettings.setPassword("shawntime");

        new Service(brokerSettings).start();
    }
}
package com.shawntime.test.rabbitmq.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.shawntime.test.rabbitmq.rpc.operator.IBaseClientService;
import com.shawntime.test.rabbitmq.rpc.operator.bean.User;
import com.shawntime.test.rabbitmq.rpc.operator.client.BaseClientService;
import com.shawntime.test.rabbitmq.rpc.rabbit.Client;
import com.shawntime.test.rabbitmq.rpc.rabbit.ConnectModel;
/**
 * Entry point that exercises the RPC {@link Client} against a local broker:
 * fetches one user, prints its id and name, then saves a new user and prints the result.
 *
 * Created by shma on 2017/5/8.
 */
public class TestClientMain {

    /**
     * Runs the demo calls.
     *
     * @param args unused
     * @throws IOException if connecting to the broker fails
     * @throws TimeoutException if the connection attempt times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectModel brokerSettings = new ConnectModel();
        brokerSettings.setHost("127.0.0.1");
        brokerSettings.setPort(5672);
        brokerSettings.setVirtualHost("Test");
        brokerSettings.setUserName("shawntime");
        brokerSettings.setPassword("shawntime");

        IBaseClientService baseClientService = new BaseClientService(new Client(brokerSettings));

        // Remote read: look up user 1 and print its fields.
        User fetched = baseClientService.getUserInfo(1);
        System.out.println(fetched.getUserId());
        System.out.println(fetched.getUserName());

        // Remote write: save a new user and print whatever the service returns.
        User toSave = new User();
        toSave.setUserName("AAA");
        toSave.setUserId(222);
        System.out.println(baseClientService.save(toSave));
    }
}