Defining the RPC protocol
IProxyProtocol class
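import org.apache.hadoop.ipc.VersionedProtocol;

// Custom RPC protocol shared by the client and the server.
public interface IProxyProtocol extends VersionedProtocol {
    // Protocol version that the client and server must agree on.
    static final long versionID = 1L;
    int Add(int number1, int number2);
}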
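- Every custom RPC interface in Hadoop must extend the VersionedProtocol interface, which describes the protocol's version information.
- By default, an RPC client and server with different version numbers cannot communicate with each other, so both sides use the version number to identify the protocol version they speak.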
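To illustrate the version check described above, here is a minimal standalone sketch that does not use Hadoop. The class `VersionCheckSketch`, its `handshake` method, and the exception message are hypothetical names chosen for illustration; the real check happens inside Hadoop's RPC engine and may differ in detail.

```java
// Hypothetical, Hadoop-free illustration of a version handshake.
public class VersionCheckSketch {
    // Version that the simulated server implements.
    public static final long SERVER_VERSION = 1L;

    // Accepts the client only when its version equals the server's version.
    public static long handshake(long clientVersion) {
        if (clientVersion != SERVER_VERSION) {
            throw new IllegalStateException("version mismatch: client=" + clientVersion
                    + ", server=" + SERVER_VERSION);
        }
        return SERVER_VERSION;
    }

    public static void main(String[] args) {
        for (long clientVersion : new long[] {1L, 2L}) {
            try {
                System.out.println("accepted version " + handshake(clientVersion));
            } catch (IllegalStateException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}
```

In this sketch a client that declares version 1 is accepted and a client that declares version 2 is rejected, which mirrors why `IProxyProtocol.versionID` has to match on both sides.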
Implementing the RPC protocol
MyProxy class
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;

// Server-side implementation of the protocol.
public class MyProxy implements IProxyProtocol {
    @Override
    public int Add(int number1, int number2) {
        System.out.println("Add was called");
        return number1 + number2;
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        System.out.println("MyProxy.ProtocolVersion=" + IProxyProtocol.versionID);
        return IProxyProtocol.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
        // This demo provides no method-level signature.
        return null;
    }
}
Building the RPC server and starting the service
MyServer class
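import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class MyServer {
    public static final int PORT = 5433;
    public static final String IPAddress = "127.0.0.1";

    public static void main(String[] args) throws IOException {
        MyProxy proxy = new MyProxy();
        Configuration conf = new Configuration();
        // Register the protocol and its implementation, then bind to the address and port.
        Server server = new RPC.Builder(conf).setProtocol(IProxyProtocol.class)
                .setInstance(proxy).setBindAddress(IPAddress).setPort(PORT)
                .build();
        server.start();
    }
}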
Building the RPC client and starting it
MyClient class
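import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
    public static void main(String[] args) {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(MyServer.IPAddress, MyServer.PORT);
        try {
            // Obtain a client-side proxy for the protocol, using the shared version number.
            IProxyProtocol proxy = RPC.waitForProxy(IProxyProtocol.class, IProxyProtocol.versionID, inetSocketAddress,
                    new Configuration());
            int result = proxy.Add(10, 25);
            System.out.println("10+25=" + result);
            // Release the proxy once the call has completed.
            RPC.stopProxy(proxy);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}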
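In the client, `proxy.Add(10, 25)` reads like a local method call because the object returned by `RPC.waitForProxy` is a stand-in for the remote implementation. The sketch below imitates that idea in-process with `java.lang.reflect.Proxy`. It is not Hadoop's implementation, and `LocalProxySketch`, `Calculator`, and `createProxy` are hypothetical names. Where a real RPC engine would serialize the call and send it over a socket, this handler simply forwards it to a local object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class LocalProxySketch {
    // Hypothetical protocol interface, playing the role of IProxyProtocol.
    public interface Calculator {
        int add(int a, int b);
    }

    // Builds a proxy that logs each call and then forwards it to `target`.
    public static Calculator createProxy(Calculator target) {
        InvocationHandler handler = (proxy, method, args) -> {
            System.out.println("forwarding call to " + method.getName());
            return method.invoke(target, args);
        };
        return (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(),
                new Class<?>[] {Calculator.class},
                handler);
    }

    public static void main(String[] args) {
        // Stands in for the server-side implementation.
        Calculator server = (a, b) -> a + b;
        Calculator client = createProxy(server);
        System.out.println("10+25=" + client.add(10, 25));
    }
}
```

The caller only depends on the interface, which is why the client code above needs `IProxyProtocol` on its classpath but never references `MyProxy`.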