Netty是一個(gè)異步的事件驅(qū)動(dòng)網(wǎng)絡(luò)框架,使用Netty可以研發(fā)高性能的私有協(xié)議,將業(yè)務(wù)邏輯和網(wǎng)絡(luò)進(jìn)行解耦,通過(guò)Netty我們可以實(shí)現(xiàn)一些常用的協(xié)議,如HTTP。
基本概念
Channel
Channel是NIO的基礎(chǔ),它代表一個(gè)連接,通過(guò)這個(gè)鏈接可以進(jìn)行IO操作,例如讀和寫(xiě)。
Future
在Netty的Channel中的每一個(gè)IO操作都是非阻塞的。
這就意味著每一個(gè)操作都是立刻返回結(jié)果的。在Java標(biāo)準(zhǔn)庫(kù)中有Future接口,但是我們使用Future的時(shí)候只能詢問(wèn)這個(gè)操作是否執(zhí)行完成,或者阻塞當(dāng)前的線程直到結(jié)果完成,這不是Netty想要的。
Netty實(shí)現(xiàn)了自己的ChannelFuture接口,我們可以傳遞一個(gè)回調(diào)到ChannelFuture,當(dāng)操作完成的時(shí)候才會(huì)執(zhí)行回調(diào)。
Events 和 Handlers
Netty使用的是事件驅(qū)動(dòng)的應(yīng)用設(shè)計(jì),因此Handler處理的數(shù)據(jù)流,在管道中是鏈?zhǔn)降氖录?。事件和Handler可以被 輸入 和 輸出的數(shù)據(jù)流進(jìn)行關(guān)聯(lián)。
輸入(Inbound)事件可以如下:
- Channel激活和滅活
- 讀操作事件
- 異常事件
- 用戶事件
輸出(Outbound)事件比較簡(jiǎn)單,一般是打開(kāi)和關(guān)閉連接,寫(xiě)入和刷新數(shù)據(jù)。
Encoder 和 Decoder
因?yàn)槲覀円幚砭W(wǎng)絡(luò)協(xié)議,需要操作數(shù)據(jù)的序列化和反序列化。
代碼
來(lái)個(gè)實(shí)際的案例:
- 新建項(xiàng)目,添加maven依賴
<properties>
<netty-all.version>4.1.6.Final</netty-all.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
</dependencies>
- 創(chuàng)建數(shù)據(jù)的pojo
public class RequestData {
private int intValue;
private String stringValue;
// getter 和 setter
// toString方法
}
public class ResponseData {
private int intValue;
// getter 和 setter
// toString方法
}
- 創(chuàng)建Encoder和Decoder
public class RequestDataEncoder extends MessageToByteEncoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, RequestData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
out.writeInt(msg.getStringValue().length());
out.writeCharSequence(msg.getStringValue(), charset);
}
}
public class ResponseDataDecoder extends ReplayingDecoder<ResponseData> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
ResponseData data = new ResponseData();
data.setIntValue(in.readInt());
out.add(data);
}
}
public class RequestDecoder extends ReplayingDecoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
RequestData data = new RequestData();
data.setIntValue(in.readInt());
int strLen = in.readInt();
data.setStringValue(
in.readCharSequence(strLen, charset).toString());
out.add(data);
}
}
public class ResponseDataEncoder extends MessageToByteEncoder<ResponseData> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ResponseData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
}
}
- 創(chuàng)建請(qǐng)求的處理器
public class ProcessingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RequestData requestData = (RequestData) msg;
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() * 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
System.out.println(requestData);
}
}
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue(
"正常工作");
ChannelFuture future = ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println((ResponseData)msg);
ctx.close();
}
}
- 創(chuàng)建服務(wù)端應(yīng)用
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port = args.length > 0
? Integer.parseInt(args[0]) : 9003;
new NettyServer(port).run();
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
- 創(chuàng)建客戶端應(yīng)用
public class NettyClient {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 9003;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(),
new ResponseDataDecoder(), new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
- 運(yùn)行服務(wù)端和客戶端


可見(jiàn)正常工作
最后
這里我們只是對(duì)Netty進(jìn)行簡(jiǎn)單的介紹,介紹了它一些基本的概念,然后演示了一個(gè)例子。后續(xù)我們會(huì)對(duì)Netty進(jìn)行更深入的研究