EchoServer
EchoServer
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //#1
b.group(group) //#2
.channel(NioServerSocketChannel.class) //#2
.localAddress(new InetSocketAddress(port)) //#2
.childHandler(new ChannelInitializer<SocketChannel>() { //#3
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler()); //#4
}
});
ChannelFuture f = b.bind().sync(); //#5
System.out.println(EchoServer.class.getSimpleName() + " started and listener on " + f.channel().localAddress());
f.channel().closeFuture().sync(); //#6
} finally {
group.shutdownGracefully().sync(); //#7
}
}
public static void main(String[] args) throws Exception {
new EchoServer(8989).start();
}
}
- 首先,創(chuàng)建一個(gè)ServerBootstrap實(shí)例
- 指定 NioEventLoopGroup 接收新的鏈接,并處理已經(jīng)接收的鏈接
- 設(shè)置通道類型為 NioServerSocketChannel (當(dāng)然除了NIO,也有其他痛到可以選擇,例如:OIO OioServerSocketChannel)
- 設(shè)置綁定的 InetSocketAddress
- 指定 ChannelHandler 來處理接收的鏈接(這里使用ChannelInitializer創(chuàng)建了一個(gè)子通道)
ChannelPipeline 持有通道中所有不同的ChannelHandlers
sync() 該方法會(huì)阻塞直到服務(wù)綁定(在關(guān)閉時(shí)同理)
EchoServerHandler
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Read");
System.out.println("Server received : " + msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("Read Complete");
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty使用前面提到了Future和Callback的概念去處理不同的事件。我們需要繼承ChannelInboundHandlerAdapter,這樣我們可以處理不同的事件回調(diào)。
-
channelRead()方法,這個(gè)方法會(huì)在每次消息到達(dá)時(shí)回調(diào)。 -
exceptionCaught()方法,執(zhí)行異常情況下會(huì)被回調(diào)。
EchoClient
public class EchoClient {
private String host;
private int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); //#1 創(chuàng)建 bootstrap 客戶端
b.group(group) //#2 這里指定 NioEventLoopGroup 處理客戶端事件
.channel(NioSocketChannel.class) //#3 指定通道類型
.remoteAddress(new InetSocketAddress(host, port)) //#4 設(shè)置綁定地址和端口
.handler(new ChannelInitializer<SocketChannel>() { //#5 使用ChannelInitializer,指定通道處理器
@Override
public void initChannel(SocketChannel ch)throws Exception {
ch.pipeline().addLast(new EchoClientHandler());//#6 將EchoClientHandler加入到管道
} });
ChannelFuture f = b.connect().sync(); //#7 連接到服務(wù)端
f.channel().closeFuture().sync(); //#8 阻塞直到客戶端通道關(guān)閉
} finally {
group.shutdownGracefully().sync(); //#9 關(guān)閉線程池釋放資源
}
}
public static void main(String[] args) throws Exception {
new EchoClient("127.0.0.1", 8989).start();
}
}
EchoClientHandler
@ChannelHandler.Sharable // #1 該注解標(biāo)示該處理器是可以在通道間共享的
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Active");
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); //#2 通道連接上后寫入消息 記得flush() 很重要
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Read");
System.out.println("Client received: " + ByteBufUtil
.hexDump(in.readBytes(in.readableBytes()))); //#4
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, //#5
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- channelRead0() 接收到數(shù)據(jù)的時(shí)候會(huì)回調(diào)該方法。但是,該方法接收的數(shù)據(jù)是分片的。也就是說,如果服務(wù)端寫入了5byte的數(shù)據(jù),該方法并不能保證一次就接收5byte的數(shù)據(jù),而可能回被回調(diào)兩次,一次接收3byte,一次接收2byte。不過像TCP這類的協(xié)議,該方法會(huì)保證接收數(shù)據(jù)的順序是與發(fā)送時(shí)一致的。
- SimpleChannelInboundHandler & ChannelInboundHandlerAdapter 我們這里使用前者的原因是后者在接收處理完數(shù)據(jù)后需要負(fù)責(zé)釋放資源。在使用SimpleChannelInboundHandler時(shí)channelRead0()回調(diào)完成后Netty會(huì)幫我們完成釋放。而在EchoServerHandler中我們使用ChannelInboundHandlerAdapter是因?yàn)樵诜?wù)端我們需要回顯(Echo)消息,在回調(diào)方法channelRead()中寫入消息時(shí)又是異步寫入,所以在該方法中我們并不能釋放資源,而是在寫入完成后由Netty幫我們完成釋放。

Server結(jié)果

Client結(jié)果