基本思路也是基于socket完成通信,远程调用主要封装了一个RpcRequest实体,对调用的类名称、方法名称以及调用参数进行了封装
不多说,直接看代码的实现。实现主要分为共用的API部分,还有客户端部分和服务端部分
客户端需要发起socket调用,把需要调用的类的名称、方法名称以及参数封装成RpcRequest实体,同时客户端需要对调用接口进行动态代理
服务端需要new一个ServerSocket对到来的请求进行监听,需要从socket的输入流中对发过来的RpcRequest进行解析,然后通过反射对实际的远程实现类进行调用,最终把结果写入socket的输出流中
代码的下载可以参考: https://github.com/sunkang123/jdk/tree/master/java-rmi
具体的代码如下:
先看共用的API部分
/**
* Service interface shared by the client and the server; the client calls it
* through a dynamic proxy and the server supplies the implementation.
*
* @Project: 3.DistributedProject
* @description: IHello, the interface that is invoked remotely
* @author: sunkang
* @create: 2018-06-23 11:30
* @ModificationHistory who when What
**/
public interface IHello {
/**
* Sends a message to the service and returns its reply.
*
* @param msg the message passed to the implementation
* @return the reply string produced by the implementation
* @throws RemoteException declared so implementations may signal a remote failure
*/
String sayHello(String msg) throws RemoteException;
}
/**
 * Serializable description of one remote call: the declaring class name, the
 * method name and the call arguments. The client writes an instance to the
 * socket and the server reads it back with Java serialization.
 *
 * @Project: 3.DistributedProject
 * @description: the request entity sent from client to server
 * @author: sunkang
 * @create: 2018-06-23 11:33
 * @ModificationHistory who when What
 **/
public class RpcRequest implements Serializable {
    /**
     * Pinned explicitly so that client and server builds produced by different
     * compilers agree on the stream version instead of relying on the
     * compiler-dependent default, which can cause InvalidClassException.
     */
    private static final long serialVersionUID = 1L;

    private String className;
    private String methodName;
    private Object[] paramters;

    /** @return the fully qualified name of the class declaring the target method */
    public String getClassName() {
        return className;
    }

    /** @param className the fully qualified name of the class declaring the target method */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return methodName;
    }

    /** @param methodName the name of the method to invoke */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the call arguments exactly as stored (no defensive copy); may be {@code null} */
    public Object[] getParamters() {
        return paramters;
    }

    /** @param paramters the call arguments; stored as-is (no defensive copy) */
    public void setParamters(Object[] paramters) {
        this.paramters = paramters;
    }
}
客户端的实现
/**
 * Factory for client-side dynamic proxies whose calls are forwarded to a
 * remote host by a {@code RemoteInvocationHandler}.
 *
 * @Project: 3.DistributedProject
 * @description: creates the proxy for a service interface
 * @author: sunkang
 * @create: 2018-06-23 11:38
 * @ModificationHistory who when What
 **/
public class RpcProxyClient {
    /**
     * Creates a proxy implementing {@code clazz}; every call on the proxy is
     * handed to a {@code RemoteInvocationHandler} bound to {@code host:port}.
     *
     * <p>The method is generic so callers get a correctly typed proxy without
     * a cast. Its erasure is unchanged, so existing callers that cast the
     * result keep working.
     *
     * @param clazz the service interface to proxy (must be an interface)
     * @param host  the server host name or address
     * @param port  the server port
     * @param <T>   the service interface type
     * @return a proxy instance implementing {@code clazz}
     */
    public <T> T createProxy(Class<T> clazz, String host, int port) {
        Object proxy = Proxy.newProxyInstance(
                clazz.getClassLoader(),
                new Class<?>[] {clazz},
                new RemoteInvocationHandler(host, port));
        // The proxy implements clazz by construction, so this checked cast cannot fail.
        return clazz.cast(proxy);
    }
}
/**
 * Invocation handler behind the client proxy: turns each intercepted call
 * into an {@code RpcRequest} and sends it through a {@code TcpTransport}.
 *
 * @Project: 3.DistributedProject
 * @description: the concrete proxy logic
 * @author: sunkang
 * @create: 2018-06-23 11:41
 * @ModificationHistory who when What
 **/
public class RemoteInvocationHandler implements InvocationHandler {
    private final String host;
    private final int port;

