用Socket進行遠程調(diào)用

用Socket進行遠程調(diào)用

要調(diào)用的接口

public interface HelloService {
    String hello(HelloObject object);
}

Java原生序列化方式

@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;
}

接口的實現(xiàn)類

public class HelloServiceImpl implements HelloService {
    private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
    public String hello(HelloObject object) {
        logger.info("服務(wù)器收到調(diào)用信息:{}", object.getMessage());
        return "客戶端收到相應(yīng)信息,id=" + object.getId();
    }
}

將調(diào)用請求封裝起來


@Data
@Builder
public class RPCRequest implements Serializable {
    //待調(diào)用接口名稱
    private String interfaceName;

    //待調(diào)用方法名稱
    private String methodName;

    //調(diào)用方法的參數(shù)
    private Object[] parameters;

    //調(diào)用方法的參數(shù)類型
    private Class<?>[] paramTypes;

}

將響應(yīng)請求封裝起來

@Data
public class RPCResponse<T> implements Serializable {
    //響應(yīng)狀態(tài)碼
    private Integer statusCode;

    //響應(yīng)狀態(tài)補充信息
    private String message;

    //響應(yīng)數(shù)據(jù)
    private T data;

構(gòu)建一個客戶端對象,用來處理發(fā)送請求
使用try-with-resource來關(guān)閉資源

public class RPCClient {
    private static final Logger logger = LoggerFactory.getLogger(RPCClient.class);
    public Object sendRequest(RPCRequest rpcRequest,String host,int port){
        try(Socket socket = new Socket(host,port)){

            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

            objectOutputStream.flush();
            RPCResponse rpcResponse = (RPCResponse)objectInputStream.readObject();
            
            return rpcResponse.getData();

        }
        catch (IOException | ClassNotFoundException e) {
            logger.error("調(diào)用時發(fā)生錯誤:", e);
            throw new RPCException("調(diào)用失敗: ", e);
        }
    }
}

            

遠程調(diào)用通過動態(tài)代理來實現(xiàn)

public class RPCClientProxy implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(RPCClientProxy.class);
    private String host;
    private int port;
    public RPCClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        logger.info("調(diào)用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
        RPCRequest rpcRequest = RPCRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RPCClient rpcClient = new RPCClient();
        return rpcClient.sendRequest(rpcRequest, host, port);
    }

}

重寫invoke()方法,它在客戶端調(diào)用接口的時候生成請求對象,并發(fā)送出去,然后得到返回值

服務(wù)端接受服務(wù)

public interface ServiceRegistry {
    <T> void register(T service);

    Object getService(String serviceName);
}

接收到服務(wù)時,將服務(wù)加入Set集合中,用ConcurrentHashMap來保存服務(wù)實現(xiàn)的接口名稱。

public class ServiceRegistryImpl implements ServiceRegistry{
    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
    private final Map<String,Object> serviceMap = new ConcurrentHashMap<>();
    private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

    @Override
    public synchronized <T> void register(T service) {
        String serviceName = service.getClass().getCanonicalName();
        if(registeredService.contains(serviceName))
            return;
        registeredService.add(serviceName);
        Class<?>[] interfaces = service.getClass().getInterfaces();
        if(interfaces.length == 0){
            throw new RPCException(RPCError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
        }
        for(Class<?> i:interfaces){
            serviceMap.put(i.getCanonicalName(),service);
        }
        logger.info("向接口 {} 注冊服務(wù) {}",interfaces,serviceName);

    }

    @Override
    public synchronized Object getService(String serviceName) {
        Object service = serviceMap.get(serviceName);
        if(service == null)
            throw new RPCException(RPCError.SERVICE_NOT_FOUND);
        return service;
    }
}

用一個線程接收需要提供服務(wù)的對象,把它交給處理器,處理完后發(fā)出去

public class RequestHandlerThread implements Runnable{
    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private Socket socket;
    private RequestHandler requestHandler;
    private ServiceRegistry serviceRegistry;

    public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
        this.socket = socket;
        this.requestHandler = requestHandler;
        this.serviceRegistry = serviceRegistry;
    }
    @Override
    public void run() {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RPCRequest rpcRequest = (RPCRequest) objectInputStream.readObject();
            String interfaceName = rpcRequest.getInterfaceName();
            Object service = serviceRegistry.getService(interfaceName);
            Object result = requestHandler.handle(rpcRequest, service);
            objectOutputStream.writeObject(RPCResponse.success(result));
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("調(diào)用或發(fā)送時有錯誤發(fā)生:", e);
        }
    }
}

處理器通過反射,獲取服務(wù)需要執(zhí)行的方法并調(diào)用

public class RequestHandler {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
    public Object handle(RPCRequest rpcRequest,Object service){
        Object result = null;
        try{
            result = invokeTargetMethod(rpcRequest, service);
            logger.info("服務(wù):{} 成功調(diào)用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
        } catch (IllegalAccessException | InvocationTargetException e) {
            logger.error("調(diào)用或發(fā)送時有錯誤發(fā)生:", e);
        } return result;
    }


    private Object invokeTargetMethod(RPCRequest rpcRequest,Object service) throws InvocationTargetException, IllegalAccessException {
        Method method;
        try {
            method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
        } catch (NoSuchMethodException e) {
            return RPCResponse.fail(ResponseCode.METHOD_NOT_FOUND);
        }
        return method.invoke(service, rpcRequest.getParameters());

    }
}

服務(wù)端加入線程池

public class RPCServer {

    private static final Logger logger = LoggerFactory.getLogger(RPCServer.class);
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 50;
    private static final int KEEP_ALIVE_TIME = 60;
    private static final int BLOCKING_QUEUE_CAPACITY = 100;
    private final ExecutorService threadPool;
    private RequestHandler requestHandler = new RequestHandler();
    private final ServiceRegistry serviceRegistry;

    public RPCServer(ServiceRegistry serviceRegistry){
        this.serviceRegistry = serviceRegistry;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAXIMUM_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,workingQueue,threadFactory);

    }
    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服務(wù)器啟動……");
            Socket socket;
            while((socket = serverSocket.accept()) != null) {
                logger.info("消費者連接: {}:{}", socket.getInetAddress(), socket.getPort());
                threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
            }
            threadPool.shutdown();
        } catch (IOException e) {
            logger.error("服務(wù)器啟動時有錯誤發(fā)生:", e);
        }
    }


}

啟動服務(wù)端的時候手動注冊服務(wù)

public class ServerTest {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        ServiceRegistry serviceRegistry = new ServiceRegistryImpl();
        serviceRegistry.register(helloService);
        RPCServer rpcServer = new RPCServer(serviceRegistry);
        rpcServer.start(9000);

    }
}

然后啟動客戶端

public class ClientTest {
    public static void main(String[] args) {
        RPCClientProxy proxy = new RPCClientProxy("127.0.0.1", 9000);
        HelloService helloService = proxy.getProxy(HelloService.class);
        HelloObject object = new HelloObject(12, "這是客戶端傳來的消息");
        String res = helloService.hello(object);
        System.out.println(res);

    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容