(4)自己手寫rmi的框架

基本思路也是基于socket完成通信,遠程調用主要是把被調用的類名稱、方法名稱以及調用參數封裝進一個RpcRequest實體

不多說,直接看代碼的實現,實現主要分為共用的API部分、客戶端部分和服務端部分
客戶端需要發起socket的調用,將需要調用的類名稱、方法名稱以及參數封裝成RpcRequest實體,同時客戶端需要對調用接口進行動態代理
服務端需要new一個ServerSocket對到來的請求進行監聽,從socket的輸入流中解析發過來的RpcRequest,然後通過反射調用實際的遠程實現類,最終將結果寫入socket的輸出流中

代碼的下載可以參考: https://github.com/sunkang123/jdk/tree/master/java-rmi

具體的代碼如下:

  • 先看共用的api部分
/**
 * @Project: 3.DistributedProject
 * @description:   IHello, the service interface that is invoked remotely.
 *                 It belongs to the API part shared by the client and the server.
 * @author: sunkang
 * @create: 2018-06-23 11:30
 * @ModificationHistory who      when       What
 **/
public interface IHello {
    /**
     * Produces a greeting for the given message.
     *
     * @param msg text supplied by the caller
     * @return the greeting built by the implementation
     * @throws RemoteException declared so implementations can report a failed call
     */
    String sayHello(String msg) throws RemoteException;
}
/**
 * @Project: 3.DistributedProject
 * @description:  Request entity sent from client to server. It describes one
 *                call: the target class name, the method name and the arguments.
 * @author: sunkang
 * @create: 2018-06-23 11:33
 * @ModificationHistory who      when       What
 **/
public class RpcRequest implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives an id from the class
     * shape, so a client and a server compiled from slightly different builds
     * would reject each other's requests with an InvalidClassException.
     */
    private static final long serialVersionUID = 1L;

    /** Fully qualified name of the type that declares the called method. */
    private String className;

    /** Simple name of the method to call. */
    private String methodName;

    /** Call arguments, in declaration order (spelling kept for compatibility). */
    private Object[] paramters;

    /**
     * @return the name of the type that declares the called method
     */
    public String getClassName() {
        return className;
    }

    /**
     * @param className the name of the type that declares the called method
     */
    public void setClassName(String className) {
        this.className = className;
    }

    /**
     * @return the name of the method to call
     */
    public String getMethodName() {
        return methodName;
    }

    /**
     * @param methodName the name of the method to call
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * @return the call arguments exactly as they were stored (the array is not copied)
     */
    public Object[] getParamters() {
        return paramters;
    }

    /**
     * @param paramters the call arguments; stored by reference, not copied
     */
    public void setParamters(Object[] paramters) {
        this.paramters = paramters;
    }
}
  • 客戶端的實現

/**
 * @Project: 3.DistributedProject
 * @description:  Factory for client-side proxies of a service interface.
 * @author: sunkang
 * @create: 2018-06-23 11:38
 * @ModificationHistory who      when       What
 **/
public class RpcProxyClient {

    /**
     * Builds a JDK dynamic proxy for {@code clazz} whose calls are handled by a
     * {@link RemoteInvocationHandler} configured with the given endpoint.
     *
     * @param clazz the interface the proxy should implement
     * @param host  host name handed to the invocation handler
     * @param port  port handed to the invocation handler
     * @return the proxy instance; callers cast it to {@code clazz}
     */
    public Object createProxy(Class<?> clazz,String host,int port){
        ClassLoader loader = clazz.getClassLoader();
        Class<?>[] proxiedInterfaces = {clazz};
        RemoteInvocationHandler handler = new RemoteInvocationHandler(host, port);
        return Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    }
}
/**
 * @Project: 3.DistributedProject
 * @description:   The proxy's invocation handler: turns each interface call
 *                 into an RpcRequest and sends it through a TcpTransport.
 * @author: sunkang
 * @create: 2018-06-23 11:41
 * @ModificationHistory who      when       What
 **/
public class RemoteInvocationHandler implements InvocationHandler {
    private final String host;
    private final int port;

    /**
     * @param host host name used for every outgoing call
     * @param port port used for every outgoing call
     */
    public RemoteInvocationHandler(String host, int port) {
        this.host=host;
        this.port=port;
    }