    /**
     * @param host the server host name or address
     * @param port the server port
     */
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Packs the declaring class name, method name and arguments of the call
     * into an {@code RpcRequest} and returns whatever the transport returns.
     *
     * @param proxy  the proxy instance the method was invoked on (unused)
     * @param method the interface method being invoked
     * @param args   the call arguments; {@code null} for no-arg methods
     * @return the value returned by {@code TcpTransport.send}
     * @throws Throwable declared by the {@link InvocationHandler} contract
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        // The JDK passes null (not an empty array) for methods without parameters;
        // normalise it so the receiver can always iterate over the arguments.
        request.setParamters(args == null ? new Object[0] : args);
        TcpTransport transport = new TcpTransport(host, port);
        return transport.send(request);
    }
}
/**
 * Minimal TCP transport: opens one socket per call, writes the request with
 * Java serialization and reads a single serialized object back as the reply.
 *
 * @Project: 3.DistributedProject
 * @description: simulates the TCP layer by opening the socket connection
 * @author: sunkang
 * @create: 2018-06-23 11:45
 * @ModificationHistory who when What
 **/
public class TcpTransport {
    private final String host;
    private final int port;

    /**
     * @param host the server host name or address
     * @param port the server port
     */
    public TcpTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sends {@code request} to {@code host:port} and returns the object the
     * peer writes back.
     *
     * <p>Failures keep the original best-effort contract: the stack trace is
     * printed and {@code null} is returned. A failed connect is now reported
     * once as the I/O error itself instead of surfacing as a
     * NullPointerException on a missing socket.
     *
     * @param request the request to serialize onto the connection
     * @return the deserialized reply, or {@code null} if connecting, writing
     *         or reading failed
     */
    public Object send(RpcRequest request) {
        // Resources are closed in reverse order: streams first, then the socket.
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream())) {
            oos.writeObject(request);
            oos.flush();
            // Opened only after the request is written: the ObjectInputStream
            // constructor blocks until the peer has sent its stream header.
            try (ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
                return ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}
/**
 * Client entry point: builds a proxy for {@code IHello} that targets
 * localhost:8099, performs one call and prints the reply.
 *
 * @Project: 3.DistributedProject
 * @description: the concrete client-side caller
 * @author: sunkang
 * @create: 2018-06-23 11:39
 * @ModificationHistory who when What
 **/
public class ClentDemo {
    /**
     * Calls {@code sayHello} through the proxy and writes the result to standard output.
     *
     * @param args command-line arguments (unused)
     * @throws RemoteException declared by {@code IHello.sayHello}
     */
    public static void main(String[] args) throws RemoteException {
        final String serverHost = "localhost";
        final int serverPort = 8099;

        IHello remoteHello =
                (IHello) new RpcProxyClient().createProxy(IHello.class, serverHost, serverPort);
        String reply = remoteHello.sayHello("sunkang you are very cool");
        System.out.println(reply);
    }
}
- 服务端的调用
/**
 * Server-side implementation of {@code IHello}.
 *
 * @Project: 3.DistributedProject
 * @description: the concrete service implementation
 * @author: sunkang
 * @create: 2018-06-23 11:33
 * @ModificationHistory who when What
 **/
public class HelloImpl implements IHello {
    /** Fixed text placed in front of every reply. */
    private static final String GREETING_PREFIX = "hello world ";

    /**
     * Builds the reply by prefixing the incoming message with the greeting.
     *
     * @param msg the message received from the caller
     * @return the greeting prefix followed by {@code msg}
     * @throws RemoteException declared by the interface; not thrown here
     */
    @Override
    public String sayHello(String msg) throws RemoteException {
        return GREETING_PREFIX + msg;
    }
}
/**
 * Publishes a service object on a TCP port: accepts connections in a loop and
 * hands each one to a {@code ProcessHandler} running on a cached thread pool.
 *
 * @Project: 3.DistributedProject
 * @description: simulates service publication by listening on a ServerSocket
 * @author: sunkang
 * @create: 2018-06-23 11:59
 * @ModificationHistory who when What
 **/
public class RpcServer {
    private static final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();

    /**
     * Listens on {@code port} and dispatches every accepted connection to a
     * new {@code ProcessHandler} for {@code service}. The call blocks in the
     * accept loop and only returns after a failure, which is printed.
     *
     * @param service the object whose methods are invoked for incoming requests
     * @param port    the TCP port to listen on
     */
    public void publish(final Object service, int port) {
        // try-with-resources releases the listening port when the loop ends
        // with an exception; previously the ServerSocket was never closed.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket socket = serverSocket.accept();
                CACHED_THREAD_POOL.execute(new ProcessHandler(socket, service));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Handles one accepted connection: reads an {@code RpcRequest}, invokes the
 * named method on the published service by reflection and writes the result
 * back to the socket.
 *
 * @Project: 3.DistributedProject
 * @description: performs the service invocation and writes the result to the socket
 * @author: sunkang
 * @create: 2018-06-23 12:37
 * @ModificationHistory who when What
 **/
public class ProcessHandler implements Runnable {
    private final Socket socket;
    private final Object service;

    /**
     * @param socket  the accepted client connection; closed when {@link #run()} finishes
     * @param service the object whose methods are invoked for the request
     */
    public ProcessHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    /**
     * Reads the request, invokes the service and writes the result. Any
     * failure is printed; in that case nothing is written back to the client.
     */
    @Override
    public void run() {
        // The socket is a managed resource so it is closed even when creating
        // the input stream fails (previously it leaked in that case).
        try (Socket client = socket;
             ObjectInputStream ois = new ObjectInputStream(client.getInputStream())) {
            // NOTE(security): this is native Java deserialization of bytes received
            // from the network. Only expose it to trusted peers, or install an
            // ObjectInputFilter before using it outside a demo.
            RpcRequest request = (RpcRequest) ois.readObject();
            Object result = invoke(request, service);
            try (ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream())) {
                oos.writeObject(result);
                oos.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Invokes the method named in {@code request} on {@code service}.
     *
     * @param request the decoded request; a {@code null} argument array is treated as empty
     * @param service the target object
     * @return the value returned by the invoked method
     * @throws NoSuchMethodException     if no public method matches the name and arguments
     * @throws InvocationTargetException if the invoked method itself throws
     * @throws IllegalAccessException    if the method is not accessible
     */
    private Object invoke(RpcRequest request, Object service)
            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = request.getParamters();
        if (args == null) {
            // No-arg calls may arrive without an argument array.
            args = new Object[0];
        }
        Method method = resolveMethod(service.getClass(), request.getMethodName(), args);
        return method.invoke(service, args);
    }

    /**
     * Finds the public method to call. An exact lookup by the arguments'
     * runtime classes is tried first. If that fails, or an argument is
     * {@code null}, the first public method with the same name whose
     * parameters accept the arguments is used, so parameters declared as
     * primitives, interfaces or supertypes are matched as well.
     */
    private static Method resolveMethod(Class<?> serviceType, String methodName, Object[] args)
            throws NoSuchMethodException {
        Class<?>[] runtimeTypes = new Class<?>[args.length];
        boolean exactLookupPossible = true;
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                exactLookupPossible = false;
                break;
            }
            runtimeTypes[i] = args[i].getClass();
        }
        if (exactLookupPossible) {
            try {
                return serviceType.getMethod(methodName, runtimeTypes);
            } catch (NoSuchMethodException ignored) {
                // Declared parameter types differ from the runtime classes;
                // fall through to the assignability-based search below.
            }
        }
        for (Method candidate : serviceType.getMethods()) {
            if (candidate.getName().equals(methodName)
                    && accepts(candidate.getParameterTypes(), args)) {
                return candidate;
            }
        }
        throw new NoSuchMethodException(
                serviceType.getName() + "." + methodName + " with " + args.length + " argument(s)");
    }

    /** Returns whether every argument can be passed to the parameter at the same position. */
    private static boolean accepts(Class<?>[] parameterTypes, Object[] args) {
        if (parameterTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            Class<?> parameterType = parameterTypes[i];
            if (args[i] == null) {
                // null fits any reference type but never a primitive.
                if (parameterType.isPrimitive()) {
                    return false;
                }
            } else if (!box(parameterType).isInstance(args[i])) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; other types are returned unchanged. */
    private static Class<?> box(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }
}
/**
 * Publishes a service object on a TCP port: accepts connections in a loop and
 * hands each one to a {@code ProcessHandler} running on a cached thread pool.
 *
 * @Project: 3.DistributedProject
 * @description: simulates service publication by listening on a ServerSocket
 * @author: sunkang
 * @create: 2018-06-23 11:59
 * @ModificationHistory who when What
 **/
public class RpcServer {
    private static final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();

    /**
     * Listens on {@code port} and dispatches every accepted connection to a
     * new {@code ProcessHandler} for {@code service}. The call blocks in the
     * accept loop and only returns after a failure, which is printed.
     *
     * @param service the object whose methods are invoked for incoming requests
     * @param port    the TCP port to listen on
     */
    public void publish(final Object service, int port) {
        // try-with-resources releases the listening port when the loop ends
        // with an exception; previously the ServerSocket was never closed.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket socket = serverSocket.accept();
                CACHED_THREAD_POOL.execute(new ProcessHandler(socket, service));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}