使用Netty實(shí)現(xiàn)一個(gè)簡(jiǎn)單的RPC

設(shè)計(jì)思路

RPC:遠(yuǎn)程過程調(diào)用,像是調(diào)用本地代碼一樣來調(diào)用遠(yuǎn)程的服務(wù)。所以一個(gè)簡(jiǎn)單的RPC至少包括兩個(gè)角色:

  1. 服務(wù)提供方
  2. 服務(wù)調(diào)用方

服務(wù)調(diào)用方像調(diào)用本地代碼一樣調(diào)用遠(yuǎn)程服務(wù),自然得有遠(yuǎn)程服務(wù)的接口信息。而服務(wù)提供方需要來實(shí)現(xiàn)接口提供服務(wù)返回到調(diào)用方。這樣問題就變成了:

  1. 服務(wù)調(diào)用方如何通過服務(wù)接口將請(qǐng)求給到服務(wù)提供方?
  2. 服務(wù)提供方如何將數(shù)據(jù)回傳到服務(wù)提供方?

先看第一個(gè)問題,因?yàn)榉?wù)在遠(yuǎn)程,我們可以告訴遠(yuǎn)程服務(wù)我們調(diào)用的是哪個(gè)接口,用的哪些參數(shù)就可以了。自然我們會(huì)想到使用動(dòng)態(tài)代理,使用代理增強(qiáng)來接口方法,達(dá)到傳輸接口名參數(shù)的目的。服務(wù)端接收到這些數(shù)據(jù)后通過反射來執(zhí)行。

Netty的簡(jiǎn)單描述

Netty是基于nio的網(wǎng)絡(luò)編程框架。java nio由來已久,不過用的不多,原因在于使用原生的nio進(jìn)行網(wǎng)絡(luò)編程上手難度有些大。Netty對(duì)原生nio做了封裝,原生的nio的核心組件selector來管理通道,而Netty則為Reactor。一個(gè)Reactor負(fù)責(zé)接收客戶端的請(qǐng)求,同時(shí)也負(fù)責(zé)將不同的客戶端請(qǐng)求放入到Channel中,這個(gè)便是Netty Reactor的單線程模型。最常用的還是主從模型:在多線程的基礎(chǔ)上,讓處理客戶端的線程也變?yōu)槎嗑€程,一組線程池接收請(qǐng)求,一組線程池處理IO。做到了前面多個(gè)服務(wù)員,后面多個(gè)廚子的模式。
Reactor模型最為核心的是兩個(gè)線程池,Netty使用 NioEventLoopGroup 來初始化線程池。一個(gè) NioEventLoopGroup 下包含多個(gè) NioEventLoop,NioEventLoop封裝了selector,用于注冊(cè)niochannel,每個(gè) NioChannel 都綁定有一個(gè)自己的 ChannelPipeline。
EventLoopGroup 提供 next 接口,可以從組里面按照一定規(guī)則獲取其中一個(gè) EventLoop來處理任務(wù)。在 Netty 服務(wù)器端編程中,我們一般都需要提供兩個(gè) EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一個(gè)服務(wù)端口即一個(gè)ServerSocketChannel對(duì)應(yīng)一個(gè)Selector和一個(gè)EventLoop線程。BossEventLoop 負(fù)責(zé)接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進(jìn)行 IO 處理。BossEventLoopGroup 通常是一個(gè)單線程的 EventLoop,EventLoop 維護(hù)著一個(gè)注冊(cè)了ServerSocketChannel 的 Selector 實(shí)例,BossEventLoop 不斷輪詢 Selector 將連接事件分離出來,通常是 OP_ACCEPT 事件,然后將接收到的 SocketChannel 交給 WorkerEventLoopGroup,WorkerEventLoopGroup 會(huì)由 next 選擇其中一個(gè) EventLoopGroup 來將這個(gè) SocketChannel 注冊(cè)到其維護(hù)的 Selector 并對(duì)其后續(xù)的 IO 事件進(jìn)行處理。
另外一個(gè)比較重要的類便是ChannelHandler,ChannelHandler 接口定義了許多事件處理的方法,我們可以通過重寫這些方法去實(shí)現(xiàn)具體的業(yè)務(wù)邏輯。

代碼實(shí)現(xiàn)

服務(wù)端代碼:

public class NettyRPCServer {
    private static final int PORT = 9090;

    public void run(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup).
                    channel(NioServerSocketChannel.class).
                    option(ChannelOption.SO_BACKLOG, 128).
                    childOption(ChannelOption.SO_KEEPALIVE, true).
                    localAddress(PORT).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //使用netty內(nèi)置的object編解碼器
                            //編碼器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            //解碼器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new InvokeHandler());
                        }
                    });
            ChannelFuture channelFuture = boot.bind(PORT).sync();
            System.out.println("server is ready");
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRPCServer().run();
    }

}

