RPC基本原理以及如何用Netty來實(shí)現(xiàn)RPC

前言

在微服務(wù)大行其道的今天,分布式系統(tǒng)越來越重要,實(shí)現(xiàn)服務(wù)化首先就要考慮服務(wù)之間的通信問題。這里面涉及序列化、反序列化、尋址、連接等等問題。。不過,有了RPC框架,我們就無需苦惱。

一、什么是RPC?

RPC(Remote Procedure Call)— 遠(yuǎn)程過程調(diào)用,是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而程序員無需額外地為這個(gè)交互作用編程。

值得注意是,兩個(gè)或多個(gè)應(yīng)用程序都分布在不同的服務(wù)器上,它們之間的調(diào)用都像是本地方法調(diào)用一樣。


RPC遠(yuǎn)程過程調(diào)用

RPC框架有很多,比較知名的如阿里的Dubbo、google的gRPC、Go語言的rpcx、Apache的thrift。當(dāng)然了,還有Spring Cloud,不過對(duì)于Spring Cloud來說,RPC只是它的一個(gè)功能模塊。

復(fù)雜的先不講,如果要實(shí)現(xiàn)一個(gè)基本功能、簡(jiǎn)單的RPC,要涉及哪些東西呢?

  • 動(dòng)態(tài)代理
  • 反射
  • 序列化、反序列化
  • 網(wǎng)絡(luò)通信
  • 編解碼
  • 服務(wù)發(fā)現(xiàn)和注冊(cè)
  • 心跳與鏈路檢測(cè)
  • ......

下面我們一起通過代碼來分析,怎么把這些技術(shù)點(diǎn)串到一起,實(shí)現(xiàn)我們自己的RPC。

二、環(huán)境準(zhǔn)備

在開始之前,筆者先介紹一下所用到的軟件環(huán)境。

SpringBoot、Netty、zookeeper、zkclient、fastjson

  • SpringBoot
    項(xiàng)目的基礎(chǔ)框架,方便打成JAR包,便于測(cè)試。
  • Netty
    通信服務(wù)器
  • zookeeper
    服務(wù)的發(fā)現(xiàn)與注冊(cè)
  • zkclient
    zookeeper客戶端
  • fastjson
    序列化、反序列化

三、RPC生產(chǎn)者

1、服務(wù)接口API

整個(gè)RPC,我們分為生產(chǎn)者和消費(fèi)者。首先它們有一個(gè)共同的服務(wù)接口API。在這里,我們搞一個(gè)操作用戶信息的service接口。

public interface InfoUserService {
    List<InfoUser> insertInfoUser(InfoUser infoUser);
    InfoUser getInfoUserById(String id);
    void deleteInfoUserById(String id);
    String getNameById(String id);
    Map<String,InfoUser> getAllUser();
}

2、服務(wù)類實(shí)現(xiàn)

作為生產(chǎn)者,它當(dāng)然要有實(shí)現(xiàn)類,我們創(chuàng)建InfoUserServiceImpl實(shí)現(xiàn)類,并用注解把它標(biāo)注為RPC的服務(wù),然后注冊(cè)到Spring的Bean容器中。在這里,我們把infoUserMap當(dāng)做數(shù)據(jù)庫,存儲(chǔ)用戶信息。

package com.viewscenes.netsupervisor.service.impl;

@RpcService
public class InfoUserServiceImpl implements InfoUserService {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    //當(dāng)做數(shù)據(jù)庫,存儲(chǔ)用戶信息
    Map<String,InfoUser> infoUserMap = new HashMap<>();

    public List<InfoUser> insertInfoUser(InfoUser infoUser) {
        logger.info("新增用戶信息:{}", JSONObject.toJSONString(infoUser));
        infoUserMap.put(infoUser.getId(),infoUser);
        return getInfoUserList();
    }
    public InfoUser getInfoUserById(String id) {
        InfoUser infoUser = infoUserMap.get(id);
        logger.info("查詢用戶ID:{}",id);
        return infoUser;
    }

