第 2 章 筆記
利用Netty來構(gòu)建如下圖所示的Echo客戶端和服務(wù)器應(yīng)用程序,即客戶端在和服務(wù)器建立連接以后,發(fā)生消息,反過來,服務(wù)器又會(huì)將這個(gè)消息回送給客戶端,是典型的“請(qǐng)求-響應(yīng)交互”模型。

image
服務(wù)器
ChannelHandler
這里我們會(huì)繼承ChannelInboundHandlerAdapter類,并復(fù)寫其中的一些方法:
- channelRead():在收到客戶端的請(qǐng)求的時(shí)候會(huì)調(diào)用該方法;
- channelReadComplete():當(dāng)前批量讀取中的最后一條消息調(diào)用該方法;
- exceptionCaught():出現(xiàn)異常的時(shí)候會(huì)調(diào)用
具體代碼如下:
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 對(duì)于每個(gè)傳入的消息都會(huì)調(diào)用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//將接收到的消息打印出來,并將該消息重新寫給發(fā)送者
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:"+in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
/**
* 通知ChannelInboundHandler最后一次對(duì)channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//將未發(fā)送完的消息沖刷到遠(yuǎn)程節(jié)點(diǎn),并且關(guān)閉該channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 在讀取操作期間,有異常拋出會(huì)調(diào)用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception ~");
cause.printStackTrace();
ctx.close();
}
}
引導(dǎo)服務(wù)器
主要邏輯如下:
- 綁定到服務(wù)器上的某個(gè)端口,監(jiān)聽并接受傳入請(qǐng)求;
- 配置Channel,將入站消息通知給EchoServerHandler的實(shí)例。
具體代碼如下:
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception{
final EchoServerHandler serverHandler=new EchoServerHandler();
EventLoopGroup group=new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
//指定所使用的NIO傳輸channel
b.group(group).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port)) //使用指定的端口設(shè)置套接字地址
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
//異步地綁定服務(wù);調(diào)用sync()阻塞等待直到綁定完成
ChannelFuture f = b.bind().sync();
//阻塞當(dāng)前線程直到它完成
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length!=1){
System.err.println("args error!");
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}
客戶端
客戶端的邏輯如下:
- 連接到服務(wù)器;
- 發(fā)生消息;
- 對(duì)于每個(gè)消息,等待并接收服務(wù)器響應(yīng)的相同消息;
- 關(guān)閉連接。
ChannelHandler
這里將繼承SimpleChannelInboundHandler,并復(fù)寫下列方法:
- channelActive():連接建立以后就被調(diào)用
- channelRead0():每收到一條來自服務(wù)器的消息時(shí)就被調(diào)用;
- exceptionCaught():發(fā)生異常的時(shí)候調(diào)用。
具體代碼如下:
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 建立連接以后,立即調(diào)用該方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty started !", CharsetUtil.UTF_8));
}
/**
* 每收到服務(wù)器的一條響應(yīng)的時(shí)候調(diào)用
*
*
*/
@Override
public void channelRead0(ChannelHandlerContext chc, ByteBuf byteBuf) throws Exception {
System.out.println("Client 接收到了:"+byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 當(dāng)發(fā)生異常的時(shí)候調(diào)用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Client Exception ~");
cause.printStackTrace();
ctx.close();
}
}
引導(dǎo)
與服務(wù)器的引導(dǎo)相類似,具體代碼如下:
public class EchoClient {
private String host;
private int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception{
EventLoopGroup group=new NioEventLoopGroup();
try {
//創(chuàng)建Bootstrap
Bootstrap b = new Bootstrap();
//指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實(shí)現(xiàn)
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
//連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成
ChannelFuture f = b.connect().sync();
//阻塞,直到channel關(guān)閉
f.channel().closeFuture().sync();
}finally {
//關(guān)閉線程池并且釋放所有資源
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
if (args.length!=2){
System.err.println("\"Usage: \" + EchoClient.class.getSimpleName() +\n" +
"\" <host> <port>\"");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host,port).start();
}
}
測(cè)試結(jié)果
服務(wù)器:

image
客戶端:

image
補(bǔ)充: Discard型服務(wù)器
注意,這里Maven中需要引入依賴如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
這種類型的服務(wù)器,就是指服務(wù)器只接收客戶端的消息,而不對(duì)客戶端進(jìn)行響應(yīng),客戶端可以使用telnet來模擬。
ChannelHandler
代碼如下:
/**
* 服務(wù)端的處理通道
*
* 在這里的處理只是簡(jiǎn)單地打印一下請(qǐng)求,然后拋棄這個(gè)請(qǐng)求
*
*/
public class DiscardServerHandler extends ChannelHandlerAdapter {
/**
* 每當(dāng)收到客戶端的請(qǐng)求的時(shí)候,這個(gè)方法都被調(diào)用
*
* @param ctx
* @param msg
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf in= (ByteBuf) msg;
System.out.print(in.toString(CharsetUtil.UTF_8));
}finally {
//拋棄收到的請(qǐng)求
ReferenceCountUtil.release(msg);
}
}
/**
* 發(fā)生異常的時(shí)候會(huì)觸發(fā)這個(gè)方法
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
引導(dǎo)
代碼如下:
/**
*
* 啟動(dòng)服務(wù)管道處理
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
super();
this.port = port;
}
public void run() throws Exception {
/***
* NioEventLoopGroup 是用來處理I/O操作的多線程事件循環(huán)器,
* Netty提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來處理不同傳輸協(xié)議。 在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,
* 因此會(huì)有2個(gè)NioEventLoopGroup會(huì)被使用。 第一個(gè)經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。
* 第二個(gè)經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接, 一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。
* 如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的Channels上都需要依賴于EventLoopGroup的實(shí)現(xiàn),
* 并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。
*/
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
System.out.println("準(zhǔn)備啟動(dòng)的端口是:"+port);
try{
//一個(gè)啟動(dòng)NIO服務(wù)的輔助啟動(dòng)類 你可以在這個(gè)服務(wù)中直接使用Channel
ServerBootstrap b = new ServerBootstrap();
//必須進(jìn)行設(shè)置
b=b.group(bossGroup,workerGroup);
//ServerSocketChannel以NIO的selector為基礎(chǔ)進(jìn)行實(shí)現(xiàn)的,用來接收新的連接
//這里告訴Channel如何獲取新的連接
b.channel(NioServerSocketChannel.class);
/***
* 這里的事件處理類經(jīng)常會(huì)被用來處理一個(gè)最近的已經(jīng)接收的Channel。
* ChannelInitializer是一個(gè)特殊的處理類,
* 其目的是幫助使用者配置一個(gè)新的Channel。
* 也許你想通過增加一些處理類比如NettyServerHandler來配置一個(gè)新的Channel
* 或者其對(duì)應(yīng)的ChannelPipeline來實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。
*
* 當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到pipline上,然后提取這些匿名類到最頂層的類上。
*/
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DiscardServerHandler()).addLast(new DiscardServerHandler());
}
});
/***
* 你可以設(shè)置這里指定的通道實(shí)現(xiàn)的配置參數(shù)。 我們正在寫一個(gè)TCP/IP的服務(wù)端,
* 因此我們被允許設(shè)置socket的參數(shù)選項(xiàng)比如tcpNoDelay和keepAlive。
* 請(qǐng)參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOptions的有一個(gè)大概的認(rèn)識(shí)。
*/
b.option(ChannelOption.SO_BACKLOG,128);
/***
* option()是提供給NioServerSocketChannel用來接收進(jìn)來的連接。
* childOption()是提供給由父管道ServerChannel接收到的連接,
* 在這個(gè)例子中也是NioServerSocketChannel。
*/
b.childOption(ChannelOption.SO_KEEPALIVE,true);
/***
* 綁定端口并啟動(dòng)去接收進(jìn)來的連接
*/
ChannelFuture f = b.bind(port).sync();
//這里會(huì)一直等待,直到socket被關(guān)閉
f.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length>0){
port=Integer.parseInt(args[0]);
}else {
port=8080;
}
new DiscardServer(port).run();
System.out.println("server is running: ");
}
}
測(cè)試:
使用telnet模擬客戶端來測(cè)試,啟動(dòng)服務(wù)器
telnet 127.0.0.1 9999

image