用netty實(shí)現(xiàn)rpc調(diào)用

netty是一個(gè)nio的框架,這個(gè)與原生的java的nio的不同之處在于原生的nio比較難以使用,通過netty可以不用了解底層的編寫,快速實(shí)現(xiàn)網(wǎng)絡(luò)編程,更快實(shí)現(xiàn)我們需要的業(yè)務(wù)//雖然我推薦多了解底層
netty的使用比較廣泛,比如游戲的數(shù)據(jù)包傳輸,還有rpc的調(diào)用,比如dubbo。因?yàn)閭€(gè)人傾向了解一樣?xùn)|西可以先從demo入手,因此決定寫一個(gè)簡(jiǎn)單的rpc調(diào)用來了解netty,show code!
先新建一個(gè)maven項(xiàng)目,在里面創(chuàng)建三個(gè)moudle,分別是start_interface,start_rpc_consumer和start_rpc_provider。在interface的pom.xml加依賴

  <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>

然后新建RPCClient作為客戶端的使用

package com.example.start_interface;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RPCClient {
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static RPCClientHandler client;


    /**
     * 創(chuàng)建一個(gè)代理對(duì)象
     */
    public Object createProxy(final Class<?> serviceClass, final String providerName
            , final RPCClientHandler rpcClientHandler, String ip, int port) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    if (client == null) {
                        initClient(rpcClientHandler, ip, port);
                    }
                    

                    String param = providerName;
                    String[] params = param.split("#");

                    String className = params[0];
                    String methodName = params[1];
                    RPCParam rpcParam =new RPCParam();
                    rpcParam.setClazzName(className);
                    rpcParam.setMethodName(methodName);
                    rpcParam.initParams(args.length);
                    for (int i = 0; i < args.length; i++) {
                        rpcParam.getParams()[i]=args[i];
                        rpcParam.getParamTypes()[i] = args[i].getClass();
                    }
                    // 設(shè)置參數(shù)
                    client.setRpcParam(rpcParam);
                    return executor.submit(client).get();
                });
    }

    /**
     * 初始化客戶端
     */
    private static void initClient(RPCClientHandler rpcClientHandler, String ip, int port) {
        client = rpcClientHandler;
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new ObjectEncoder());
                        p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        p.addLast(client);
                    }
                });
        try {
            b.connect(ip, port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

以及他的Handler

package com.example.start_interface;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class RPCClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;
    //結(jié)果
    private Object result;
    //參數(shù)
    private RPCParam rpcParam;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        context = ctx;
    }

    /**
     * 收到服務(wù)端數(shù)據(jù),喚醒等待線程
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
        result = msg;
        notify();
    }

    /**
     * 寫出數(shù)據(jù),開始等待喚醒
     */
    @Override
    public synchronized Object call() throws InterruptedException {
   
        context.writeAndFlush(rpcParam);
        //context.writeAndFlush("as");
        wait();
        return result;
    }

    public RPCParam getRpcParam() {
        return rpcParam;
    }

    public void setRpcParam(RPCParam rpcParam) {
        this.rpcParam = rpcParam;
    }
}

以及RPCServer

package com.example.start_interface;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class RPCServer {

    public static void startServer(String hostName, int port) {
        try {

            ServerBootstrap bootstrap = new ServerBootstrap();
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            bootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ObjectEncoder());
                            p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            p.addLast(new RPCServerHandler());
                        }
                    });
            bootstrap.bind(hostName, port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

以及RPCServerHandler

package com.example.start_interface;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class RPCServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
       
        System.out.println(msg.toString());
        RPCParam rpcParam = (RPCParam) msg;
        String className =  rpcParam.getClazzName();
        String methodName = rpcParam.getMethodName();

        try {
            Class clazz = Class.forName(className);
            Object object = clazz.newInstance();

            Method method = object.getClass().getDeclaredMethod(methodName,rpcParam.getParamTypes());

            Object result = method.invoke(object,rpcParam.getParams());
            ctx.writeAndFlush(result);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }

    }
}

其中為了獲取參數(shù),我們需要自己寫一個(gè)param,來方便傳輸?shù)玫椒椒ê蛥?shù)

package com.example.start_interface;

import java.io.Serializable;

public class RPCParam implements Serializable {
    private String clazzName;
    private String methodName;
    private Object[] params;
    private Class<?>[] paramTypes;

    public String getClazzName() {
        return clazzName;
    }