    public List<InfoUser> getInfoUserList() {
        List<InfoUser> userList = new ArrayList<>();
        Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, InfoUser> next = iterator.next();
            userList.add(next.getValue());
        }
        logger.info("返回用戶信息記錄數(shù):{}",userList.size());
        return userList;
    }
    public void deleteInfoUserById(String id) {
        logger.info("刪除用戶信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
    }
    public String getNameById(String id){
        logger.info("根據(jù)ID查詢用戶名稱:{}",id);
        return infoUserMap.get(id).getName();
    }
    public Map<String,InfoUser> getAllUser(){
        logger.info("查詢所有用戶信息{}",infoUserMap.keySet().size());
        return infoUserMap;
    }
}

元注解定義如下:

package com.viewscenes.netsupervisor.annotation;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}

3、請(qǐng)求信息和返回信息

所有的請(qǐng)求信息和返回信息,我們用兩個(gè)JavaBean來表示。其中的重點(diǎn)是,返回信息要帶有請(qǐng)求信息的ID。

package com.viewscenes.netsupervisor.entity;
public class Request {
    private String id;
    private String className;// 類名
    private String methodName;// 函數(shù)名稱
    private Class<?>[] parameterTypes;// 參數(shù)類型
    private Object[] parameters;// 參數(shù)列表
    get/set ...
}

package com.viewscenes.netsupervisor.entity;
public class Response {
    private String requestId;
    private int code;
    private String error_msg;
    private Object data;
    get/set ...
}

4、Netty服務(wù)端

Netty作為高性能的NIO通信框架,在很多RPC框架中都有它的身影。我們也采用它當(dāng)做通信服務(wù)器。說到這,我們先看個(gè)配置文件,重點(diǎn)有兩個(gè),zookeeper的注冊(cè)地址和Netty通信服務(wù)器的地址。

TOMCAT端口
server.port=8001
#zookeeper注冊(cè)地址
registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183
#RPC服務(wù)提供者地址
rpc.server.address=192.168.197.1:18868

為了方便管理,我們把它也注冊(cè)成Bean,同時(shí)實(shí)現(xiàn)ApplicationContextAware接口,把上面@RpcService注解的服務(wù)類撈出來,緩存起來,供消費(fèi)者調(diào)用。同時(shí),作為服務(wù)器,還要對(duì)客戶端的鏈路進(jìn)行心跳檢測(cè),超過60秒未讀寫數(shù)據(jù),關(guān)閉此連接。

