netty是一個nio的框架,它與原生的java nio的不同之處在于原生的nio比較難以使用,通過netty可以不用了解底層的編寫,快速實現網絡編程,更快實現我們需要的業務(雖然我還是推薦多了解底層)。
netty的使用比較廣泛,比如游戲的數據包傳輸,還有rpc的調用,比如dubbo。因為個人傾向于了解一樣東西可以先從demo入手,因此決定寫一個簡單的rpc調用來了解netty,show code!
先新建一個maven項目,在里面創建三個module,分別是start_interface、start_rpc_consumer和start_rpc_provider。在start_interface的pom.xml里加上依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
然后新建RPCClient作為客戶端使用
package com.example.start_interface;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Client side of the demo RPC.
 *
 * <p>Builds JDK dynamic proxies whose method calls are packed into an {@link RPCParam} and sent
 * to the provider over a single Netty connection shared by every proxy created in this JVM.
 */
public class RPCClient {
    /** Worker pool that runs the blocking send-and-wait step of each call. */
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    /** Handler of the shared connection; stays {@code null} until a connect attempt has succeeded. */
    private static volatile RPCClientHandler client;

    /**
     * Creates a proxy that forwards calls on {@code serviceClass} to the remote provider.
     *
     * @param serviceClass     interface the proxy implements
     * @param providerName     target in the form {@code "fully.qualified.Class#method"}; anything
     *                         after a second {@code #} is ignored
     * @param rpcClientHandler handler to attach to the connection if none has been opened yet
     * @param ip               provider host, used only when the connection is first opened
     * @param port             provider port, used only when the connection is first opened
     * @return the proxy instance, to be cast to {@code serviceClass}
     * @throws IllegalArgumentException if {@code providerName} lacks a class or method part
     */
    public Object createProxy(final Class<?> serviceClass, final String providerName
            , final RPCClientHandler rpcClientHandler, String ip, int port) {
        // Parse the target once, up front, so a malformed name fails here instead of on every call.
        final String[] target = providerName.split("#");
        if (target.length < 2 || target[0].isEmpty() || target[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "providerName must look like 'className#methodName' but was: " + providerName);
        }
        final String className = target[0];
        final String methodName = target[1];

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    // equals/hashCode/toString are answered locally; they are not remote calls.
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(proxy, method.getName(), args);
                    }
                    if (client == null) {
                        initClient(rpcClientHandler, ip, port);
                    }
                    RPCClientHandler handler = client;

                    RPCParam rpcParam = new RPCParam();
                    rpcParam.setClazzName(className);
                    rpcParam.setMethodName(methodName);
                    // The proxy passes null (not an empty array) for methods without parameters.
                    rpcParam.setParams(args == null ? new Object[0] : args);
                    // Declared parameter types work for null arguments, primitives and
                    // interface-typed parameters, unlike the runtime class of each argument.
                    rpcParam.setParamTypes(method.getParameterTypes());

                    handler.setRpcParam(rpcParam);
                    return executor.submit(handler).get();
                });
    }

    /** Handles the {@code java.lang.Object} methods that a proxy routes to its invocation handler. */
    private static Object invokeObjectMethod(Object proxy, String name, Object[] args) {
        switch (name) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
            default:
                throw new UnsupportedOperationException(name);
        }
    }

    /**
     * Opens the shared connection if it is not open yet.
     *
     * <p>{@code client} is published only after the connect succeeded, so a failed attempt can be
     * retried by the next call; the event loop group is released when the attempt fails.
     *
     * @throws IllegalStateException if the calling thread is interrupted while connecting
     */
    private static synchronized void initClient(RPCClientHandler rpcClientHandler, String ip, int port) {
        if (client != null) {
            // Another thread finished connecting while this one waited for the lock.
            return;
        }
        EventLoopGroup group = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ObjectEncoder());
                            p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            p.addLast(rpcClientHandler);
                        }
                    });
            b.connect(ip, port).sync();
            client = rpcClientHandler;
            connected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to " + ip + ":" + port, e);
        } finally {
            if (!connected) {
                group.shutdownGracefully();
            }
        }
    }
}
以及它的Handler
package com.example.start_interface;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
/**
 * Client-side channel handler that doubles as the task performing one request/response exchange.
 *
 * <p>{@link #call()} writes the current {@link RPCParam} to the channel and blocks until
 * {@link #channelRead} delivers the reply. Only one exchange is in flight at a time.
 */