    /**
     * Packs the invoked method into an {@code RpcRequest} and sends it over a
     * newly created {@code TcpTransport}.
     *
     * @param proxy  the proxy instance the method was called on (unused)
     * @param method the interface method being called
     * @param args   the call arguments; {@code null} when the method takes none
     * @return whatever the transport returns for the request
     * @throws Throwable as required by the {@link InvocationHandler} contract
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        // The JDK passes null instead of an empty array for zero-argument
        // methods; forward an empty array so the receiving side can always
        // read the argument count without a null check.
        request.setParamters(args == null ? new Object[0] : args);

        TcpTransport transport = new TcpTransport(host, port);
        return transport.send(request);
    }
}
/**
 * @Project: 3.DistributedProject
 * @description:   Simulates the TCP layer: opens a socket, writes one
 *                 serialized request and reads one serialized reply.
 * @author: sunkang
 * @create: 2018-06-23 11:45
 * @ModificationHistory who      when       What
 **/
public class TcpTransport {
    private final String host;
    private final int port;

    /**
     * @param host host name to connect to
     * @param port port to connect to
     */
    public TcpTransport(String host, int port) {
        this.host=host;
        this.port=port;
    }

    /**
     * Opens a connection to the configured endpoint.
     *
     * @return a connected socket, never {@code null}
     * @throws IOException if the connection cannot be established
     */
    private Socket newSocket() throws IOException {
        return new Socket(host, port);
    }

    /**
     * Sends {@code request} and waits for the reply object.
     *
     * <p>Failures are reported the same way as before: the stack trace is
     * printed and {@code null} is returned, so a {@code null} result can mean
     * either "the remote method returned null" or "the call failed".
     *
     * @param request the request to serialize onto the connection
     * @return the deserialized reply, or {@code null} if anything went wrong
     */
    public Object send(RpcRequest request) {
        // try-with-resources closes the streams before the socket (reverse
        // declaration order) and also covers the case where connecting fails,
        // which previously surfaced as a NullPointerException on a null socket.
        try (Socket socket = newSocket();
             ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream())) {
            oos.writeObject(request);
            // Push any buffered bytes out before blocking on the reply.
            oos.flush();
            try (ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
                return ois.readObject();
            }
        } catch (Exception e) {
            // Transport boundary: keep the original "never throws" contract.
            e.printStackTrace();
            return null;
        }
    }

}
/**
 * @Project: 3.DistributedProject
 * @description:   Client-side demo that calls the remote IHello service.
 * @author: sunkang
 * @create: 2018-06-23 11:39
 * @ModificationHistory who      when       What
 **/
public class ClentDemo {

    /**
     * Creates a proxy for {@code IHello} pointing at localhost:8099, calls
     * {@code sayHello} once and prints the reply.
     *
     * @param args command-line arguments (unused)
     * @throws RemoteException declared by {@code IHello.sayHello}
     */
    public static void main(String[] args) throws RemoteException {
        Object proxy = new RpcProxyClient().createProxy(IHello.class, "localhost", 8099);
        IHello hello = (IHello) proxy;
        String reply = hello.sayHello("sunkang you are very cool");
        System.out.println(reply);
    }
}
  • 服務端的調用

/**
 * @Project: 3.DistributedProject
 * @description:  The concrete service implementation of IHello.
 * @author: sunkang
 * @create: 2018-06-23 11:33
 * @ModificationHistory who      when       What
 **/
public class HelloImpl implements IHello {

    /**
     * Prefixes the message with a fixed greeting.
     *
     * @param msg text supplied by the caller; {@code null} is rendered as "null"
     * @return the greeting followed by {@code msg}
     * @throws RemoteException never thrown here; declared by the interface
     */
    @Override
    public String sayHello(String msg) throws RemoteException {
        StringBuilder greeting = new StringBuilder("hello world ");
        greeting.append(msg);
        return greeting.toString();
    }
}

/**
 * @Project: 3.DistributedProject
 * @description:   Simulates service publication: listens on a ServerSocket
 *                 and hands every accepted connection to a worker thread.
 * @author: sunkang
 * @create: 2018-06-23 11:59
 * @ModificationHistory who      when       What
 **/
public class RpcServer {

