實現一個簡單的RPC框架

國慶期間閑來無事,看到掘金上一篇文章《徒手擼框架--實現 RPC 遠程調用》,覺得寫的很不錯,也推薦大家閱讀一下。于是自己也趁機實現了一個簡單的RPC框架,與他不同的是,我使用了etcd作為注冊中心來實現服務注冊與服務發(fā)現的功能。具體的內容請看下文~~。
先附上Github : https://github.com/AlexZFX/easyrpc

整體概述

先看一下一個簡單的RPC的調用流程


rpc調用流程.png
  1. Server端的服務注冊
  2. Client端獲取服務提供者的信息,保存在本地,定時更新
  3. Client收到請求,對提供相應服務的Server發(fā)起請求
  4. Server通過反射調用本地的服務類的方法,將結果或出現的異常返回。

那我們在寫一個RPC框架時,應該考慮的問題就應該包括以下幾點

  1. 服務注冊與發(fā)現
  2. Client端服務的代理
  3. 請求的發(fā)送與處理
  4. Server端方法的調用與結果返回

這四點內部又有一些細節(jié)要處理,下面我會對這幾點進行描述,并給出我自己的實現。

服務注冊與發(fā)現

對于客戶端和服務端來說,我們希望我們提供的服務是非侵入式的,也就是對客戶端或者服務端本身的服務代碼無影響。而這樣最便捷的方式便是通過注解來實現。
于是我定義了兩個注解 @RpcInterface@RpcService
注解定義如下

@RpcInterface

/**
 * Description : 注解于實現的接口類上,表示該類是用于使用遠程 rpc服務的 class,其中的method都會通過動態(tài)代理調用到遠程的服務端
 */
//注解的聲明周期為始終不會丟棄
@Retention(RetentionPolicy.RUNTIME)
//注解的使用地點為 類,接口或enum聲明
@Target(ElementType.TYPE)
@Documented
public @interface RpcInterface {
}

@RpcService

