用Socket進行遠程調用
要調用的接口
/**
 * Service contract used by the examples in this file: the client obtains a
 * proxy for it and the server registers an implementation of it.
 */
public interface HelloService {

    /**
     * Handles a greeting request.
     *
     * @param object payload carrying the request data
     * @return the textual reply produced by the implementation
     */
    String hello(HelloObject object);
}
Java原生序列化方式
/**
 * Argument object passed to {@code HelloService.hello}.
 *
 * <p>Implements {@link Serializable} so that it can be written with Java's
 * native object streams. Accessors and the all-arguments constructor are
 * generated by the Lombok annotations.
 */
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {

    /** Identifier of the request. */
    private Integer id;

    /** Free-form text carried by the request. */
    private String message;
}
接口的實現類
public class HelloServiceImpl implements HelloService {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
public String hello(HelloObject object) {
logger.info("服務(wù)器收到調(diào)用信息:{}", object.getMessage());
return "客戶端收到相應(yīng)信息,id=" + object.getId();
}
}
將調用請求封裝起來
/**
 * Serializable description of a single remote method invocation.
 *
 * <p>Instances are created through the Lombok-generated builder.
 */
@Data
@Builder
public class RPCRequest implements Serializable {

    /** Name of the interface whose method is to be invoked. */
    private String interfaceName;

    /** Name of the method to invoke. */
    private String methodName;

    /** Arguments passed to the method. */
    private Object[] parameters;

    /** Declared parameter types of the method, used to resolve it by reflection. */
    private Class<?>[] paramTypes;
}
將響應結果封裝起來
// Generic, serializable envelope for the outcome of a remote call.
// NOTE(review): the closing brace of this class and the static success(...)/fail(...)
// factories that other classes call are not visible in this excerpt — confirm against
// the complete source.
@Data
public class RPCResponse<T> implements Serializable {
// Response status code
private Integer statusCode;
// Supplementary information about the response status
private String message;
// Response payload
private T data;
構建一個客戶端對象,用來處理發送請求
使用try-with-resources來關閉資源
/**
 * Blocking socket client that sends one {@link RPCRequest} per connection and
 * returns the payload of the {@link RPCResponse} it reads back.
 */
public class RPCClient {

    private static final Logger logger = LoggerFactory.getLogger(RPCClient.class);

    /**
     * Opens a connection to {@code host:port}, writes the request with Java
     * serialization and waits for a single response object.
     *
     * @param rpcRequest request to transmit
     * @param host       server host name or address
     * @param port       server TCP port
     * @return the {@code data} field of the received response
     * @throws RPCException if connecting, writing or reading fails
     */
    public Object sendRequest(RPCRequest rpcRequest, String host, int port) {
        // Closing the socket also closes both object streams created on top of it.
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            // Flush BEFORE constructing the ObjectInputStream: its constructor blocks until the
            // peer's stream header arrives, so the request must already be on the wire.
            objectOutputStream.flush();
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            RPCResponse<?> rpcResponse = (RPCResponse<?>) objectInputStream.readObject();
            return rpcResponse.getData();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("調(diào)用時發(fā)生錯誤:", e);
            throw new RPCException("調(diào)用失敗: ", e);
        }
    }
}
遠程調用通過動態代理來實現
/**
 * {@link InvocationHandler} that turns calls on a service interface into
 * {@link RPCRequest}s sent to a fixed host and port.
 */
public class RPCClientProxy implements InvocationHandler {

    private static final Logger logger = LoggerFactory.getLogger(RPCClientProxy.class);

    private final String host;
    private final int port;

    /**
     * @param host server host name or address used for every call
     * @param port server TCP port used for every call
     */
    public RPCClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Creates a dynamic proxy implementing {@code clazz} whose calls are routed
     * through this handler.
     *
     * @param clazz service interface to proxy
     * @param <T>   type of the service interface
     * @return a proxy instance implementing {@code clazz}
     */
    public <T> T getProxy(Class<T> clazz) {
        Object proxy = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
        // Checked cast instead of an unchecked (T) cast; the proxy implements clazz by construction.
        return clazz.cast(proxy);
    }

    /**
     * Builds a request describing the invoked method and sends it to the server.
     * {@code equals}, {@code hashCode} and {@code toString} are answered locally.
     *
     * @return the value returned by the remote call, or the local result for
     *         methods declared on {@link Object}
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // java.lang.reflect.Proxy dispatches equals/hashCode/toString to the handler as well.
        // Sending them over the wire would ask the server for a service named
        // "java.lang.Object", so they are handled here instead.
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        logger.info("調(diào)用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
        RPCRequest rpcRequest = RPCRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RPCClient rpcClient = new RPCClient();
        return rpcClient.sendRequest(rpcRequest, host, port);
    }

    /** Answers the {@link Object} methods routed to the handler using proxy identity. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return proxy.getClass().getName() + "@"
                        + Integer.toHexString(System.identityHashCode(proxy))
                        + "[" + host + ":" + port + "]";
            default:
                throw new UnsupportedOperationException(method.toString());
        }
    }
}
重寫invoke()方法,它在客戶端調用接口的時候生成請求對象,并發送出去,然后得到返回值
服務端接受服務
/**
 * Registry of service objects that can be looked up by name.
 */
public interface ServiceRegistry {

