前一篇文章簡單介紹了下RPC的基本原理,同時附上了一個小的demo,但是這個小的demo并不能在生產(chǎn)上使用,因為生產(chǎn)上的RPC還需要考慮很多因素,比如接入注冊中心、高性能的網(wǎng)絡(luò)通信、高性能的序列化和反序列化、自動路由、容錯處理等等。要實現(xiàn)生產(chǎn)上使用的先不談,我們先來實現(xiàn)一個稍微復雜的RPC,借助這個RPC例子來更深刻的理解RPC原理,為后續(xù)Dubbo源碼的分析做準備。
一、簡單RPC架構(gòu)設(shè)計
回顧下RPC原理圖:

如果要自己設(shè)計實現(xiàn)稍微簡單的一個rpc框架,應該需要考慮注冊中心、網(wǎng)絡(luò)通信、序列化等內(nèi)容,因為可以設(shè)計出如下鍵略的RPC架構(gòu)圖:

二、項目起步說明
1、本RCP項目涉及的功能點
動態(tài)代理、反射、序列化、反序列化、網(wǎng)絡(luò)通信、編解碼、服務(wù)發(fā)現(xiàn)和注冊、心跳與鏈路檢測等
2、本小節(jié)內(nèi)容說明
設(shè)計的技術(shù)點:動態(tài)代理技術(shù)、反射(有關(guān)動態(tài)代理可以看我的另一篇博文:代理模式)
實現(xiàn)的功能:簡易的基于socket的rpc
3、本小節(jié)項目總體框架圖