    public void setClazzName(String clazzName) {
        this.clazzName = clazzName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    public void initParams(int size){
        params = new Object[size];
        paramTypes = new Class[size];
    }

    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }
}

其實(shí)返回結(jié)果感覺也是應(yīng)該自己寫一個(gè)了,但是因?yàn)槭且粋€(gè)練手demo,就算了,然后再加一個(gè)interface

package com.example.start_interface;

public interface UserService {
    /**
     * helloWorld
     * @param word
     * @return
     */
    String sayHello(String word);

    String test(Integer i);
}

這時(shí)的整體框架算是搭建好了,這時(shí)候開始測(cè)試一下
這時(shí)候在consume包里實(shí)現(xiàn)調(diào)用

package com.example.start_rpc_consumer;

import com.example.start_interface.RPCClient;
import com.example.start_interface.RPCClientHandler;
import com.example.start_interface.UserService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StartRpcConsumerApplication {

    public static final String providerName = "com.example.start_rpc_provider.UserServiceImpl#test#";


    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(StartRpcConsumerApplication.class, args);

        RPCClient rpcClient = new RPCClient();
        UserService userService = (UserService) rpcClient.createProxy(UserService.class,providerName
                ,new RPCClientHandler(),"localhost", 8888);
        for (;;) {
            System.out.println("ok???");
            Thread.sleep(1000);
            System.out.println(userService.test(12));
            System.out.println("ok");
        }
    }

}

在provider包實(shí)現(xiàn)impl的類以及開啟rpc服務(wù)

package com.example.start_rpc_provider;

import com.example.start_interface.UserService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class UserServiceImpl implements UserService {
    @Override
    public String sayHello(String word) {
        System.out.println("調(diào)用成功--參數(shù):" + word);
        return "調(diào)用成功--參數(shù):" + word;
    }

    @Override
    public String test(Integer i){
        return "canshu"+i;
    }

   
}

package com.example.start_rpc_provider;

import com.example.start_interface.RPCServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StartRpcProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(StartRpcProviderApplication.class, args);
        //UserServiceImpl.startServer("localhost", 8990);
        RPCServer.startServer("localhost", 8888);
    }

}

以下是輸出結(jié)果


image.png

其實(shí)在這個(gè)demo里面學(xué)到了不少知識(shí),除了粘包拆包以及netty的調(diào)用,還有就是動(dòng)態(tài)代理以及反射機(jī)制,其中比較有意思的是,這個(gè)其實(shí)并不支持基本數(shù)據(jù)類型,只支持object類,我在網(wǎng)上看到的比較有意思的解決方法是,1.默認(rèn)以后的參數(shù)都使用object類不使用基本數(shù)據(jù)類型。2寫一個(gè)工具把基本數(shù)據(jù)類型進(jìn)行裝箱,這個(gè)就以后有機(jī)會(huì)再玩玩啦。
這個(gè)是根據(jù)之前的一篇博客進(jìn)行改寫的,但是忘了那個(gè)網(wǎng)址了,希望有知道的童鞋可以告訴我然后我再加上出處

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 微服務(wù),已經(jīng)是每個(gè)互聯(lián)網(wǎng)開發(fā)者必須掌握的一項(xiàng)技術(shù)。而 RPC 框架,是構(gòu)成微服務(wù)最重要的組成部分之一。趁最近有時(shí)間...
    java菜閱讀 449評(píng)論 0 0
  • 最近幾天,工作忙,壓力大,心里不無抱怨。剛剛同事又打來電話,又出現(xiàn)了一些情況。心里默默想了該怎么應(yīng)對(duì),恍然間想到:...
    前公閱讀 277評(píng)論 0 0
  • 今天是2019年4月16日 周二 起床: 5:00 就寢:22:30 心情:愉快 任務(wù)清單 1.習(xí)慣養(yǎng)成 每日更文...
    渺塵03閱讀 567評(píng)論 4 8
  • 解決方案 npm install eslint-config-standard@next即可 參考Warning ...
    zackxizi閱讀 3,415評(píng)論 1 0
  • “春分,你立起過雞蛋嗎?”二十四節(jié)氣中的春分這一天太陽將會(huì)從正東升起,正西落下。 天文專家說,春分在天文學(xué)上是個(gè)重...
    豆豆花閱讀 944評(píng)論 0 1

友情鏈接更多精彩內(nèi)容