    /**
     * Adds a service object to the registry.
     *
     * @param service the service instance to register
     * @param <T>     concrete type of the service
     */
    <T> void register(T service);

    /**
     * Looks up a previously registered service.
     *
     * @param serviceName name under which the service is stored
     * @return the registered service object
     */
    Object getService(String serviceName);
}
接收到服務時,將服務名稱加入Set集合中,并用ConcurrentHashMap保存接口名稱到服務對象的映射。
/**
 * In-memory {@link ServiceRegistry} that stores each service under the name of
 * every interface its class directly implements.
 */
public class ServiceRegistryImpl implements ServiceRegistry {

    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistryImpl.class);

    /** Interface name mapped to the service object that implements it. */
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    /** Class names of the services that have been registered. */
    private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

    /**
     * Registers {@code service} under the name of each interface returned by
     * {@code service.getClass().getInterfaces()}. Registering a class that is
     * already registered is a no-op.
     *
     * @param service the service instance to register
     * @throws RPCException if the service class implements no interface
     */
    @Override
    public synchronized <T> void register(T service) {
        Class<?> serviceClass = service.getClass();
        // Binary name rather than canonical name: it is never null (anonymous and local
        // classes have no canonical name) and it is what Class#getName() yields on the
        // calling side for nested interfaces.
        String serviceName = serviceClass.getName();
        if (registeredService.contains(serviceName)) {
            return;
        }
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new RPCException(RPCError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
        }
        // Mark as registered only after validation, so a rejected service is not
        // silently treated as registered on a later attempt.
        registeredService.add(serviceName);
        for (Class<?> i : interfaces) {
            serviceMap.put(i.getName(), service);
        }
        logger.info("向接口 {} 注冊服務(wù) {}", interfaces, serviceName);
    }

    /**
     * Returns the service stored under {@code serviceName}.
     *
     * @param serviceName interface name used as the lookup key
     * @return the registered service object
     * @throws RPCException if nothing is stored under that name
     */
    @Override
    public synchronized Object getService(String serviceName) {
        Object service = serviceMap.get(serviceName);
        if (service == null) {
            throw new RPCException(RPCError.SERVICE_NOT_FOUND);
        }
        return service;
    }
}
用一個線程接收需要提供服務的對象,把它交給處理器,處理完后發出去
/**
 * Task that serves exactly one request on an accepted socket: it reads an
 * {@link RPCRequest}, resolves the target service, lets the
 * {@link RequestHandler} invoke it and writes the result back.
 */
public class RequestHandlerThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private final Socket socket;
    private final RequestHandler requestHandler;
    private final ServiceRegistry serviceRegistry;

    /**
     * @param socket          accepted client connection; closed when {@link #run()} finishes
     * @param requestHandler  handler that performs the reflective invocation
     * @param serviceRegistry registry used to resolve the requested interface name
     */
    public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
        this.socket = socket;
        this.requestHandler = requestHandler;
        this.serviceRegistry = serviceRegistry;
    }

    /**
     * Processes one request and writes one response. Failures are logged and
     * the connection is closed; nothing is rethrown.
     */
    @Override
    public void run() {
        // The socket is declared as the first resource so it is closed even when
        // constructing one of the object streams fails (e.g. on a malformed stream header).
        try (Socket connection = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(connection.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(connection.getOutputStream())) {
            RPCRequest rpcRequest = (RPCRequest) objectInputStream.readObject();
            String interfaceName = rpcRequest.getInterfaceName();
            Object service = serviceRegistry.getService(interfaceName);
            Object result = requestHandler.handle(rpcRequest, service);
            objectOutputStream.writeObject(RPCResponse.success(result));
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException | RuntimeException e) {
            // RuntimeException covers unchecked failures such as an unknown service or a bad
            // cast; this is the task boundary, so log here instead of letting it escape.
            logger.error("調(diào)用或發(fā)送時有錯誤發(fā)生:", e);
        }
    }
}
處理器通過反射,獲取服務需要執行的方法并調用
/**
 * Invokes, by reflection, the method named in an {@link RPCRequest} on a
 * service object.
 */
