《Netty實(shí)戰(zhàn)》讀書筆記02——第二章

第 2 章 筆記

利用Netty來構(gòu)建如下圖所示的Echo客戶端和服務(wù)器應(yīng)用程序,即客戶端在和服務(wù)器建立連接以后,發(fā)生消息,反過來,服務(wù)器又會(huì)將這個(gè)消息回送給客戶端,是典型的“請(qǐng)求-響應(yīng)交互”模型。

image

服務(wù)器

ChannelHandler

這里我們會(huì)繼承ChannelInboundHandlerAdapter類,并復(fù)寫其中的一些方法:

  • channelRead():在收到客戶端的請(qǐng)求的時(shí)候會(huì)調(diào)用該方法;
  • channelReadComplete():當(dāng)前批量讀取中的最后一條消息調(diào)用該方法;
  • exceptionCaught():出現(xiàn)異常的時(shí)候會(huì)調(diào)用

具體代碼如下:

@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 對(duì)于每個(gè)傳入的消息都會(huì)調(diào)用
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將接收到的消息打印出來,并將該消息重新寫給發(fā)送者
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:"+in.toString(CharsetUtil.UTF_8));
        ctx.write(in);

    }

    /**
     *  通知ChannelInboundHandler最后一次對(duì)channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //將未發(fā)送完的消息沖刷到遠(yuǎn)程節(jié)點(diǎn),并且關(guān)閉該channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);

    }

    /**
     * 在讀取操作期間,有異常拋出會(huì)調(diào)用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exception ~");
        cause.printStackTrace();
        ctx.close();
    }
}

引導(dǎo)服務(wù)器

主要邏輯如下:

  • 綁定到服務(wù)器上的某個(gè)端口,監(jiān)聽并接受傳入請(qǐng)求;
  • 配置Channel,將入站消息通知給EchoServerHandler的實(shí)例。

具體代碼如下:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }


    public void start() throws Exception{
        final EchoServerHandler serverHandler=new EchoServerHandler();
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            //指定所使用的NIO傳輸channel
            b.group(group).channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))  //使用指定的端口設(shè)置套接字地址
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //異步地綁定服務(wù);調(diào)用sync()阻塞等待直到綁定完成
            ChannelFuture f = b.bind().sync();
            //阻塞當(dāng)前線程直到它完成
            f.channel().closeFuture().sync();


        }finally {
            group.shutdownGracefully().sync();
        }

    }

    public static void main(String[] args) throws Exception {
        if (args.length!=1){
            System.err.println("args error!");
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();

    }
}

客戶端

客戶端的邏輯如下:

  1. 連接到服務(wù)器;
  2. 發(fā)生消息;
  3. 對(duì)于每個(gè)消息,等待并接收服務(wù)器響應(yīng)的相同消息;
  4. 關(guān)閉連接。

ChannelHandler

這里將繼承SimpleChannelInboundHandler,并復(fù)寫下列方法:

  • channelActive():連接建立以后就被調(diào)用
  • channelRead0():每收到一條來自服務(wù)器的消息時(shí)就被調(diào)用;
  • exceptionCaught():發(fā)生異常的時(shí)候調(diào)用。

具體代碼如下:

@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {



    /**
     * 建立連接以后,立即調(diào)用該方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty started !", CharsetUtil.UTF_8));

    }

    /**
     * 每收到服務(wù)器的一條響應(yīng)的時(shí)候調(diào)用
     *
     *
     */
    @Override
    public void channelRead0(ChannelHandlerContext chc, ByteBuf byteBuf) throws Exception {
        System.out.println("Client 接收到了:"+byteBuf.toString(CharsetUtil.UTF_8));
    }


    /**
     * 當(dāng)發(fā)生異常的時(shí)候調(diào)用
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Client Exception ~");
        cause.printStackTrace();
        ctx.close();
    }
}

引導(dǎo)

與服務(wù)器的引導(dǎo)相類似,具體代碼如下:

public class EchoClient {

    private String host;
    private int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception{
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            //創(chuàng)建Bootstrap
            Bootstrap b = new Bootstrap();

            //指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實(shí)現(xiàn)
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            //連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到channel關(guān)閉
            f.channel().closeFuture().sync();


        }finally {
            //關(guān)閉線程池并且釋放所有資源
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length!=2){
            System.err.println("\"Usage: \" + EchoClient.class.getSimpleName() +\n" +
                    "\" <host> <port>\"");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new EchoClient(host,port).start();

    }
}

測(cè)試結(jié)果

服務(wù)器:

image

客戶端:

image

補(bǔ)充: Discard型服務(wù)器

注意,這里Maven中需要引入依賴如下:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

這種類型的服務(wù)器,就是指服務(wù)器只接收客戶端的消息,而不對(duì)客戶端進(jìn)行響應(yīng),客戶端可以使用telnet來模擬。

ChannelHandler

代碼如下:

/**
 * 服務(wù)端的處理通道
 *
 * 在這里的處理只是簡(jiǎn)單地打印一下請(qǐng)求,然后拋棄這個(gè)請(qǐng)求
 *
 */