package com.viewscenes.netsupervisor.netty.server;
@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);

    private Map<String, Object> serviceMap = new HashMap<>();

    @Value("${rpc.server.address}")
    private String serverAddress;

    @Autowired
    ServiceRegistry registry;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
        for(Object serviceBean:beans.values()){
            Class<?> clazz = serviceBean.getClass();
            Class<?>[] interfaces = clazz.getInterfaces();
            for (Class<?> inter : interfaces){
                String interfaceName = inter.getName();
                logger.info("加載服務(wù)類: {}", interfaceName);
                serviceMap.put(interfaceName, serviceBean);
            }
        }
        logger.info("已加載全部服務(wù)接口:{}", serviceMap);
    }
    public void afterPropertiesSet() throws Exception {
        start();
    }
    public void start(){
        final NettyServerHandler handler = new NettyServerHandler(serviceMap);
        new Thread(() -> {
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup).
                        channel(NioServerSocketChannel.class).
                        option(ChannelOption.SO_BACKLOG,1024).
                        childOption(ChannelOption.SO_KEEPALIVE,true).
                        childOption(ChannelOption.TCP_NODELAY,true).
                        childHandler(new ChannelInitializer<SocketChannel>() {
                            //創(chuàng)建NIOSocketChannel成功后,在進(jìn)行初始化時(shí),將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡(luò)IO事件
                            protected void initChannel(SocketChannel channel) throws Exception {
                                ChannelPipeline pipeline = channel.pipeline();
                                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                                pipeline.addLast(new JSONEncoder());
                                pipeline.addLast(new JSONDecoder());
                                pipeline.addLast(handler);
                            }
                        });
                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
                ChannelFuture cf = bootstrap.bind(host,port).sync();
                logger.info("RPC 服務(wù)器啟動(dòng).監(jiān)聽端口:"+port);
                registry.register(serverAddress);
                //等待服務(wù)端監(jiān)聽端口關(guān)閉
                cf.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

上面的代碼就把Netty服務(wù)器啟動(dòng)了,在處理器中的構(gòu)造函數(shù)中,我們先把服務(wù)Bean的Map傳進(jìn)來,所有的處理要基于這個(gè)Map才能找到對(duì)應(yīng)的實(shí)現(xiàn)類。在channelRead中,獲取請(qǐng)求方法的信息,然后通過反射調(diào)用方法獲取返回值。

package com.viewscenes.netsupervisor.netty.server;
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final Map<String, Object> serviceMap;

    public NettyServerHandler(Map<String, Object> serviceMap) {
        this.serviceMap = serviceMap;
    }
    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("客戶端連接成功!"+ctx.channel().remoteAddress());
    }
    public void channelInactive(ChannelHandlerContext ctx)   {
        logger.info("客戶端斷開連接!{}",ctx.channel().remoteAddress());
        ctx.channel().close();
    }
    public void channelRead(ChannelHandlerContext ctx, Object msg)   {
        Request request = JSON.parseObject(msg.toString(),Request.class);

        if ("heartBeat".equals(request.getMethodName())) {
            logger.info("客戶端心跳信息..."+ctx.channel().remoteAddress());
        }else{
            logger.info("RPC客戶端請(qǐng)求接口:"+request.getClassName()+"   方法名:"+request.getMethodName());
            Response response = new Response();
            response.setRequestId(request.getId());
            try {
                Object result = this.handler(request);
                response.setData(result);
            } catch (Throwable e) {
                e.printStackTrace();
                response.setCode(1);
                response.setError_msg(e.toString());
                logger.error("RPC Server handle request error",e);
            }
            ctx.writeAndFlush(response);
        }
    }
    /**
     * 通過反射,執(zhí)行本地方法
     * @param request
     * @return
     * @throws Throwable
     */
    private Object handler(Request request) throws Throwable{
        String className = request.getClassName();
        Object serviceBean = serviceMap.get(className);

        if (serviceBean!=null){
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
        }else{
            throw new Exception("未找到服務(wù)接口,請(qǐng)檢查配置!:"+className+"#"+request.getMethodName());
        }
    }
    /**
     * 獲取參數(shù)列表
     * @param parameterTypes
     * @param parameters
     * @return
     */
    private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
        if (parameters==null || parameters.length==0){
            return parameters;
        }else{
            Object[] new_parameters = new Object[parameters.length];
            for(int i=0;i<parameters.length;i++){
                new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
            }
            return new_parameters;
        }
    }
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                logger.info("客戶端已超過60秒未讀寫數(shù)據(jù),關(guān)閉連接.{}",ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }else{
            super.userEventTriggered(ctx,evt);
        }
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   {
        logger.info(cause.getMessage());
        ctx.close();
    }
}

4、服務(wù)注冊(cè)

我們啟動(dòng)了Netty通信服務(wù)器,并且把服務(wù)實(shí)現(xiàn)類加載到緩存,等待請(qǐng)求時(shí)調(diào)用。這一步,我們要進(jìn)行服務(wù)注冊(cè)。為了簡(jiǎn)單化處理,我們只注冊(cè)通信服務(wù)器的監(jiān)聽地址即可。
在上面代碼中,bind之后我們執(zhí)行了registry.register(serverAddress); 它的作用就是,將Netty監(jiān)聽的IP端口注冊(cè)到zookeeper。

package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${registry.address}")
    private String registryAddress;
    private static final String ZK_REGISTRY_PATH = "/rpc";

    public void register(String data) {
        if (data != null) {
            ZkClient client = connectServer();
            if (client != null) {
                AddRootNode(client);
                createNode(client, data);
            }
        }
    }
    //連接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,20000,20000);
        return client;
    }
    //創(chuàng)建根目錄/rpc
    private void AddRootNode(ZkClient client){
        boolean exists = client.exists(ZK_REGISTRY_PATH);
        if (!exists){
            client.createPersistent(ZK_REGISTRY_PATH);
            logger.info("創(chuàng)建zookeeper主節(jié)點(diǎn) {}",ZK_REGISTRY_PATH);
        }
    }
    //在/rpc根目錄下,創(chuàng)建臨時(shí)順序子節(jié)點(diǎn)
    private void createNode(ZkClient client, String data) {
        String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("創(chuàng)建zookeeper數(shù)據(jù)節(jié)點(diǎn) ({} => {})", path, data);
    }
}