public class RPCClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    /** Longest time {@link #call()} waits for the channel to become active. */
    private static final long ACTIVATION_TIMEOUT_MILLIS = 5000L;

    /** Serializes whole exchanges; {@code wait()} releases this handler's own monitor. */
    private final Object callLock = new Object();
    /** Context captured when the channel becomes active; {@code null} before that. */
    private ChannelHandlerContext context;
    /** Reply to the exchange currently in progress. */
    private Object result;
    /** Set once the reply to the current exchange has arrived; guards against spurious wakeups. */
    private boolean responseReceived;
    /** Request that the next {@link #call()} sends. */
    private RPCParam rpcParam;

    /** Remembers the context and wakes a caller that is waiting for the channel to come up. */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) {
        context = ctx;
        notifyAll();
    }

    /**
     * Stores the reply from the server and wakes the waiting caller.
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
        result = msg;
        responseReceived = true;
        notifyAll();
    }

    /**
     * Sends the current request and blocks until its reply has been read.
     *
     * @return the object received from the server
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws IllegalStateException if the channel does not become active in time
     */
    @Override
    public Object call() throws InterruptedException {
        synchronized (callLock) {
            synchronized (this) {
                awaitActivation();
                // Forget the previous exchange so a stale reply can never be returned.
                responseReceived = false;
                result = null;
                context.writeAndFlush(rpcParam);
                while (!responseReceived) {
                    wait();
                }
                return result;
            }
        }
    }

    /** Waits, holding this handler's monitor, until {@link #channelActive} has supplied the context. */
    private void awaitActivation() throws InterruptedException {
        long deadline = System.nanoTime() + ACTIVATION_TIMEOUT_MILLIS * 1_000_000L;
        while (context == null) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                throw new IllegalStateException("Channel did not become active within "
                        + ACTIVATION_TIMEOUT_MILLIS + " ms");
            }
            wait(remainingMillis);
        }
    }

    public RPCParam getRpcParam() {
        return rpcParam;
    }

    public void setRpcParam(RPCParam rpcParam) {
        this.rpcParam = rpcParam;
    }
}
以及RPCServer
package com.example.start_interface;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Bootstraps the Netty server that receives {@link RPCParam} requests and hands them to
 * {@link RPCServerHandler}.
 */
public class RPCServer {
    /**
     * Binds the server to the given address and returns once the bind has completed.
     *
     * <p>If the bind fails or the thread is interrupted, the event loop group is shut down so its
     * threads are not left running; an interrupt is also restored on the calling thread.
     *
     * @param hostName local address to bind to
     * @param port     local port to bind to
     */
    public static void startServer(String hostName, int port) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean bound = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ObjectEncoder());
                            p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            p.addLast(new RPCServerHandler());
                        }
                    });
            bootstrap.bind(hostName, port).sync();
            bound = true;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!bound) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
以及RPCServerHandler
package com.example.start_interface;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Server-side handler: instantiates the class named in an incoming {@link RPCParam}, invokes the
 * requested method reflectively and writes the return value back to the client.
 */
public class RPCServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Executes one request.
     *
     * <p>Messages that are not an {@link RPCParam} are passed on down the pipeline. When the
     * reflective call fails, the error is printed and nothing is written back.
     *
     * <p>NOTE(review): the class and method names come straight from the network and are loaded
     * and invoked without any allow-list, on top of Java-native deserialization. That is only
     * acceptable for a trusted demo setup; restrict the callable classes before exposing this.
     *
     * <p>NOTE(review): because no reply is sent on failure, a client blocking for the response
     * keeps waiting; the protocol likely needs an explicit error response. A {@code null} result
     * (e.g. from a {@code void} method) probably cannot be written as a message either — confirm
     * the desired handling.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg.toString());
        if (!(msg instanceof RPCParam)) {
            // Not a request for this handler; hand it on rather than fail on the cast.
            ctx.fireChannelRead(msg);
            return;
        }
        RPCParam rpcParam = (RPCParam) msg;
        String className = rpcParam.getClazzName();
        String methodName = rpcParam.getMethodName();
        try {
            Class<?> clazz = Class.forName(className);
            // Class.newInstance() is deprecated and rethrows constructor exceptions undeclared.
            Object target = clazz.getDeclaredConstructor().newInstance();
            Method method = clazz.getDeclaredMethod(methodName, rpcParam.getParamTypes());
            Object result = method.invoke(target, rpcParam.getParams());
            ctx.writeAndFlush(result);
        } catch (InvocationTargetException e) {
            // Report what the invoked constructor or method actually threw, not only the wrapper.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            cause.printStackTrace();
        } catch (ReflectiveOperationException e) {
            // Class or method not found, not accessible, or not instantiable.
            e.printStackTrace();
        }
    }
}
其中為了獲取參數,我們需要自己寫一個param類,來方便傳輸并得到方法和參數
package com.example.start_interface;
import java.io.Serializable;
/**
 * Serializable request sent from the RPC client to the server: the target class and method
 * names plus the argument values and their types.
 */