三、編碼實現(xiàn)
1、api模塊
創(chuàng)建用于測試的接口
public interface HelloService {
String sayHello(String name);
}
2、common模塊
request請求實體類
public class Request implements Serializable {
private static final long serialVersionUID = 7929047349488932740L;
/**
* 請求表示id
*/
private String requestId;
/**
* 請求服務(wù)類型
*/
private String className;
/**
* 請求方法名稱
*/
private String methodName;
/**
* 請求方法參數(shù)類型數(shù)組
*/
private Class<?>[] parameterTypes;
/**
* 請求參數(shù)列表
*/
private Object[] args;
......省略getter/setter
}
response響應實體類:
public class Response {
private static final long serialVersionUID = -1023480952777229650L;
private String requestId;
/**
* 響應狀態(tài)嗎
*/
private int code;
/**
* 響應消息說明
*/
private String msg;
/**
* 相應數(shù)據(jù)
*/
private Object data;
......省略getter/setter
3、provider模塊
服務(wù)的暴露(包好服務(wù)的注冊和服務(wù)的發(fā)布),服務(wù)端基本流程是
服務(wù)注冊->服務(wù)發(fā)布->服務(wù)啟動監(jiān)聽請求(socket)->處理請求
//rpc代理服務(wù),用于暴露服務(wù)
public class RpcProxyServer {
/**
* 創(chuàng)建一個線程池
*/
ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 端口號
*/
private int port;
/**
* 1、服務(wù)注冊
* @param serviceInterface
* @param impClass
* @return
*/
public RpcProxyServer register(Class serviceInterface, Class impClass) {
//注冊服務(wù)(接口名:實現(xiàn)類名)
ProcessorHandler.register(serviceInterface, impClass);
return this;
}
public RpcProxyServer(int port) {
this.port = port;
}
/**
*2、 啟動發(fā)布(啟動)
*/
public void start() {
System.out.println("服務(wù)啟動====");
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//3、通過循環(huán)不斷接受請求
Socket socket = serverSocket.accept();//監(jiān)聽客戶端的請求
//4、每一個socket交給一個processorhandler處理,這里的target就是真正的業(yè)務(wù)類
executorService.execute(new ProcessorHandler(socket));//處理客戶端的請求
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
具體處理請求的handler
//服務(wù)端接受請求處理線程
public class ProcessorHandler implements Runnable {
private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
/**
* socket
*/
private Socket socket;
public static void register(Class serviceInterface, Class impClass) {
//注冊服務(wù)(接口名:實現(xiàn)類名)
serviceRegistry.put(serviceInterface.getName(), impClass);
}
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
public void run() {
//用于定義輸入流和輸出流
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//從socket中讀取請求流對象
Request rpcRequest = (Request) objectInputStream.readObject();
//調(diào)用正真的處理方法
Object result = invoke(rpcRequest);
Response response = new Response();
response.setRequestId(rpcRequest.getRequestId());
response.setData(result);
response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
response.setCode(ResponseCodeEnum.SUCCESS.getCode());
//將結(jié)果通過socket輸出
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeOpenSource(objectInputStream, objectOutputStream);
}
}
private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 利用反射技術(shù)執(zhí)行正真的方法(這里只是簡單的實現(xiàn),沒有容錯處理)
*
* @param rpcRequest
* @return
*/
private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
//獲取目標對象并執(zhí)行目標方法(也就是獲取注冊后的接口實現(xiàn)類對象)
Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
Object[] args = rpcRequest.getArgs();
return method.invoke(targetClass.newInstance(), args);
}
}
用到的枚舉
public enum ResponseCodeEnum {
SUCCESS(0, "success"),
FAIL(1, "fail");
....省略
rpc接口實現(xiàn)類:
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) {
return "hello " + name;
}
}
啟動服務(wù)main方法:
public class Demo1Main {
public static void main(String[] args) {
//創(chuàng)建代理服務(wù)
RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
//注冊服務(wù)
rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
//啟動服務(wù)
rpcProxyServer.start();
}
}
4、consumer模塊
消費端模塊主要是通過jdk動態(tài)代理的方式實現(xiàn)rpc接口代理請求遠程,基本流程
client->創(chuàng)建代理對象->通過代理對象請求遠程服務(wù)->接受返回的信息
public class ClientProxy<T> {
/**
* 服務(wù)端代理接口
*/
private Class<T> serverInstance;
/**
* 服務(wù)端地址
*/
private InetSocketAddress address;
public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
this.address = new InetSocketAddress(ip, port);
this.serverInstance = serverInstance;
}
/**
* 獲取客戶端代理對象
*
* @return
*/
public T getClientInstance() {
return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
}
}
具體遠程調(diào)用invoke方法(jdk動態(tài)代理InvocationHandler)
public class RemoteInvocationHandler implements InvocationHandler {
/**
* 服務(wù)端地址
*/
private InetSocketAddress address;
public RemoteInvocationHandler(InetSocketAddress address) {
this.address=address;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request rpcRequest = new Request();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
//通過網(wǎng)絡(luò)發(fā)送正式請求
RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
Object result = (Object) netTransport.send(rpcRequest);
return result;//返回收到的結(jié)果
}
}
具體的rpc網(wǎng)絡(luò)請求(socket)
//網(wǎng)絡(luò)傳送
public class RpcNetTransport {
private int port;
private String host;
public RpcNetTransport(int port, String host) {
this.port = port;
this.host = host;
}
/**
* 發(fā)送請求
*
* @param request
*/
public Object send(Request request) throws IOException, ClassNotFoundException {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
// 1.創(chuàng)建Socket客戶端,根據(jù)指定地址連接遠程服務(wù)提供者
socket = new Socket(host, port);
//2、將遠程服務(wù)調(diào)用所需的接口類、方法名、參數(shù)列表等編碼后發(fā)送給服務(wù)提供者
outputStream=new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
//3、同步阻塞等待服務(wù)器返回應答,獲取應答后返回
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
}
}
}
消費端消費服務(wù)main方法:
public class Demo1Main {
public static void main(String[] args) {
ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
HelloService helloService = (HelloService) clientProxy.getClientInstance();
String result = helloService.sayHello("嘿嘿嘿");
System.out.println(result);
}
}
三、總結(jié)與思考
總結(jié):本節(jié)實現(xiàn)了一個非常簡單的rpc原型項目,包含了服務(wù)注冊、采用BIO的網(wǎng)絡(luò)通信模型傳送數(shù)據(jù)、采用jdk原生代理模式進行服務(wù)代理、采用jdk原生的序列化方式進行序列化和反序列化等。后續(xù)將會針對該原型項目不斷的改進,不斷的引入新的“武器”,來豐富整個rpc項目。
后期預熱:引入注冊中心(解決服務(wù)治理問題)、引入多種高效的序列化機制、引入NIO的網(wǎng)絡(luò)通信模型、引入軟負載均衡機制、引入spi擴展機制、接入spring等等,敬請期待。