InvokeHandler來實(shí)現(xiàn)反射調(diào)用:

/**
 * 服務(wù)端業(yè)務(wù)處理類
 */
public class InvokeHandler extends ChannelInboundHandlerAdapter {

    private static final String SERVER_PATH = "com.ucal.dc.nio.netty.rpc.server";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo cl = (ClassInfo) msg;
        Class impl = Class.forName(getImplClassName(cl));
        Method method = impl.getDeclaredMethod(cl.getMethod(), cl.getType());
        Object result = method.invoke(impl.newInstance(),cl.getObjects());
        //寫入到通道中
        ctx.writeAndFlush(result);
    }
    //通過接口找到實(shí)現(xiàn)類
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        int i = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(i);
        Class service = Class.forName(SERVER_PATH + interfaceName);
        //使用Reflections框架
        Reflections reflections = new Reflections(SERVER_PATH);
        Set<Class> types = reflections.getSubTypesOf(service);
        if(types.size() == 0){
            throw new RuntimeException("未能找到服務(wù)實(shí)現(xiàn)類");
        }else if(types.size() > 1){
            throw new RuntimeException("找到多個(gè)服務(wù)實(shí)現(xiàn)類,未能明確使用哪一個(gè)");
        }else{
            Class c = types.iterator().next();
            return c.getName();//得到實(shí)現(xiàn)類的名字
        }
    }

}

客戶端代理類(使用ClassInfo類來封裝數(shù)據(jù)):

/**
 * 客戶端代理類
 */
public class NettyRPCProxy {
    private static final int PORT = 9090;
    //根據(jù)接口創(chuàng)建動(dòng)態(tài)代理對(duì)象
    public static Object create(Class target){
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
                //封裝ClassInfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethod(method.getName());
                classInfo.setObjects(objects);
                classInfo.setType(method.getParameterTypes());

                //開始使用netty發(fā)送數(shù)據(jù)
                NioEventLoopGroup group = new NioEventLoopGroup();
                Bootstrap bootstrap = new Bootstrap();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //編碼器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            //解碼器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            //客戶端業(yè)務(wù)處理類
                            pipeline.addLast(resultHandler);
                        }
                    });
                    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
                    channelFuture.channel().writeAndFlush(classInfo).sync();
                    channelFuture.channel().closeFuture().sync();
                }finally {
                    group.shutdownGracefully();
                }
                return resultHandler.response;

            }
        });
    }
}

ClassInfo:

/**
 * 用于封裝類消息,用于數(shù)據(jù)傳輸
 */
public class ClassInfo implements Serializable {

    private static final long serialVersionUID = 9159351154097982580L;

    private String className; //類名
    private String method; //方法名
    private Class<?>[] type; //參數(shù)類型
    private Object[] objects;


    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Class<?>[] getType() {
        return type;
    }

    public void setType(Class<?>[] type) {
        this.type = type;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }
}

ResultHandler便是客戶端接收到消息的Handler:

/**
 * 客戶端接收消息處理器
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {
    public Object response;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }

}

在客戶端有這么一個(gè)遠(yuǎn)程服務(wù)接口:

/**
 * 服務(wù)接口
 */
public interface HelloNetty {
    String hello();
}

而服務(wù)端有這么一個(gè)接口的實(shí)現(xiàn)類:

public class HelloNettyImpl implements HelloNetty {
    @Override
    public String hello() {
        return "hello netty";
    }
}

開始測(cè)試:先啟動(dòng)服務(wù)端,在客戶端使用NettyRPCProxy創(chuàng)建HelloNetty的代理對(duì)象,在調(diào)用代理對(duì)應(yīng)的接口方法的時(shí)候,實(shí)際上就是將類信息封裝成ClassInfo,通過netty傳輸?shù)椒?wù)端NettyRPCServer。

public class TestNettyRPC {
    public static void main(String[] args) {
        HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
        System.out.println(hn.hello());
    }
}

下面增加一個(gè)帶參的調(diào)用:

public interface HelloRPC {
    String hello(String name);
}

實(shí)現(xiàn)類:

public class HelloRPCImpl implements HelloRPC {
    @Override
    public String hello(String name) {
        return "hello "+name;
    }
}

測(cè)試一下:

public class TestNettyRPC {
    public static void main(String[] args) {
        HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
        System.out.println(hn.hello());

        HelloRPC hr = (HelloRPC)NettyRPCProxy.create(HelloRPC.class);
        System.out.println(hr.hello("susan"));

    }
}

輸出:

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容