有一點(diǎn)需要注意,子節(jié)點(diǎn)必須是臨時(shí)節(jié)點(diǎn)。這樣,生產(chǎn)者端停掉之后,才能通知到消費(fèi)者,把此服務(wù)從服務(wù)列表中剔除。到此為止,生產(chǎn)者端已經(jīng)完成。我們看一下它的啟動(dòng)日志:

加載服務(wù)類: com.viewscenes.netsupervisor.service.InfoUserService
已加載全部服務(wù)接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服務(wù)器啟動(dòng).監(jiān)聽端口:18868
Starting ZkClient event thread.
Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session
Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
創(chuàng)建zookeeper主節(jié)點(diǎn) /rpc
創(chuàng)建zookeeper數(shù)據(jù)節(jié)點(diǎn) (/rpc/provider0000000000 => 192.168.197.1:28868)

四、RPC消費(fèi)者

首先,我們需要把生產(chǎn)者端的服務(wù)接口API,即InfoUserService。以相同的目錄放到消費(fèi)者端。路徑不同,調(diào)用會(huì)找不到的哦。

1、代理

RPC的目標(biāo)其中有一條,《程序員無需額外地為這個(gè)交互作用編程?!匪?,我們?cè)谡{(diào)用的時(shí)候,就像調(diào)用本地方法一樣。就像下面這樣:

@Controller
public class IndexController {  
    @Autowired
    InfoUserService userService;
    
    @RequestMapping("getById")
    @ResponseBody
    public InfoUser getById(String id){
        logger.info("根據(jù)ID查詢用戶信息:{}",id);
        return userService.getInfoUserById(id);
    }
}

那么,問題來了。消費(fèi)者端并沒有此接口的實(shí)現(xiàn),怎么調(diào)用到的呢?這里,首先就是代理。筆者這里用的是Spring的工廠Bean機(jī)制創(chuàng)建的代理對(duì)象,涉及的代碼較多,就不在文章中體現(xiàn)了,如果有不懂的同學(xué),請(qǐng)想象一下,MyBatis中的Mapper接口怎么被調(diào)用的??梢詤⒖脊P者文章:Mybatis源碼分析(四)mapper接口方法是怎樣被調(diào)用到的

總之,在調(diào)用userService方法的時(shí)候,會(huì)調(diào)用到代理對(duì)象的invoke方法。在這里,封裝請(qǐng)求信息,然后調(diào)用Netty的客戶端方法發(fā)送消息。然后根據(jù)方法返回值類型,轉(zhuǎn)成相應(yīng)的對(duì)象返回。

package com.viewscenes.netsupervisor.configurer.rpc;

@Component
public class RpcFactory<T> implements InvocationHandler {

    @Autowired
    NettyClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(IdUtil.getId());

        Object result = client.send(request);
        Class<?> returnType = method.getReturnType();

        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode()==1){
            throw new Exception(response.getError_msg());
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
            return response.getData();
        }else if (Collection.class.isAssignableFrom(returnType)){
            return JSONArray.parseArray(response.getData().toString(),Object.class);
        }else if(Map.class.isAssignableFrom(returnType)){
            return JSON.parseObject(response.getData().toString(),Map.class);
        }else{
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}

2、服務(wù)發(fā)現(xiàn)

在生產(chǎn)者端,我們把服務(wù)IP端口都注冊(cè)到zookeeper中,所以這里,我們要去拿到服務(wù)地址,然后通過Netty連接。重要的是,還要對(duì)根目錄進(jìn)行監(jiān)聽子節(jié)點(diǎn)變化,這樣隨著生產(chǎn)者的上線和下線,消費(fèi)者端可以及時(shí)感知。

package com.viewscenes.netsupervisor.connection;

@Component
public class ServiceDiscovery {

    @Value("${registry.address}")
    private String registryAddress;
    @Autowired
    ConnectManage connectManage;

    // 服務(wù)地址列表
    private volatile List<String> addressList = new ArrayList<>();
    private static final String ZK_REGISTRY_PATH = "/rpc";
    private ZkClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostConstruct
    public void init(){
        client = connectServer();
        if (client != null) {
            watchNode(client);
        }
    }
    
    //連接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,30000,30000);
        return client;
    }
    //監(jiān)聽子節(jié)點(diǎn)數(shù)據(jù)變化
    private void watchNode(final ZkClient client) {
        List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
            logger.info("監(jiān)聽到子節(jié)點(diǎn)數(shù)據(jù)變化{}",JSONObject.toJSONString(nodes));
            addressList.clear();
            getNodeData(nodes);
            updateConnectedServer();
        });
        getNodeData(nodeList);
        logger.info("已發(fā)現(xiàn)服務(wù)列表...{}", JSONObject.toJSONString(addressList));
        updateConnectedServer();
    }
    //連接生產(chǎn)者端服務(wù)
    private void updateConnectedServer(){
        connectManage.updateConnectServer(addressList);
    }

    private void getNodeData(List<String> nodes){
        logger.info("/rpc子節(jié)點(diǎn)數(shù)據(jù)為:{}", JSONObject.toJSONString(nodes));
        for(String node:nodes){
            String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
            addressList.add(address);
        }
    }
}

