Netty-EchoServer

EchoServer

EchoServer

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();                  //#1
            b.group(group)                                              //#2
                    .channel(NioServerSocketChannel.class)              //#2
                    .localAddress(new InetSocketAddress(port))          //#2
                    .childHandler(new ChannelInitializer<SocketChannel>() { //#3
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());                  //#4
                        }
                    });

            ChannelFuture f = b.bind().sync();              //#5
            System.out.println(EchoServer.class.getSimpleName() + " started and listener on " + f.channel().localAddress());
            f.channel().closeFuture().sync();               //#6
        } finally {
            group.shutdownGracefully().sync();              //#7
        }

    }


    public static void main(String[] args) throws Exception {
        new EchoServer(8989).start();
    }

}
  • 首先,創(chuàng)建一個(gè)ServerBootstrap實(shí)例
  • 指定 NioEventLoopGroup 接收新的鏈接,并處理已經(jīng)接收的鏈接
  • 設(shè)置通道類型為 NioServerSocketChannel (當(dāng)然除了NIO,也有其他痛到可以選擇,例如:OIO OioServerSocketChannel)
  • 設(shè)置綁定的 InetSocketAddress
  • 指定 ChannelHandler 來處理接收的鏈接(這里使用ChannelInitializer創(chuàng)建了一個(gè)子通道)

ChannelPipeline 持有通道中所有不同的ChannelHandlers

sync() 該方法會(huì)阻塞直到服務(wù)綁定(在關(guān)閉時(shí)同理)

EchoServerHandler

@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Read");
        System.out.println("Server received : " + msg);
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Read Complete");
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        //ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty使用前面提到了Future和Callback的概念去處理不同的事件。我們需要繼承ChannelInboundHandlerAdapter,這樣我們可以處理不同的事件回調(diào)。

  • channelRead()方法,這個(gè)方法會(huì)在每次消息到達(dá)時(shí)回調(diào)。
  • exceptionCaught()方法,執(zhí)行異常情況下會(huì)被回調(diào)。

EchoClient

public class EchoClient {
    private String host;
    private int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();                        //#1 創(chuàng)建 bootstrap 客戶端
            b.group(group)                                        //#2 這里指定 NioEventLoopGroup 處理客戶端事件
                    .channel(NioSocketChannel.class)                     //#3 指定通道類型
                    .remoteAddress(new InetSocketAddress(host, port))    //#4 設(shè)置綁定地址和端口
                    .handler(new ChannelInitializer<SocketChannel>() {   //#5 使用ChannelInitializer,指定通道處理器
                        @Override
                        public void initChannel(SocketChannel ch)throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());//#6 將EchoClientHandler加入到管道
                        } });
            ChannelFuture f = b.connect().sync();                 //#7 連接到服務(wù)端
            f.channel().closeFuture().sync();                     //#8 阻塞直到客戶端通道關(guān)閉
        } finally {
            group.shutdownGracefully().sync();                    //#9 關(guān)閉線程池釋放資源
        }
    }


    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 8989).start();
    }
}

EchoClientHandler

@ChannelHandler.Sharable                                                        // #1  該注解標(biāo)示該處理器是可以在通道間共享的
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{


    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Active");
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); //#2 通道連接上后寫入消息 記得flush() 很重要
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Read");

        System.out.println("Client received: " + ByteBufUtil
                .hexDump(in.readBytes(in.readableBytes())));  //#4
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,              //#5
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  • channelRead0() 接收到數(shù)據(jù)的時(shí)候會(huì)回調(diào)該方法。但是,該方法接收的數(shù)據(jù)是分片的。也就是說,如果服務(wù)端寫入了5byte的數(shù)據(jù),該方法并不能保證一次就接收5byte的數(shù)據(jù),而可能回被回調(diào)兩次,一次接收3byte,一次接收2byte。不過像TCP這類的協(xié)議,該方法會(huì)保證接收數(shù)據(jù)的順序是與發(fā)送時(shí)一致的。
  • SimpleChannelInboundHandler & ChannelInboundHandlerAdapter 我們這里使用前者的原因是后者在接收處理完數(shù)據(jù)后需要負(fù)責(zé)釋放資源。在使用SimpleChannelInboundHandler時(shí)channelRead0()回調(diào)完成后Netty會(huì)幫我們完成釋放。而在EchoServerHandler中我們使用ChannelInboundHandlerAdapter是因?yàn)樵诜?wù)端我們需要回顯(Echo)消息,在回調(diào)方法channelRead()中寫入消息時(shí)又是異步寫入,所以在該方法中我們并不能釋放資源,而是在寫入完成后由Netty幫我們完成釋放。
Server結(jié)果
Client結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,659評論 19 139
  • 原文地址:http://netty.io/wiki/reference-counted-objects.html ...
    linkinparkzlz閱讀 1,731評論 0 0
  • 此時(shí),沒有什么適合的語言 來盡數(shù)這沉默的景仰 看著你們那被雨水浸濕的身影 一股暖流,不忍用手去擦試 你們的父母在遠(yuǎn)...
    詩雨長河閱讀 255評論 0 2
  • 海棠社第100社 主題:海棠社 東家:海棠社全體成員 編輯:禪貓,唐僧 時(shí)間:1月23日 20.00 文/寒霜 【...
    劉寒霜閱讀 325評論 21 20
  • 介紹 build模式經(jīng)常是用于構(gòu)建一個(gè)復(fù)雜多變對象,有3個(gè)要點(diǎn):1、這個(gè)對象的創(chuàng)建涉及到多個(gè)子元素的創(chuàng)建2、每個(gè)子...
    cxlin007閱讀 725評論 0 0

友情鏈接更多精彩內(nèi)容