設(shè)計(jì)思路
RPC:遠(yuǎn)程過程調(diào)用,像是調(diào)用本地代碼一樣來調(diào)用遠(yuǎn)程的服務(wù)。所以一個(gè)簡(jiǎn)單的RPC至少包括兩個(gè)角色:
- 服務(wù)提供方
- 服務(wù)調(diào)用方
服務(wù)調(diào)用方像調(diào)用本地代碼一樣調(diào)用遠(yuǎn)程服務(wù),自然得有遠(yuǎn)程服務(wù)的接口信息。而服務(wù)提供方需要來實(shí)現(xiàn)接口提供服務(wù)返回到調(diào)用方。這樣問題就變成了:
- 服務(wù)調(diào)用方如何通過服務(wù)接口將請(qǐng)求給到服務(wù)提供方?
- 服務(wù)提供方如何將數(shù)據(jù)回傳到服務(wù)提供方?
先看第一個(gè)問題,因?yàn)榉?wù)在遠(yuǎn)程,我們可以告訴遠(yuǎn)程服務(wù)我們調(diào)用的是哪個(gè)接口,用的哪些參數(shù)就可以了。自然我們會(huì)想到使用動(dòng)態(tài)代理,使用代理增強(qiáng)來接口方法,達(dá)到傳輸接口名參數(shù)的目的。服務(wù)端接收到這些數(shù)據(jù)后通過反射來執(zhí)行。
Netty的簡(jiǎn)單描述
Netty是基于nio的網(wǎng)絡(luò)編程框架。java nio由來已久,不過用的不多,原因在于使用原生的nio進(jìn)行網(wǎng)絡(luò)編程上手難度有些大。Netty對(duì)原生nio做了封裝,原生的nio的核心組件selector來管理通道,而Netty則為Reactor。一個(gè)Reactor負(fù)責(zé)接收客戶端的請(qǐng)求,同時(shí)也負(fù)責(zé)將不同的客戶端請(qǐng)求放入到Channel中,這個(gè)便是Netty Reactor的單線程模型。最常用的還是主從模型:在多線程的基礎(chǔ)上,讓處理客戶端的線程也變?yōu)槎嗑€程,一組線程池接收請(qǐng)求,一組線程池處理IO。做到了前面多個(gè)服務(wù)員,后面多個(gè)廚子的模式。
Reactor模型最為核心的是兩個(gè)線程池,Netty使用 NioEventLoopGroup 來初始化線程池。一個(gè) NioEventLoopGroup 下包含多個(gè) NioEventLoop,NioEventLoop封裝了selector,用于注冊(cè)niochannel,每個(gè) NioChannel 都綁定有一個(gè)自己的 ChannelPipeline。
EventLoopGroup 提供 next 接口,可以從組里面按照一定規(guī)則獲取其中一個(gè) EventLoop來處理任務(wù)。在 Netty 服務(wù)器端編程中,我們一般都需要提供兩個(gè) EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一個(gè)服務(wù)端口即一個(gè)ServerSocketChannel對(duì)應(yīng)一個(gè)Selector和一個(gè)EventLoop線程。BossEventLoop 負(fù)責(zé)接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進(jìn)行 IO 處理。BossEventLoopGroup 通常是一個(gè)單線程的 EventLoop,EventLoop 維護(hù)著一個(gè)注冊(cè)了ServerSocketChannel 的 Selector 實(shí)例,BossEventLoop 不斷輪詢 Selector 將連接事件分離出來,通常是 OP_ACCEPT 事件,然后將接收到的 SocketChannel 交給 WorkerEventLoopGroup,WorkerEventLoopGroup 會(huì)由 next 選擇其中一個(gè) EventLoopGroup 來將這個(gè) SocketChannel 注冊(cè)到其維護(hù)的 Selector 并對(duì)其后續(xù)的 IO 事件進(jìn)行處理。
另外一個(gè)比較重要的類便是ChannelHandler,ChannelHandler 接口定義了許多事件處理的方法,我們可以通過重寫這些方法去實(shí)現(xiàn)具體的業(yè)務(wù)邏輯。
代碼實(shí)現(xiàn)
服務(wù)端代碼:
public class NettyRPCServer {
private static final int PORT = 9090;
public void run(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG, 128).
childOption(ChannelOption.SO_KEEPALIVE, true).
localAddress(PORT).
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//使用netty內(nèi)置的object編解碼器
//編碼器
pipeline.addLast("encoder", new ObjectEncoder());
//解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new InvokeHandler());
}
});
ChannelFuture channelFuture = boot.bind(PORT).sync();
System.out.println("server is ready");
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyRPCServer().run();
}
}
InvokeHandler來實(shí)現(xiàn)反射調(diào)用:
/**
* 服務(wù)端業(yè)務(wù)處理類
*/
public class InvokeHandler extends ChannelInboundHandlerAdapter {
private static final String SERVER_PATH = "com.ucal.dc.nio.netty.rpc.server";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo cl = (ClassInfo) msg;
Class impl = Class.forName(getImplClassName(cl));
Method method = impl.getDeclaredMethod(cl.getMethod(), cl.getType());
Object result = method.invoke(impl.newInstance(),cl.getObjects());
//寫入到通道中
ctx.writeAndFlush(result);
}
//通過接口找到實(shí)現(xiàn)類
private String getImplClassName(ClassInfo classInfo) throws Exception {
int i = classInfo.getClassName().lastIndexOf(".");
String interfaceName = classInfo.getClassName().substring(i);
Class service = Class.forName(SERVER_PATH + interfaceName);
//使用Reflections框架
Reflections reflections = new Reflections(SERVER_PATH);
Set<Class> types = reflections.getSubTypesOf(service);
if(types.size() == 0){
throw new RuntimeException("未能找到服務(wù)實(shí)現(xiàn)類");
}else if(types.size() > 1){
throw new RuntimeException("找到多個(gè)服務(wù)實(shí)現(xiàn)類,未能明確使用哪一個(gè)");
}else{
Class c = types.iterator().next();
return c.getName();//得到實(shí)現(xiàn)類的名字
}
}
}
客戶端代理類(使用ClassInfo類來封裝數(shù)據(jù)):
/**
* 客戶端代理類
*/
public class NettyRPCProxy {
private static final int PORT = 9090;
//根據(jù)接口創(chuàng)建動(dòng)態(tài)代理對(duì)象
public static Object create(Class target){
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
//封裝ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethod(method.getName());
classInfo.setObjects(objects);
classInfo.setType(method.getParameterTypes());
//開始使用netty發(fā)送數(shù)據(jù)
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
ResultHandler resultHandler = new ResultHandler();
try {
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//編碼器
pipeline.addLast("encoder", new ObjectEncoder());
//解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
//客戶端業(yè)務(wù)處理類
pipeline.addLast(resultHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
channelFuture.channel().writeAndFlush(classInfo).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
return resultHandler.response;
}
});
}
}
ClassInfo:
/**
* 用于封裝類消息,用于數(shù)據(jù)傳輸
*/
public class ClassInfo implements Serializable {
private static final long serialVersionUID = 9159351154097982580L;
private String className; //類名
private String method; //方法名
private Class<?>[] type; //參數(shù)類型
private Object[] objects;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Class<?>[] getType() {
return type;
}
public void setType(Class<?>[] type) {
this.type = type;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
}
ResultHandler便是客戶端接收到消息的Handler:
/**
* 客戶端接收消息處理器
*/
public class ResultHandler extends ChannelInboundHandlerAdapter {
public Object response;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
在客戶端有這么一個(gè)遠(yuǎn)程服務(wù)接口:
/**
* 服務(wù)接口
*/
public interface HelloNetty {
String hello();
}
而服務(wù)端有這么一個(gè)接口的實(shí)現(xiàn)類:
public class HelloNettyImpl implements HelloNetty {
@Override
public String hello() {
return "hello netty";
}
}
開始測(cè)試:先啟動(dòng)服務(wù)端,在客戶端使用NettyRPCProxy創(chuàng)建HelloNetty的代理對(duì)象,在調(diào)用代理對(duì)應(yīng)的接口方法的時(shí)候,實(shí)際上就是將類信息封裝成ClassInfo,通過netty傳輸?shù)椒?wù)端NettyRPCServer。
public class TestNettyRPC {
public static void main(String[] args) {
HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
System.out.println(hn.hello());
}
}
下面增加一個(gè)帶參的調(diào)用:
public interface HelloRPC {
String hello(String name);
}
實(shí)現(xiàn)類:
public class HelloRPCImpl implements HelloRPC {
@Override
public String hello(String name) {
return "hello "+name;
}
}
測(cè)試一下:
public class TestNettyRPC {
public static void main(String[] args) {
HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
System.out.println(hn.hello());
HelloRPC hr = (HelloRPC)NettyRPCProxy.create(HelloRPC.class);
System.out.println(hr.hello("susan"));
}
}
輸出:
