動手寫RPC起步

前一篇文章簡單介紹了下RPC的基本原理,同時附上了一個小的demo,但是這個小的demo并不能在生產(chǎn)上使用,因為生產(chǎn)上的RPC還需要考慮很多因素,比如接入注冊中心、高性能的網(wǎng)絡(luò)通信、高性能的序列化和反序列化、自動路由、容錯處理等等。要實現(xiàn)生產(chǎn)上使用的先不談,我們先來實現(xiàn)一個稍微復雜的RPC,借助這個RPC例子來更深刻的理解RPC原理,為后續(xù)Dubbo源碼的分析做準備。

一、簡單RPC架構(gòu)設(shè)計

回顧下RPC原理圖:



如果要自己設(shè)計實現(xiàn)稍微簡單的一個rpc框架,應該需要考慮注冊中心、網(wǎng)絡(luò)通信、序列化等內(nèi)容,因為可以設(shè)計出如下鍵略的RPC架構(gòu)圖:


二、項目起步說明

1、本RCP項目涉及的功能點

動態(tài)代理、反射、序列化、反序列化、網(wǎng)絡(luò)通信、編解碼、服務(wù)發(fā)現(xiàn)和注冊、心跳與鏈路檢測等

2、本小節(jié)內(nèi)容說明

設(shè)計的技術(shù)點:動態(tài)代理技術(shù)、反射(有關(guān)動態(tài)代理可以看我的另一篇博文:代理模式
實現(xiàn)的功能:簡易的基于socket的rpc

3、本小節(jié)項目總體框架圖

三、編碼實現(xiàn)

1、api模塊

創(chuàng)建用于測試的接口

public interface HelloService {
    String sayHello(String name);
}
2、common模塊

request請求實體類

public class Request implements Serializable {
    private static final long serialVersionUID = 7929047349488932740L;
    /**
     * 請求表示id
     */
    private String requestId;
    /**
     * 請求服務(wù)類型
     */
    private String className;
    /**
     * 請求方法名稱
     */
    private String methodName;
    /**
     * 請求方法參數(shù)類型數(shù)組
     */
    private Class<?>[] parameterTypes;
    /**
     * 請求參數(shù)列表
     */
    private Object[] args;
  ......省略getter/setter
}

response響應實體類:

public class Response {
    private static final long serialVersionUID = -1023480952777229650L;

    private String requestId;
    /**
     * 響應狀態(tài)嗎
     */
    private int code;
    /**
     * 響應消息說明
     */
    private String msg;
    /**
     * 相應數(shù)據(jù)
     */
    private Object data;
......省略getter/setter
3、provider模塊

服務(wù)的暴露(包好服務(wù)的注冊和服務(wù)的發(fā)布),服務(wù)端基本流程是

服務(wù)注冊->服務(wù)發(fā)布->服務(wù)啟動監(jiān)聽請求(socket)->處理請求

//rpc代理服務(wù),用于暴露服務(wù)
public class RpcProxyServer {
    /**
     * 創(chuàng)建一個線程池
     */
    ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * 端口號
     */
    private int port;

    /**
     * 1、服務(wù)注冊
     * @param serviceInterface
     * @param impClass
     * @return
     */
    public RpcProxyServer register(Class serviceInterface, Class impClass) {
        //注冊服務(wù)(接口名:實現(xiàn)類名)
        ProcessorHandler.register(serviceInterface, impClass);
        return this;
    }

    public RpcProxyServer(int port) {
        this.port = port;
    }

    /**
     *2、 啟動發(fā)布(啟動)
     */
    public void start() {
        System.out.println("服務(wù)啟動====");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);

            while (true) {//3、通過循環(huán)不斷接受請求
                Socket socket = serverSocket.accept();//監(jiān)聽客戶端的請求
                //4、每一個socket交給一個processorhandler處理,這里的target就是真正的業(yè)務(wù)類
                executorService.execute(new ProcessorHandler(socket));//處理客戶端的請求
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

具體處理請求的handler

//服務(wù)端接受請求處理線程
public class ProcessorHandler implements Runnable {

    private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
    /**
     * socket
     */
    private Socket socket;

    public static void register(Class serviceInterface, Class impClass) {
        //注冊服務(wù)(接口名:實現(xiàn)類名)
        serviceRegistry.put(serviceInterface.getName(), impClass);
    }
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        //用于定義輸入流和輸出流
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;

        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            //從socket中讀取請求流對象
            Request rpcRequest = (Request) objectInputStream.readObject();
            //調(diào)用正真的處理方法
            Object result = invoke(rpcRequest);
            Response response = new Response();
            response.setRequestId(rpcRequest.getRequestId());
            response.setData(result);
            response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
            response.setCode(ResponseCodeEnum.SUCCESS.getCode());
            //將結(jié)果通過socket輸出
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeOpenSource(objectInputStream, objectOutputStream);
        }
    }

    private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
        if (objectInputStream != null) {
            try {
                objectInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (objectOutputStream != null) {
            try {
                objectOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 利用反射技術(shù)執(zhí)行正真的方法(這里只是簡單的實現(xiàn),沒有容錯處理)
     *
     * @param rpcRequest
     * @return
     */
    private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        //獲取目標對象并執(zhí)行目標方法(也就是獲取注冊后的接口實現(xiàn)類對象)
        Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
        Object[] args = rpcRequest.getArgs();
        return method.invoke(targetClass.newInstance(), args);
    }
}

用到的枚舉

public enum ResponseCodeEnum {
    SUCCESS(0, "success"),
    FAIL(1, "fail");
....省略

rpc接口實現(xiàn)類:

public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
    }
}

啟動服務(wù)main方法:

public class Demo1Main {
    public static void main(String[] args) {
        //創(chuàng)建代理服務(wù)
        RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
        //注冊服務(wù)
        rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
        //啟動服務(wù)
        rpcProxyServer.start();
    }
}
4、consumer模塊

消費端模塊主要是通過jdk動態(tài)代理的方式實現(xiàn)rpc接口代理請求遠程,基本流程

client->創(chuàng)建代理對象->通過代理對象請求遠程服務(wù)->接受返回的信息

public class ClientProxy<T> {
    /**
     * 服務(wù)端代理接口
     */
    private Class<T> serverInstance;

    /**
     * 服務(wù)端地址
     */
    private InetSocketAddress address;

    public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
        this.address = new InetSocketAddress(ip, port);
        this.serverInstance = serverInstance;
    }

    /**
     * 獲取客戶端代理對象
     *
     * @return
     */
    public T getClientInstance() {
        return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
    }
}

具體遠程調(diào)用invoke方法(jdk動態(tài)代理InvocationHandler)

public class RemoteInvocationHandler implements InvocationHandler {

    /**
     * 服務(wù)端地址
     */
    private InetSocketAddress address;

    public RemoteInvocationHandler(InetSocketAddress address) {
        this.address=address;
    }


    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request rpcRequest = new Request();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);

        //通過網(wǎng)絡(luò)發(fā)送正式請求
        RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
        Object result = (Object) netTransport.send(rpcRequest);
        return result;//返回收到的結(jié)果
    }
}

具體的rpc網(wǎng)絡(luò)請求(socket)

//網(wǎng)絡(luò)傳送
public class RpcNetTransport {

    private int port;
    private String host;

    public RpcNetTransport(int port, String host) {
        this.port = port;
        this.host = host;
    }

    /**
     * 發(fā)送請求
     *
     * @param request
     */
    public Object send(Request request) throws IOException, ClassNotFoundException {
        Socket socket = null;
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            // 1.創(chuàng)建Socket客戶端,根據(jù)指定地址連接遠程服務(wù)提供者
            socket = new Socket(host, port);
            //2、將遠程服務(wù)調(diào)用所需的接口類、方法名、參數(shù)列表等編碼后發(fā)送給服務(wù)提供者
            outputStream=new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(request);
            //3、同步阻塞等待服務(wù)器返回應答,獲取應答后返回
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } finally {
            if (socket != null) {
                socket.close();
            }
            if (outputStream != null) {
                outputStream.close();
            }
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }

}

消費端消費服務(wù)main方法:

public class Demo1Main {
    public static void main(String[] args) {
        ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
        HelloService helloService = (HelloService) clientProxy.getClientInstance();
        String result = helloService.sayHello("嘿嘿嘿");
        System.out.println(result);
    }
}

三、總結(jié)與思考

總結(jié):本節(jié)實現(xiàn)了一個非常簡單的rpc原型項目,包含了服務(wù)注冊、采用BIO的網(wǎng)絡(luò)通信模型傳送數(shù)據(jù)、采用jdk原生代理模式進行服務(wù)代理、采用jdk原生的序列化方式進行序列化和反序列化等。后續(xù)將會針對該原型項目不斷的改進,不斷的引入新的“武器”,來豐富整個rpc項目。
后期預熱:引入注冊中心(解決服務(wù)治理問題)、引入多種高效的序列化機制、引入NIO的網(wǎng)絡(luò)通信模型、引入軟負載均衡機制、引入spi擴展機制、接入spring等等,敬請期待。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文將從大的框架層面來聊聊RPC原理和實現(xiàn),既然叫跨語言RPC,也將以thrift為例講講跨語言RPC如何實現(xiàn)。在...
    彥幀閱讀 15,267評論 0 19
  • 前言 在微服務(wù)當?shù)赖慕裉?,分布式系統(tǒng)越來越重要,實現(xiàn)服務(wù)化首先就要考慮服務(wù)之間的通信問題。這里面涉及序列化、反序列...
    habit_learning閱讀 2,452評論 1 26
  • 網(wǎng)絡(luò)通信模塊是分布式系統(tǒng)中最底層的模塊,他直接支撐了上層分布式環(huán)境下復雜的進程間通信邏輯,是所有分布式系統(tǒng)的基礎(chǔ)。...
    SmallBird_閱讀 2,359評論 0 1
  • 國家電網(wǎng)公司企業(yè)標準(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 12,302評論 6 13
  • 我想有時候我一定是瘋了,是被s逼瘋了
    砸扁回憶閱讀 162評論 0 0

友情鏈接更多精彩內(nèi)容