public class DiscardServerHandler extends ChannelHandlerAdapter {

    /**
     * 每當(dāng)收到客戶端的請(qǐng)求的時(shí)候,這個(gè)方法都被調(diào)用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            ByteBuf in= (ByteBuf) msg;
            System.out.print(in.toString(CharsetUtil.UTF_8));
        }finally {

            //拋棄收到的請(qǐng)求
            ReferenceCountUtil.release(msg);

        }
    }

    /**
     * 發(fā)生異常的時(shí)候會(huì)觸發(fā)這個(gè)方法
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();

    }
}

引導(dǎo)

代碼如下:

/**
 *
 * 啟動(dòng)服務(wù)管道處理
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        super();
        this.port = port;
    }

    public void run() throws Exception {
        /***
         * NioEventLoopGroup 是用來處理I/O操作的多線程事件循環(huán)器,
         * Netty提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來處理不同傳輸協(xié)議。 在這個(gè)例子中我們實(shí)現(xiàn)了一個(gè)服務(wù)端的應(yīng)用,
         * 因此會(huì)有2個(gè)NioEventLoopGroup會(huì)被使用。 第一個(gè)經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。
         * 第二個(gè)經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接, 一旦‘boss’接收到連接,就會(huì)把連接信息注冊(cè)到‘worker’上。
         * 如何知道多少個(gè)線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的Channels上都需要依賴于EventLoopGroup的實(shí)現(xiàn),
         * 并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。
         */
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workerGroup=new NioEventLoopGroup();

        System.out.println("準(zhǔn)備啟動(dòng)的端口是:"+port);

        try{
            //一個(gè)啟動(dòng)NIO服務(wù)的輔助啟動(dòng)類 你可以在這個(gè)服務(wù)中直接使用Channel
            ServerBootstrap b = new ServerBootstrap();

            //必須進(jìn)行設(shè)置
            b=b.group(bossGroup,workerGroup);

            //ServerSocketChannel以NIO的selector為基礎(chǔ)進(jìn)行實(shí)現(xiàn)的,用來接收新的連接
            //這里告訴Channel如何獲取新的連接
            b.channel(NioServerSocketChannel.class);

            /***
             * 這里的事件處理類經(jīng)常會(huì)被用來處理一個(gè)最近的已經(jīng)接收的Channel。
             * ChannelInitializer是一個(gè)特殊的處理類,
             * 其目的是幫助使用者配置一個(gè)新的Channel。
             * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個(gè)新的Channel
             * 或者其對(duì)應(yīng)的ChannelPipeline來實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。
             *
             * 當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類到pipline上,然后提取這些匿名類到最頂層的類上。
             */
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new DiscardServerHandler()).addLast(new DiscardServerHandler());
                }
            });

            /***
             * 你可以設(shè)置這里指定的通道實(shí)現(xiàn)的配置參數(shù)。 我們正在寫一個(gè)TCP/IP的服務(wù)端,
             * 因此我們被允許設(shè)置socket的參數(shù)選項(xiàng)比如tcpNoDelay和keepAlive。
             * 請(qǐng)參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對(duì)ChannelOptions的有一個(gè)大概的認(rèn)識(shí)。
             */
            b.option(ChannelOption.SO_BACKLOG,128);

            /***
             * option()是提供給NioServerSocketChannel用來接收進(jìn)來的連接。
             * childOption()是提供給由父管道ServerChannel接收到的連接,
             * 在這個(gè)例子中也是NioServerSocketChannel。
             */
            b.childOption(ChannelOption.SO_KEEPALIVE,true);

            /***
             * 綁定端口并啟動(dòng)去接收進(jìn)來的連接
             */
            ChannelFuture f = b.bind(port).sync();

            //這里會(huì)一直等待,直到socket被關(guān)閉
            f.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws Exception {
        int port;
        if (args.length>0){
            port=Integer.parseInt(args[0]);
        }else {
            port=8080;
        }

        new DiscardServer(port).run();
        System.out.println("server is running: ");

    }


}

測(cè)試:

使用telnet模擬客戶端來測(cè)試,啟動(dòng)服務(wù)器

telnet 127.0.0.1 9999
image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容