public class RPCParam implements Serializable {
    /** Name of the class to invoke. */
    private String clazzName;
    /** Name of the method to invoke. */
    private String methodName;
    /** Argument values, positionally matching {@link #paramTypes}. */
    private Object[] params;
    /** Argument types used to look the method up. */
    private Class<?>[] paramTypes;

    /**
     * Allocates empty argument and argument-type arrays of the given length, replacing any
     * arrays set earlier.
     *
     * @param size number of arguments
     */
    public void initParams(int size) {
        this.params = new Object[size];
        this.paramTypes = new Class[size];
    }

    public String getClazzName() {
        return this.clazzName;
    }

    public void setClazzName(String clazzName) {
        this.clazzName = clazzName;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParams() {
        return this.params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }

    public Class<?>[] getParamTypes() {
        return this.paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }
}
其實返回結果感覺也應該自己寫一個類來封裝,但是因為這只是一個練手demo,就算了,然后再加一個interface
package com.example.start_interface;
/**
 * Service contract shared by the RPC consumer and provider modules.
 */
public interface UserService {

    /**
     * Hello-world style call.
     *
     * @param word text sent to the provider
     * @return the provider's reply
     */
    String sayHello(String word);

    /**
     * Sample call that takes a boxed integer.
     *
     * @param i value sent to the provider
     * @return the provider's reply
     */
    String test(Integer i);
}
這時整體框架算是搭建好了,可以開始測試一下
首先在consumer包里實現調用
package com.example.start_rpc_consumer;
import com.example.start_interface.RPCClient;
import com.example.start_interface.RPCClientHandler;
import com.example.start_interface.UserService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Consumer demo: starts Spring Boot, obtains a {@link UserService} proxy from {@link RPCClient}
 * and calls it in an endless loop.
 */
@SpringBootApplication
public class StartRpcConsumerApplication {
    /** Remote target handed to the proxy, written as {@code class#method#}. */
    public static final String providerName = "com.example.start_rpc_provider.UserServiceImpl#test#";

    /**
     * Runs the demo loop; it never returns normally.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws InterruptedException if the pause between calls is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(StartRpcConsumerApplication.class, args);

        Object proxy = new RPCClient().createProxy(
                UserService.class, providerName, new RPCClientHandler(), "localhost", 8888);
        UserService userService = (UserService) proxy;

        while (true) {
            System.out.println("ok???");
            Thread.sleep(1000);
            String reply = userService.test(12);
            System.out.println(reply);
            System.out.println("ok");
        }
    }
}
然后在provider包里實現impl類并開啟rpc服務
package com.example.start_rpc_provider;
import com.example.start_interface.UserService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Provider-side implementation of {@link UserService}.
 */
public class UserServiceImpl implements UserService {

    /**
     * Prints the reply to standard output and returns it.
     *
     * @param word text received from the caller
     * @return a fixed prefix followed by {@code word}
     */
    @Override
    public String sayHello(String word) {
        String reply = "調(diào)用成功--參數(shù):" + word;
        System.out.println(reply);
        return reply;
    }

    /**
     * Builds the reply for the integer sample call.
     *
     * @param i value received from the caller
     * @return {@code "canshu"} followed by {@code i}
     */
    @Override
    public String test(Integer i) {
        StringBuilder reply = new StringBuilder("canshu");
        reply.append(i);
        return reply.toString();
    }
}
package com.example.start_rpc_provider;
import com.example.start_interface.RPCServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Provider demo: starts Spring Boot and then the RPC server on localhost:8888.
 */
@SpringBootApplication
public class StartRpcProviderApplication {

    /**
     * Application entry point.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(StartRpcProviderApplication.class, args);

        final String host = "localhost";
        final int port = 8888;
        RPCServer.startServer(host, port);
    }
}
以下是輸出結果

其實在這個demo里面學到了不少知識,除了粘包拆包以及netty的調用,還有動態代理以及反射機制。其中比較有意思的是,這個實現其實并不支持基本數據類型,只支持Object類型的參數。我在網上看到的比較有意思的解決方法有兩種:1. 約定以后的參數都使用Object類型,不使用基本數據類型;2. 寫一個工具把基本數據類型進行裝箱。這個就以后有機會再玩玩啦。
這個demo是根據之前看過的一篇博客改寫的,但是忘了那個網址了,希望知道的童鞋可以告訴我,我再加上出處。