其中,connectManage.updateConnectServer(addressList);就是根據(jù)服務(wù)地址,去連接生產(chǎn)者端的Netty服務(wù)。然后創(chuàng)建一個(gè)Channel列表,在發(fā)送消息的時(shí)候,從中選取一個(gè)Channel和生產(chǎn)者端進(jìn)行通信。

3、Netty客戶端

Netty客戶端有兩個(gè)方法比較重要,一個(gè)是根據(jù)IP端口連接服務(wù)器,返回Channel,加入到連接管理器;一個(gè)是用Channel發(fā)送請(qǐng)求數(shù)據(jù)。同時(shí),作為客戶端,空閑的時(shí)候還要往服務(wù)端發(fā)送心跳信息。

package com.viewscenes.netsupervisor.netty.client;

@Component
public class NettyClient {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    private EventLoopGroup group = new NioEventLoopGroup(1);
    private Bootstrap bootstrap = new Bootstrap();
    @Autowired
    NettyClientHandler clientHandler;
    @Autowired
    ConnectManage connectManage;
   
    public Object send(Request request) throws InterruptedException{

        Channel channel = connectManage.chooseChannel();
        if (channel!=null && channel.isActive()) {
            SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
            Object result = queue.take();
            return JSONArray.toJSONString(result);
        }else{
            Response res = new Response();
            res.setCode(1);
            res.setError_msg("未正確連接到服務(wù)器.請(qǐng)檢查相關(guān)配置信息!");
            return JSONArray.toJSONString(res);
        }
    }
    public Channel doConnect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        Channel channel = future.sync().channel();
        return channel;
    }
    ....其他方法略
}

我們必須重點(diǎn)關(guān)注send方法,它是在代理對(duì)象invoke方法調(diào)用到的。首先從連接器中輪詢選擇一個(gè)Channel,然后發(fā)送數(shù)據(jù)。但是,Netty是異步操作,我們還要轉(zhuǎn)為同步,就是說要等待生產(chǎn)者端返回?cái)?shù)據(jù)才往下執(zhí)行。筆者在這里用的是同步隊(duì)列SynchronousQueue,它的take方法會(huì)阻塞在這里,直到里面有數(shù)據(jù)可讀。然后在處理器中,拿到返回信息寫到隊(duì)列中,take方法返回。