/**
 * Description : 注解于實現了接口的服務類上,表示該類是用于提供rpc服務的 class,其中的method都會被注冊到etcd中
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface RpcService {
}

注解的作用和解釋寫在了注釋里,有了這兩個注解之后,我們就需要對注解進行處理,@RpcService 對應將相應服務接口名注冊在etcd上,@RpcInterface 對應查找注冊中心中的服務名,在本地通過動態(tài)代理將本地的方法的結果修改為遠程調用得到的結果。
服務的注冊和發(fā)現通過一個封裝好的 EtcdRegistry 來實現,這里的代碼主要參考了阿里第四屆中間件性能大賽初賽的服務注冊發(fā)現代碼,只做了一點點小的修改。
完整內容還是看Github。

Register方法和Find方法的內容如下~

    //注冊類名,一個類對應一個client
    @Override
    public void register(String serviceName, int port) throws Exception {
        String strKey = MessageFormat.format("/{0}/{1}/{2}:{3}", ROOTPATH, serviceName, getHostIp(), String.valueOf(port));
        ByteSequence key = ByteSequence.fromString(strKey);
        // 目前只需要創(chuàng)建這個key,對應的value暫不使用,先留空
        ByteSequence val = ByteSequence.fromString("");
        //等待put結束之后繼續(xù)運行
        kv.put(key, val, PutOption.newBuilder().withLeaseId(leaseId).build()).get();
        log.info("Register a new service at :" + strKey);
    }

    private String getHostIp() throws UnknownHostException {
        return Inet4Address.getLocalHost().getHostAddress();
    }
    
    @Override
    public List<EndPoint> find(String serviceName) throws Exception {
        String strkey = MessageFormat.format("/{0}/{1}", ROOTPATH, serviceName);
        log.info("start to find service, Name :" + strkey);
        ByteSequence key = ByteSequence.fromString(strkey);
        GetResponse response = kv.get(key, GetOption.newBuilder().withPrefix(key).build()).get();
        List<EndPoint> list = new ArrayList<>();
        response.getKvs().forEach(kv -> {
            String s = kv.getKey().toStringUtf8();
            int index = s.lastIndexOf("/");
            String endPointStr = s.substring(index + 1, s.length());
            String host = endPointStr.split(":")[0];
            int post = Integer.parseInt(endPointStr.split(":")[1]);
            list.add(new EndPoint(host, post));
        });
        return list;
    }

對注解的處理方面,為了方便,我沒有自己寫一整套掃描包獲取注解的方法,而是使用了Java的開源反射框架 Reflections ,簡單的使用如下。

    Reflections reflections = new Reflections(packagePath);
    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class);

這樣我們就獲取到的標識了相應注解的類。
在微服務中,我們往往通過實現同一個接口來保證客戶端方法和服務端方法的同步,所以我們在注冊過程中也應該注冊的是接口名,這樣保證了服務的可拓展性。
這些具體的操作都被放在了 ServerMain 這個類中,服務端的啟動也是通過這個類的start方法來實現。

@Slf4j
public class ServerMain {

    private static final int DEFAULT_SERVER_PORT = 8890;

    private IRegistry registry;

    private int port;

    private final String packagePath;


    public ServerMain(String packagePath) {
        this(packagePath, new EtcdRegistry());
    }

    public ServerMain(String packagePath, IRegistry registry) {
        this.registry = registry;
        this.packagePath = packagePath;
        this.port = System.getProperty("server.port") == null ? DEFAULT_SERVER_PORT : Integer.parseInt(System.getProperty("server.port"));
    }

    public void start() {
        Reflections reflections = new Reflections(packagePath);
        Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class);
        classes.forEach(clazz -> {
            try {
                Class<?>[] interfaces = clazz.getInterfaces();
                String clazzName = clazz.getName();
                if (interfaces != null && interfaces.length > 0) {
                    //簡單實現,所以只獲取了第一個interface的name,實際上并不準確,可能有誤。
                    clazzName = interfaces[0].getName();
                }
                //注冊的是 接口名 和 服務實例
                //clazzMap是用來保存一個實例對象,相當于服務端的單例
                ServerHandler.clazzMap.put(clazzName, clazz.newInstance());
                registry.register(clazzName, port);
            } catch (Exception e) {
                log.error("register service failed : " + e.getLocalizedMessage(), e);
            }
        });
//      //新開線程的話會程序會退出(如果在springboot的構造函數中則另開線程啟動,否則會阻塞項目的啟動)
//        new Thread(() -> {
        Server server = new Server(port);
        server.start();
//        }).start();
    }
}

客戶端也通過一個 ClientServer 類實現了客戶端服務的啟動工作,將注解處理,服務查找和緩存等任務進行實現。
find方法返回的結果是一個EndPoint的list,對應提供相應服務的n個節(jié)點,每個相同的endpoint對應了我封裝的一個netty的client,具體會在client的部分講明~~

@Slf4j
public class ClientServer {

    private IRegistry registry;

    //設置一個endpoint使用一個client,netty高效理論上滿足使用
    private static ConcurrentHashMap<EndPoint, Client> clientMap = new ConcurrentHashMap<>();

    private static ConcurrentHashMap<String, List<EndPoint>> serviceMap = new ConcurrentHashMap<>();

    private final String packagePath;

    private static final Random random = new Random();

    public ClientServer(String packagePath) {
        this.packagePath = packagePath;
        this.registry = new EtcdRegistry();
    }

    public void start() {
        Reflections reflections = new Reflections(packagePath);
        Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcInterface.class);
        EventLoopGroup eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(4) : new NioEventLoopGroup(4);
        //定時任務線程池,定時更新服務列表,設置為3分鐘
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2);
        classes.forEach(clazz -> executorService.scheduleAtFixedRate(() -> {
            try {
                //拿到當前仍在注冊中心中的相應服務列表
                // TODO 刪除掉對應失效的endpoint
                Class<?>[] interfaces = clazz.getInterfaces();
                String className = clazz.getName();
                if (interfaces != null && interfaces.length > 0) {
                    className = interfaces[0].getName();
                }
                List<EndPoint> list = registry.find(className);
                serviceMap.put(className, list);
                list.forEach(endPoint -> {
                    if (clientMap.get(endPoint) == null) {
                        //所有的Client共用一個EventLoopGroup
                        Client client = new Client(endPoint.getHost(), endPoint.getPort(), eventLoopGroup);
                        clientMap.put(endPoint, client);
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 3 * 60, TimeUnit.SECONDS));

    }

    public static Client getClient(String serviceName) {
        List<EndPoint> endPoints = serviceMap.get(serviceName);
        // 簡單的負載均衡,只使用了隨機選擇
        if (endPoints != null) {
            EndPoint endPoint = endPoints.get(random.nextInt(endPoints.size()));
            return clientMap.get(endPoint);
        }
        return null;
    }
}

以上就完成了服務注冊與發(fā)現的工作,同時也提供了非常簡單易用的啟動接口,等會看看example就知道啦。

Client端的服務代理

瀏覽器的請求被發(fā)送到Client端后,雖然表面上是通過本地的方法返回,但實質上其實是遠程方法的調用,這主要是動過Java中的動態(tài)代理來實現的。
這個項目中的動態(tài)代理是通過CGLIB來實現的,關于CGLIB和JDK原生動態(tài)代理的區(qū)別,我就不細述,網上有很多相關的文章。
先聲明一個ProxyFactory類,用于在注入服務對象時生成代理

public class ProxyFactory {

    public static <T> T create(Class<T> clazz) {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(clazz);
        enhancer.setCallback(new ProxyIntercepter());
        return (T) enhancer.create();
    }
}

在調用代理類的方法時,實際上會調用到 ProxyIntercepter 中的intercepter方法。所以實際上的請求會在intercepter方法里發(fā)送。
ProxyIntercepter 主要內容如下

@Slf4j
public class ProxyIntercepter implements MethodInterceptor {
   @Override
   public Object intercept(Object o, Method method, Object[] parameters, MethodProxy methodProxy) throws Throwable {
       RpcRequest rpcRequest = new RpcRequest();
       Class clazz = method.getDeclaringClass();
       Class<?>[] interfaces = clazz.getInterfaces();
       //存在接口時使用的是接口名稱
       String clazzName = clazz.getName();
       if (interfaces != null && interfaces.length > 0) {
           clazzName = interfaces[0].getName();
       }
       rpcRequest.setClassName(clazzName);
       rpcRequest.setServiceName(method.getName());
       rpcRequest.setParameterTypes(method.getParameterTypes());
       rpcRequest.setParameters(parameters);

       Client client = ClientServer.getClient(rpcRequest.getClassName());

       RpcFuture rpcFuture;
       if (client != null) {
           ChannelFuture channelFuture = client.connectChannel();
           rpcFuture = new RpcFuture(channelFuture.channel().eventLoop());
           if (channelFuture.isSuccess()) {
               sendRequest(rpcRequest, rpcFuture, channelFuture);
           } else {
               channelFuture.addListener((ChannelFutureListener) future -> {
                   if (future.isSuccess()) {
                       sendRequest(rpcRequest, rpcFuture, future);
                   } else {
                       log.error("send request error ", future.cause());
                   }
               });
           }
           //這里沒有用listener & getNow的方式獲取主要是考慮客戶端本身非異步的情形,同時是為了簡便實現。
           RpcResponse rpcResponse = rpcFuture.get(5, TimeUnit.SECONDS);
           if (rpcResponse.getException() == null) {
               return rpcResponse.getResult();
           } else {
               throw rpcResponse.getException();
           }
       } else {
           log.error("no rpcService is available :" + rpcRequest.getClassName());
           return null;
       }
   }

   private void sendRequest(RpcRequest rpcRequest, RpcFuture rpcFuture, ChannelFuture channelFuture) {
       channelFuture.channel().writeAndFlush(rpcRequest)
               .addListener((ChannelFutureListener) writefuture -> {
                   if (writefuture.isSuccess()) {
                       FutureHolder.registerFuture(rpcRequest.getRequestId(), rpcFuture);
                       log.info("send request success");
                   } else {
                       rpcFuture.tryFailure(writefuture.cause());
                       log.error("send request failed", writefuture.cause());
                   }
               });
   }
}

可以看到我仍然是獲取了方法所在類的接口,并根據接口名查找了相應的Client對象,發(fā)送請求。

請求的發(fā)送與處理

為了服務端能成功利用反射進行方法調用,客戶端的請求應該包含一些參數,在 RpcRequest 類中。

@Data
public class RpcRequest {
    private static AtomicLong atomicLong = new AtomicLong(0);
    // 請求Id  netty的請求是異步的,為了復用連接,一般會帶個id,這樣在收到返回信息的時候能一一對應。
    private long requestId;
    // 類名
    private String className;
    // 服務名
    private String serviceName;
    // 參數類型
    private Class<?>[] parameterTypes;
    // 參數
    private Object[] parameters;

    public RpcRequest() {
        this.requestId = atomicLong.getAndIncrement();
    }

    public RpcRequest(long requestId) {
        this.requestId = requestId;
    }
}

服務端返回的數據包含的內容如下

@Data
public class RpcResponse {
    private long requestId;
    private Throwable exception;
    private Object result;

    public RpcResponse(long requestId) {
        this.requestId = requestId;
    }

    public RpcResponse() {
    }
}

Client端和Server端均使用了Netty作為網絡框架。

其實利用HTTP協議+Json也可以完成我們的基本需求,但是在大部分Rpc框架中,為了提高性能,往往都會自定義一個滿足需求的協議,并采用一些更為高效的序列化方案,如ProtoBuf,Tyro等。 本項目采用了ProtoStuff作為序列化方案, 和ProtoBuf相比主要是省去了初始生成proto文件等步驟,提供了較為易用的接口。(網上說ProtoStuff在序列化一些集合類的時候會有bug,我自己測試了一下HashMap和ArrayList,都沒有出現問題,就沒有專門去解決這一問題)
編寫了一個ProtoStuffUtil的工具類,提供serializer和deserializer方法(網上有好多這種代碼,參考了一些,因為很多文章,就沒有標注來源了)。

public class ProtoStuffUtil {

    public static <T> byte[] serializer(T o) {
        Schema schema = RuntimeSchema.getSchema(o.getClass());
        return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate());
    }

    public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.createFrom(clazz);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, message, schema);
        return message;
    }
}

工具類提供的方法在Netty的Encoder和Decoder中使用(參考了這篇文章《使用netty結合Protostuff傳輸對象例子》),這兩個類會在Netty啟動時被我利用一個Initializer添加到Netty的pipeline中去,關于Netty的責任鏈模式可以查看其他相關文章。
RpcEncoder

public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> targetClazz;

    public RpcEncoder(Class<?> targetClazz) {
        this.targetClazz = targetClazz;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (targetClazz.isInstance(msg)) {
            byte[] data = ProtoStuffUtil.serializer(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

RpcDecoder

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> targerClass;

    public RpcDecoder(Class<?> targerClass) {
        this.targerClass = targerClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLen = in.readInt();
        if (dataLen < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLen) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLen];
        in.readBytes(data);
        Object obj = ProtoStuffUtil.deserializer(data, targerClass);
        out.add(obj);
    }
}

接下來來說用于發(fā)送請求的Client類,為了復用TCP連接,Netty的channel都添加了KEEPALIVE的參數,同時,我也在每次獲取了一個EndPoint的屬性后建立一個新的Client,儲存在了ClientMap中。每個Client是對應一個EndPoint的,如果并發(fā)量大的話,其實可以多個Client對應于一個EndPoint,這時就可以將我們的Client設計為一個連接池,也可以是Netty自身提供的連接池,這方面擴展我就不細講,我github上的iustu-agent里面Connection那一塊本來有用到連接池,后來發(fā)現性能并沒有什么提升就沒再用了。

Client本質上就是一個Netty的Client,我設置成同一個客戶端的所有Client都會共用一個EventLoopGroup,主要是為了資源的合理利用吧。因為負載不高的情況下同一個EventLoopGroup其實是夠用的,并且還會有浪費。

Client 的構成如下

@Slf4j
public class Client {

    private EventLoopGroup eventLoopGroup;

    private Channel channel;

    private ChannelFuture channelFuture;

    private String host;

    private int port;

    public Client(String host, int port) {
        this(host, port, Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1));
    }

    public Client(String host, int port, EventLoopGroup eventLoopGroup) {
        this.host = host;
        this.port = port;
        this.eventLoopGroup = eventLoopGroup;
    }

    public ChannelFuture connectChannel() {
        if (channelFuture == null) {
            channelFuture = new Bootstrap().group(eventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
                    .channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class)
                    .handler(new ClientInitialzer())
                    .connect(host, port)
                    .addListener((ChannelFutureListener) future -> {
                        if (future.isSuccess()) {
                            channel = future.channel();
                            log.info("start a client to " + host + ":" + port);
                            channel.closeFuture().addListener((ChannelFutureListener) closefuture -> {
                                log.info("stop the client to " + host + ":" + port);
                            });
                        } else {
                            log.error("start a Client failed", future.cause());
                        }
                    })
            ;
        }
        return channelFuture;
    }

    public Channel getChannel() {
        if (channel != null) {
            return channel;
        } else {
            channelFuture = connectChannel();
            return channelFuture.channel();
        }
    }
}

可以看見只有第一次調用 connectChannel 方法的時候才會真正的向服務端發(fā)起連接,這會返回一個channelFuture,但是channelFuture的結果并不一定已經成功了。
所以在 ProxyIntecepter 里我也做了相應的處理,完整代碼上面已附。處理部分的代碼如下

    ChannelFuture channelFuture = client.connectChannel();
    rpcFuture = new RpcFuture(channelFuture.channel().eventLoop());
    if (channelFuture.isSuccess()) {
        sendRequest(rpcRequest, rpcFuture, channelFuture);
    } else {
        channelFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                sendRequest(rpcRequest, rpcFuture, future);
            } else {
                log.error("send request error ", future.cause());
            }
         });
     }

請求發(fā)送的時候本地會在 RpcHolder(內部使用了ThreadLocal的HashMap) 中存儲一個 RpcFuture(實質上只是繼承了Netty中的DefaultPromise),與唯一的 RequestId 相對應,rpcfuture.get()會先wait,并在請求得到返回之后被notify并返回結果,如果結果中包含Exception,則會拋出異常。

服務端調用與返回

相比于Client而言,Server端的實現就簡單了許多,只是開啟了一個ServerBootStarp并綁定在指定的端口上接受請求。

收到請求后,會通過請求中攜帶的服務名查找到相應的對象(在注冊服務的同時被添加到clazzMap中去的impl實例),并利用反射調用其中方法,對結果進行返回。這里用的也是CGLIB中提供的FastClass來實現反射。

主要的代碼都在 ServerHandler 中了

@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    public static ConcurrentHashMap<String, Object> clazzMap = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        log.info("recieve a request id : " + msg.getRequestId());
        RpcResponse rpcResponse = getResponse(msg);
        ctx.writeAndFlush(rpcResponse).addListener((GenericFutureListener<ChannelFuture>) future -> {
            if (!future.isSuccess()) {
                log.error(future.cause().getLocalizedMessage());
            }
        });
    }

    private RpcResponse getResponse(RpcRequest rpcRequest) {
        RpcResponse rpcResponse = new RpcResponse(rpcRequest.getRequestId());
        try {
            Class<?> clazz = Class.forName(rpcRequest.getClassName());

            Object c = clazzMap.get(rpcRequest.getClassName());
            if (c == null) {
                clazzMap.put(rpcRequest.getClassName(), clazz.newInstance());
                c = clazzMap.get(rpcRequest.getClassName());
            }
            String methodName = rpcRequest.getServiceName();
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            Object[] parameters = rpcRequest.getParameters();
            FastClass fastClass = FastClass.create(clazz);
            FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
            //與Spring聯合使用時應該調用ApplicationContext里面的已有的bean
            Object result = fastMethod.invoke(c, parameters);

            rpcResponse.setResult(result);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
            e.printStackTrace();
            rpcResponse.setException(e);
        }
        return rpcResponse;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            if (cause.getLocalizedMessage().equals("遠程主機強迫關閉了一個現有的連接。")) {
                log.info("一個客戶端連接斷開。");
                return;
            }
        }
        super.exceptionCaught(ctx, cause);
    }

}

試著使用一下~

先將當前的項目安裝到本地倉庫中去以供其他項目使用~
mvn install
這樣就OK了,最好還是用idea做這些比較方便

然后新建一個SpringBoot項目,加入web的組件,添加三個模塊,組成如下圖


test組成圖

同時在pom.xml中加入easyrpc的引用

    <dependency>
        <groupId>com.alexzfx</groupId>
        <artifactId>easyrpc</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

在common包中,我們在com.alexzfx.easyrpc.com目錄下創(chuàng)建 HelloService 接口,聲明 sayHello方法

package com.alexzfx.easyrpc.common;

public interface HelloService {
    String sayHello();
}

然后在server包中創(chuàng)建一個實現類,標識 @RpcService 注解

package com.alexzfx.easyrpc.server;

import com.alexzfx.easyrpc.commom.annotation.RpcService;
import com.alexzfx.easyrpc.common.HelloService;

@RpcService
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello() {
        return "Hello EasyRPC";
    }
}

在client包中也創(chuàng)建一個實現類,標識 @RpcInterface 注解,但只是實現,不做其他方法內容的實現

package com.alexzfx.easyrpc.client;

import com.alexzfx.easyrpc.commom.annotation.RpcInterface;
import com.alexzfx.easyrpc.common.HelloService;

@RpcInterface
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello() {
        return null;
    }
}

在server包中創(chuàng)建一個main方法,實現我們的服務啟動功能

package com.alexzfx.easyrpc.server;

public class ServerApplication {

    public static void main(String[] args) {
        ServerMain serverMain = new ServerMain("com.alexzfx.easyrpc.server");
        serverMain.start();
    }
}

在client端中實現一個controller,并返回 sayHello 方法的結果,啟動前要先完成我們client端的啟動工作,并將本地使用的服務類創(chuàng)建為我們的代理類(因為本項目沒有完全對應Spring做配置,如果是專門為了和Spring集成的話,可以直接在applicationContext加載的時候將代理類添加進去,這樣就可以使 @Autowire 注解注入的是我們的代理類了)。

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class ClientApplication {

    public ClientApplication() {
        ClientServer clientServer = new ClientServer("com.alexzfx.easyrpc.client");
        clientServer.start();
        helloService = ProxyFactory.create(HelloServiceImpl.class);
    }

    private HelloService helloService;

    public static void main(String[] args) {
        SpringApplication.run(ClientApplication.class);
    }

    @RequestMapping("/")
    public String sayhello() {
        return helloService.sayHello();
    }
}

這樣,我們整體的測試項目就構建完成了。
啟動etcd,Windows上只要下載鏈接中相應的版本,解壓縮后直接雙擊啟動 etcd.exe 即可,其他系統也類似。

啟動 server 端的 main 函數,啟動時注冊了服務,所在端口等日志都會被打印出來。
再啟動client端 SpringBoot的 main 函數。
訪問 http://localhost:8080 你就可以看到 Hello EasyRPC 的出現啦。

總結

通過以上的內容,實現了一個簡單易用,包括了服務注冊與發(fā)現功能的RPC框架。實際上一個可通用的RPC框架,要處理的事情遠遠不止上面做的這么簡單,還需要包括監(jiān)控,熔斷等等許多功能。
這是我第一次寫完整的技術類博客,有許多不足的地方,希望發(fā)現的老哥幫忙指正~
如果你覺得還不錯的話,希望能給我的這個項目點個小小的 star ,這就是對我最大的鼓勵啦。EasyRPC

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容