public class RequestHandler {

    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    /**
     * Invokes the requested method on {@code service}.
     *
     * @param rpcRequest request naming the method, its parameter types and its arguments
     * @param service    object on which the method is invoked
     * @return the value produced by the invocation, or {@code null} when the
     *         invocation fails with an {@link IllegalAccessException} or
     *         {@link InvocationTargetException} (the failure is only logged)
     */
    public Object handle(RPCRequest rpcRequest, Object service) {
        try {
            Object outcome = invokeTargetMethod(rpcRequest, service);
            logger.info("服務(wù):{} 成功調(diào)用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            return outcome;
        } catch (IllegalAccessException | InvocationTargetException e) {
            // NOTE(review): the caller cannot tell this apart from a method that returned null.
            logger.error("調(diào)用或發(fā)送時有錯誤發(fā)生:", e);
            return null;
        }
    }

    /**
     * Looks up the public method matching the request on the service's class
     * and invokes it with the request's arguments.
     *
     * @return the method's return value, or the object produced by
     *         {@code RPCResponse.fail(ResponseCode.METHOD_NOT_FOUND)} when no such method exists
     */
    private Object invokeTargetMethod(RPCRequest rpcRequest, Object service) throws InvocationTargetException, IllegalAccessException {
        String targetName = rpcRequest.getMethodName();
        Class<?>[] signature = rpcRequest.getParamTypes();
        try {
            Method target = service.getClass().getMethod(targetName, signature);
            return target.invoke(service, rpcRequest.getParameters());
        } catch (NoSuchMethodException e) {
            // NOTE(review): the failure object is returned as if it were the method's result.
            return RPCResponse.fail(ResponseCode.METHOD_NOT_FOUND);
        }
    }
}
服務端加入線程池
/**
 * Socket server that accepts connections and hands each one to a
 * {@link RequestHandlerThread} running on a bounded thread pool.
 */
public class RPCServer {

    private static final Logger logger = LoggerFactory.getLogger(RPCServer.class);

    // Thread-pool settings passed to the ThreadPoolExecutor constructor below.
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 50;
    /** Keep-alive time, in seconds. */
    private static final int KEEP_ALIVE_TIME = 60;
    private static final int BLOCKING_QUEUE_CAPACITY = 100;

    private final ExecutorService threadPool;
    private final RequestHandler requestHandler = new RequestHandler();
    private final ServiceRegistry serviceRegistry;

    /**
     * @param serviceRegistry registry handed to every request task to resolve services
     */
    public RPCServer(ServiceRegistry serviceRegistry) {
        this.serviceRegistry = serviceRegistry;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
    }

    /**
     * Binds to {@code port} and serves connections until accepting fails.
     * I/O errors are logged, not rethrown.
     *
     * @param port TCP port to listen on
     */
    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服務(wù)器啟動……");
            try {
                // accept() never returns null, so the loop only ends through an exception.
                while (true) {
                    Socket socket = serverSocket.accept();
                    logger.info("消費者連接: {}:{}", socket.getInetAddress(), socket.getPort());
                    dispatch(socket);
                }
            } finally {
                // Release the pool once the accept loop has ended. The finally wraps only the
                // loop, so a failed bind leaves the pool usable for another start(...) attempt.
                threadPool.shutdown();
            }
        } catch (IOException e) {
            logger.error("服務(wù)器啟動時有錯誤發(fā)生:", e);
        }
    }

    /** Submits a connection to the pool; if the pool rejects it, closes the socket and keeps serving. */
    private void dispatch(Socket socket) {
        try {
            threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // Fully qualified so that no additional import is required.
            logger.error("Thread pool rejected connection {}:{}, closing it", socket.getInetAddress(), socket.getPort(), e);
            try {
                socket.close();
            } catch (IOException closeFailure) {
                logger.warn("Failed to close rejected connection", closeFailure);
            }
        }
    }
}
啟動服務端的時候手動注冊服務
/**
 * Example entry point: registers a {@link HelloServiceImpl} and starts an
 * {@link RPCServer} on port 9000.
 */
public class ServerTest {

    public static void main(String[] args) {
        // Register the service first so it can be resolved as soon as the server accepts calls.
        ServiceRegistry registry = new ServiceRegistryImpl();
        HelloService greeter = new HelloServiceImpl();
        registry.register(greeter);

        new RPCServer(registry).start(9000);
    }
}
然后啟動客戶端
/**
 * Example entry point: calls {@link HelloService#hello} through a proxy
 * pointing at 127.0.0.1:9000 and prints the reply.
 */
public class ClientTest {

    public static void main(String[] args) {
        HelloService remoteHello = new RPCClientProxy("127.0.0.1", 9000).getProxy(HelloService.class);
        HelloObject payload = new HelloObject(12, "這是客戶端傳來的消息");
        System.out.println(remoteHello.hello(payload));
    }
}