    /** Shared worker pool; one task is submitted per accepted connection. */
    private static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    /**
     * Binds to {@code port} and serves {@code service} until accepting fails.
     * This call blocks the calling thread for as long as the loop runs.
     *
     * @param service the object whose methods incoming requests are invoked on
     * @param port    the TCP port to listen on
     */
    public void publish(final Object service, int port) {
        // try-with-resources releases the listening port when the loop exits
        // through an exception; previously the ServerSocket was never closed.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket socket = serverSocket.accept();
                cachedThreadPool.execute(new ProcessHandler(socket, service));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * @Project: 3.DistributedProject
 * @description:   Handles one connection: reads the RpcRequest, invokes the
 *                 service by reflection and writes the result to the socket.
 * @author: sunkang
 * @create: 2018-06-23 12:37
 * @ModificationHistory who      when       What
 **/
public class ProcessHandler  implements  Runnable {

    private final Socket socket;
    private final Object service;

    /**
     * @param socket  the accepted client connection; closed when {@link #run()} finishes
     * @param service the object the requested method is invoked on
     */
    public ProcessHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    /**
     * Reads one request, invokes it and writes back the result. Any failure is
     * printed and the connection is closed without a reply being written.
     */
    @Override
    public void run() {
        // NOTE(review): this deserializes whatever the peer sends using Java
        // native serialization; only expose this to trusted clients or install
        // an ObjectInputFilter.
        // Owning the socket in try-with-resources guarantees it is closed even
        // when building the ObjectInputStream fails (previously it leaked).
        try (Socket client = socket;
             ObjectInputStream ois = new ObjectInputStream(client.getInputStream())) {
            RpcRequest request = (RpcRequest) ois.readObject();
            Object result = invoke(request, service);
            try (ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream())) {
                oos.writeObject(result);
                oos.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Invokes the requested method on {@code service}.
     *
     * <p>The request's class name is not consulted; the method is looked up on
     * the runtime class of {@code service}.
     *
     * @param request the decoded request
     * @param service the target object
     * @return the value returned by the invoked method
     * @throws NoSuchMethodException     if no public method matches the name and arguments
     * @throws InvocationTargetException if the invoked method itself throws
     * @throws IllegalAccessException    if the method cannot be accessed
     */
    private Object invoke(RpcRequest request, Object service) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = request.getParamters();
        if (args == null) {
            // A zero-argument call may arrive without an argument array.
            args = new Object[0];
        }
        Method method = findMethod(service.getClass(), request.getMethodName(), args);
        return method.invoke(service, args);
    }

    /**
     * Finds a public method by name that can accept {@code args}. An exact
     * match on the arguments' runtime classes wins (the original behaviour);
     * otherwise the first method whose parameters are assignable from the
     * arguments is used, which covers interface, superclass and primitive
     * parameter types as well as {@code null} arguments.
     */
    private static Method findMethod(Class<?> type, String name, Object[] args) throws NoSuchMethodException {
        Class<?>[] exactTypes = new Class<?>[args.length];
        boolean allNonNull = true;
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                allNonNull = false;
                break;
            }
            exactTypes[i] = args[i].getClass();
        }
        if (allNonNull) {
            try {
                return type.getMethod(name, exactTypes);
            } catch (NoSuchMethodException ignored) {
                // No exact signature; fall through to the assignability search.
            }
        }
        for (Method candidate : type.getMethods()) {
            if (candidate.getName().equals(name) && accepts(candidate.getParameterTypes(), args)) {
                return candidate;
            }
        }
        throw new NoSuchMethodException(type.getName() + "." + name + " with " + args.length + " argument(s)");
    }

    /** Tells whether every argument can be passed to the matching parameter. */
    private static boolean accepts(Class<?>[] parameterTypes, Object[] args) {
        if (parameterTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            Class<?> parameterType = parameterTypes[i];
            Object arg = args[i];
            if (arg == null) {
                // null can be passed to any reference type but never to a primitive.
                if (parameterType.isPrimitive()) {
                    return false;
                }
            } else if (!wrap(parameterType).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; other types are returned as is. */
    private static Class<?> wrap(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }

}
/**
 * @Project: 3.DistributedProject
 * @description:   Simulates service publication: listens on a ServerSocket
 *                 and hands every accepted connection to a worker thread.
 *                 NOTE(review): this listing repeats the RpcServer class shown
 *                 earlier in the article; a server-side demo main was probably
 *                 intended here - confirm against the linked repository.
 * @author: sunkang
 * @create: 2018-06-23 11:59
 * @ModificationHistory who      when       What
 **/
public class RpcServer {

    /** Shared worker pool; one task is submitted per accepted connection. */
    private static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    /**
     * Binds to {@code port} and serves {@code service} until accepting fails.
     * This call blocks the calling thread for as long as the loop runs.
     *
     * @param service the object whose methods incoming requests are invoked on
     * @param port    the TCP port to listen on
     */
    public void publish(final Object service, int port) {
        // try-with-resources releases the listening port when the loop exits
        // through an exception; previously the ServerSocket was never closed.
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                Socket socket = serverSocket.accept();
                cachedThreadPool.execute(new ProcessHandler(socket, service));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容