package com.viewscenes.netsupervisor.netty.client;
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    NettyClient client;
    @Autowired
    ConnectManage connectManage;
    Logger logger = LoggerFactory.getLogger(this.getClass());
    private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("已連接到RPC服務(wù)器.{}",ctx.channel().remoteAddress());
    }
    public void channelInactive(ChannelHandlerContext ctx)   {
        InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("與RPC服務(wù)器斷開連接."+address);
        ctx.channel().close();
        connectManage.removeChannel(ctx.channel());
    }
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        Response response = JSON.parseObject(msg.toString(),Response.class);
        String requestId = response.getRequestId();
        SynchronousQueue<Object> queue = queueMap.get(requestId);
        queue.put(response);
        queueMap.remove(requestId);
    }
    public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        logger.info("已超過30秒未與RPC服務(wù)器進(jìn)行讀寫操作!將發(fā)送心跳消息...");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        }else{
            super.userEventTriggered(ctx,evt);
        }
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        logger.info("RPC通信服務(wù)器發(fā)生異常.{}",cause);
        ctx.channel().close();
    }
}

至此,消費(fèi)者端也基本完成。同樣的,我們先看一下啟動(dòng)日志:

Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session
Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子節(jié)點(diǎn)數(shù)據(jù)為:["provider0000000015"]
已發(fā)現(xiàn)服務(wù)列表...["192.168.100.74:18868"]
加入Channel到連接管理器./192.168.100.74:18868
已連接到RPC服務(wù)器./192.168.100.74:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)

五、測(cè)試

我們以Controller里面的兩個(gè)方法為例,先開啟100個(gè)線程調(diào)用insertInfoUser方法,然后開啟1000個(gè)線程調(diào)用查詢方法getAllUser。

public class IndexController {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    InfoUserService userService;

    @RequestMapping("insert")
    @ResponseBody
    public List<InfoUser> getUserList() throws InterruptedException {
        long start = System.currentTimeMillis();
        int thread_count = 100;
        CountDownLatch countDownLatch = new CountDownLatch(thread_count);
        for (int i=0;i<thread_count;i++){
            new Thread(() -> {
                InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing");
                List<InfoUser> users = userService.insertInfoUser(infoUser);
                logger.info("返回用戶信息記錄:{}", JSON.toJSONString(users));
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("線程數(shù):{},執(zhí)行時(shí)間:{}",thread_count,(end-start));
        return null;
    }
    @RequestMapping("getAllUser")
    @ResponseBody
    public Map<String,InfoUser> getAllUser() throws InterruptedException {

        long start = System.currentTimeMillis();
        int thread_count = 1000;
        CountDownLatch countDownLatch = new CountDownLatch(thread_count);
        for (int i=0;i<thread_count;i++){
            new Thread(() -> {
                Map<String, InfoUser> allUser = userService.getAllUser();
                logger.info("查詢所有用戶信息:{}",JSONObject.toJSONString(allUser));
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        logger.info("線程數(shù):{},執(zhí)行時(shí)間:{}",thread_count,(end-start));

        return null;
    }
}

結(jié)果如下:


新增用戶信息

查詢用戶信息.png

六、總結(jié)

本文簡(jiǎn)單介紹了RPC的整個(gè)流程,如果你正在學(xué)習(xí)RPC的相關(guān)知識(shí),可以根據(jù)文中的例子,自己實(shí)現(xiàn)一遍。相信寫完之后,你會(huì)對(duì)RPC會(huì)有更深一些的認(rèn)識(shí)。

生產(chǎn)者端流程:

  • 加載服務(wù),并緩存
  • 啟動(dòng)通訊服務(wù)器(Netty)
  • 服務(wù)注冊(cè)(把通訊地址放入zookeeper,也可以把加載到的服務(wù)也放進(jìn)去)
  • 反射,本地調(diào)用

消費(fèi)者端流程:

  • 代理服務(wù)接口
  • 服務(wù)發(fā)現(xiàn)(連接zookeeper,拿到服務(wù)地址列表)
  • 遠(yuǎn)程調(diào)用(輪詢生產(chǎn)者服務(wù)列表,發(fā)送消息)

限于篇幅,本文代碼并不完整,如有需要,訪問:https://github.com/taoxun/simple_rpc 或者添加筆者微信公眾號(hào),獲取完整項(xiàng)目。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容