Netty入門(mén)

Netty是一個(gè)異步的事件驅(qū)動(dòng)網(wǎng)絡(luò)框架,使用Netty可以研發(fā)高性能的私有協(xié)議,將業(yè)務(wù)邏輯和網(wǎng)絡(luò)進(jìn)行解耦,通過(guò)Netty我們可以實(shí)現(xiàn)一些常用的協(xié)議,如HTTP。

基本概念

Channel

Channel是NIO的基礎(chǔ),它代表一個(gè)連接,通過(guò)這個(gè)鏈接可以進(jìn)行IO操作,例如讀和寫(xiě)。

Future

在Netty的Channel中的每一個(gè)IO操作都是非阻塞的。
這就意味著每一個(gè)操作都是立刻返回結(jié)果的。在Java標(biāo)準(zhǔn)庫(kù)中有Future接口,但是我們使用Future的時(shí)候只能詢問(wèn)這個(gè)操作是否執(zhí)行完成,或者阻塞當(dāng)前的線程直到結(jié)果完成,這不是Netty想要的。

Netty實(shí)現(xiàn)了自己的ChannelFuture接口,我們可以傳遞一個(gè)回調(diào)到ChannelFuture,當(dāng)操作完成的時(shí)候才會(huì)執(zhí)行回調(diào)。

Events 和 Handlers

Netty使用的是事件驅(qū)動(dòng)的應(yīng)用設(shè)計(jì),因此Handler處理的數(shù)據(jù)流,在管道中是鏈?zhǔn)降氖录?。事件和Handler可以被 輸入 和 輸出的數(shù)據(jù)流進(jìn)行關(guān)聯(lián)。

輸入(Inbound)事件可以如下:

  • Channel激活和滅活
  • 讀操作事件
  • 異常事件
  • 用戶事件

輸出(Outbound)事件比較簡(jiǎn)單,一般是打開(kāi)和關(guān)閉連接,寫(xiě)入和刷新數(shù)據(jù)。

Encoder 和 Decoder

因?yàn)槲覀円幚砭W(wǎng)絡(luò)協(xié)議,需要操作數(shù)據(jù)的序列化和反序列化。

代碼

來(lái)個(gè)實(shí)際的案例:

  1. 新建項(xiàng)目,添加maven依賴
    <properties>
        <netty-all.version>4.1.6.Final</netty-all.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty-all.version}</version>
        </dependency>
    </dependencies>
  1. 創(chuàng)建數(shù)據(jù)的pojo
public class RequestData {
    private int intValue;
    private String stringValue;

    // getter 和 setter
    // toString方法
}

public class ResponseData {
    private int intValue;
    
    // getter 和 setter
    // toString方法    
}

  1. 創(chuàng)建Encoder和Decoder
public class RequestDataEncoder extends MessageToByteEncoder<RequestData> {

    private final Charset charset = Charset.forName("UTF-8");

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RequestData msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getIntValue());
        out.writeInt(msg.getStringValue().length());
        out.writeCharSequence(msg.getStringValue(), charset);
    }
}

public class ResponseDataDecoder extends ReplayingDecoder<ResponseData> {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) throws Exception {

        ResponseData data = new ResponseData();
        data.setIntValue(in.readInt());
        out.add(data);
    }
}


public class RequestDecoder extends ReplayingDecoder<RequestData> {

    private final Charset charset = Charset.forName("UTF-8");

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {

            RequestData data = new RequestData();
            data.setIntValue(in.readInt());
            int strLen = in.readInt();
            data.setStringValue(
                    in.readCharSequence(strLen, charset).toString());
            out.add(data);
    }
}


public class ResponseDataEncoder extends MessageToByteEncoder<ResponseData> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ResponseData msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getIntValue());
    }
}

  1. 創(chuàng)建請(qǐng)求的處理器
public class ProcessingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RequestData requestData = (RequestData) msg;
        ResponseData responseData = new ResponseData();
        responseData.setIntValue(requestData.getIntValue() * 2);
        ChannelFuture future = ctx.writeAndFlush(responseData);
        future.addListener(ChannelFutureListener.CLOSE);
        System.out.println(requestData);
    }
}


public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {

        RequestData msg = new RequestData();
        msg.setIntValue(123);
        msg.setStringValue(
                "正常工作");
        ChannelFuture future = ctx.writeAndFlush(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println((ResponseData)msg);
        ctx.close();
    }
}
  1. 創(chuàng)建服務(wù)端應(yīng)用
public class NettyServer {
    private int port;
    

    public NettyServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {

        int port = args.length > 0
                ? Integer.parseInt(args[0]) : 9003;

        new NettyServer(port).run();
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new RequestDecoder(),
                                    new ResponseDataEncoder(),
                                    new ProcessingHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

  1. 創(chuàng)建客戶端應(yīng)用
public class NettyClient {
    public static void main(String[] args) throws Exception {

        String host = "127.0.0.1";
        int port = 9003;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new RequestDataEncoder(),
                                    new ResponseDataDecoder(), new ClientHandler());
                        }
                    });

            ChannelFuture f = b.connect(host, port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 運(yùn)行服務(wù)端和客戶端
image
image

可見(jiàn)正常工作

最后

這里我們只是對(duì)Netty進(jìn)行簡(jiǎn)單的介紹,介紹了它一些基本的概念,然后演示了一個(gè)例子。后續(xù)我們會(huì)對(duì)Netty進(jìn)行更深入的研究

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ? Netty強(qiáng)大的地方,是他能方便的實(shí)現(xiàn)自定義協(xié)議的網(wǎng)絡(luò)傳輸。在上一篇文章中,通過(guò)使用Netty封裝好的工具...
    追那個(gè)小女孩閱讀 93,053評(píng)論 4 47
  • Netty入門(mén) 1.Netty介紹 (1)百度百科介紹: Netty是由JBOSS提供的一個(gè)java開(kāi)源框架。Ne...
    零度微笑_019c閱讀 1,955評(píng)論 0 28
  • 一、組件 一個(gè) EventLoopGroup 包含一個(gè)或者多個(gè) EventLoop ; 一個(gè) EventLoop ...
    wxz1997閱讀 646評(píng)論 0 1
  • 最近一直在看Netty方面的資料,發(fā)現(xiàn)在某寶上面賣(mài)得最好的關(guān)于Netty的書(shū)是《Netty實(shí)戰(zhàn)》,恕我直言,這本翻...
    樂(lè)浩beyond閱讀 628評(píng)論 0 4
  • 前兩天寫(xiě)了一點(diǎn)netty相關(guān)的知識(shí),并寫(xiě)了一個(gè)demo,但是對(duì)其原理還是沒(méi)有深入,今天我們來(lái)做一次研究吧 首先讓我...
    我吃草莓閱讀 872評(píng)論 0 2

友情鏈接更多精彩內(nèi)容