Netty 模型

5.8.1工作原理示意圖 1-簡單版

Netty 主要基于主從 Reactors 多線程模型(如圖)做了一定的改進(jìn),其中主從 Reactor 多線程模型有多個 Reactor


image.png

5.8.2對上圖說明

  1. BossGroup 線程維護(hù) Selector , 只關(guān)注 Accecpt
  2. 當(dāng)接收到 Accept 事件,獲取到對應(yīng)的 SocketChannel, 封裝成 NIOScoketChannel 并注冊到 Worker 線程(事件循 環(huán)), 并進(jìn)行維護(hù)
  3. 當(dāng) Worker 線程監(jiān)聽到 selector 中通道發(fā)生自己感興趣的事件后,就進(jìn)行處理(就由 handler), 注意 handler 已 經(jīng)加入到通道

5.8.3工作原理示意圖 2-進(jìn)階版

image.png

5.8.4工作原理示意圖-詳細(xì)版

image.png

5.8.5對上圖的說明小結(jié)

  1. Netty 抽象出兩組線程池 BossGroup 專門負(fù)責(zé)接收客戶端的連接, WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫
  2. BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相當(dāng)于一個事件循環(huán)組, 這個組中含有多個事件循環(huán) ,每一個事件循環(huán)是 NioEventLoop
  4. NioEventLoop 表示一個不斷循環(huán)的執(zhí)行處理任務(wù)的線程, 每個 NioEventLoop 都有一個 selector , 用于監(jiān)聽綁 定在其上的 socket 的網(wǎng)絡(luò)通訊
  5. NioEventLoopGroup 可以有多個線程, 即可以含有多個 NioEventLoop
  6. 每個 Boss NioEventLoop 循環(huán)執(zhí)行的步驟有 3 步
    ? 輪詢 accept 事件
    ? 處理 accept 事件 , 與 client 建立連接 , 生成 NioScocketChannel , 并將其注冊到某個 worker NIOEventLoop 上 的 selector
    ? 處理任務(wù)隊列的任務(wù) , 即 runAllTasks
  7. 每個 Worker NIOEventLoop 循環(huán)執(zhí)行的步驟
    ? 輪詢 read, write 事件
    ? 處理 i/o 事件, 即 read , write 事件,在對應(yīng) NioScocketChannel 處理
    ? 處理任務(wù)隊列的任務(wù) , 即 runAllTasks
  8. 每個Worker NIOEventLoop 處理業(yè)務(wù)時,會使用pipeline(管道), pipeline 中包含了 channel , 即通過pipeline 可以獲取到對應(yīng)通道, 管道中維護(hù)了很多的 處理器

5.8.6Netty 快速入門實例-TCP 服務(wù)

實例要求:使用 IDEA 創(chuàng)建 Netty 項目

  1. Netty 服務(wù)器在 6668 端口監(jiān)聽,客戶端能發(fā)送消息給服務(wù)器 "hello, 服務(wù)器~"
  2. 服務(wù)器可以回復(fù)消息給客戶端 "hello, 客戶端~"
  3. 目的:對 Netty 線程模型 有一個初步認(rèn)識, 便于理解 Netty 模型理論
  4. 看老師代碼演示
    5.1 編寫服務(wù)端 5.2 編寫客戶端 5.3 對 netty 程序進(jìn)行分析,看看 netty 模型特點(diǎn) 說明: 創(chuàng)建 Maven 項目,并引入 Netty 包
  <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

NettyServer

package com.qiz.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author qiz
 */
public class NettyServer {
    public static void main(String[] args)  {

        //創(chuàng)建BossGroup 和 WorkGroup
        //創(chuàng)建兩個線程組
        //boosGroup只是處理連接請求,真正和客戶端業(yè)務(wù)處理的是workGroup
        //兩個都是無限循環(huán)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        //創(chuàng)建服務(wù)器端的啟動對象,配置參數(shù)
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用鏈?zhǔn)骄幊虂磉M(jìn)行設(shè)置
            bootstrap.group(bossGroup,workGroup)//設(shè)置兩個線程組
                    .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作為服務(wù)器的通道實現(xiàn)
                    .option(ChannelOption.SO_BACKLOG,128)//設(shè)置線程隊列得到連接個數(shù)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//設(shè)置保持活動連接狀態(tài)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建一個通道測試對象(匿名對象)
                        //給pipeline 設(shè)置處理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });//給我們的workGroup 的 EventLoop 對應(yīng)的管道記錄處理器
            System.out.println("..........服務(wù)器 is ready");
            //綁定一個端口并且同步,生成了一個ChannelFuture對象
            //啟動服務(wù)器(并綁定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();
            //對關(guān)閉通道進(jìn)行監(jiān)聽
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

NettyServerHandler

package com.qiz.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author qiz
 * 自定義一個handler需要繼承netty 規(guī)定好的某個HandlerAdapter
 * 這時我們自定義一個Handler,才能稱為一個handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //讀取數(shù)據(jù)完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //將數(shù)據(jù)寫入到緩沖,并刷新
        //一般講,我們對這個發(fā)送的數(shù)據(jù)進(jìn)行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~",CharsetUtil.UTF_8));
    }

    //處理異常,一般是需要關(guān)閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //讀取數(shù)據(jù)事件(這里我們可以讀取客戶端發(fā)送的消息)
    //上下文對象,含有管道pipeline,通道 地址
    //msg 客戶端發(fā)送的數(shù)據(jù) 默認(rèn)object
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


        System.out.println("server ctx =" + ctx);
        //將 msg 轉(zhuǎn)成一個ByteBuf
        //ByteBuf 是 Netty 提供的,不是NIO 的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(""+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:"+ctx.channel().remoteAddress());

    }
}

NettyClient

package com.qiz.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author qiz
 */
public class NettyClient {
    public static void main(String[] args) {

        //客戶端需要一個事件循環(huán)組
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //創(chuàng)建客戶端啟動對象
            //客戶端使用BootStrap  服務(wù)端使用ServerBootStrap
            Bootstrap bootstrap = new Bootstrap();

            //設(shè)置相關(guān)參數(shù)
            bootstrap.group(eventExecutors) //設(shè)置線程組
                    .channel(NioSocketChannel.class) //設(shè)置客戶端通道的實現(xiàn)類(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("客戶端 ok..");

            //啟動客戶端去連接服務(wù)器端
            //關(guān)于ChannelFuture 要分析,涉及到netty的異步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //給關(guān)閉通道進(jìn)行監(jiān)聽
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
          //
        }finally {
            eventExecutors.shutdownGracefully();
        }

    }
}

NettyClientHandler

package com.qiz.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author qiz
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //當(dāng)通道就緒就會觸發(fā)該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server:<>", CharsetUtil.UTF_8));
    }

    //當(dāng)通道有讀取事件時會觸發(fā)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服務(wù)器回復(fù)的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服務(wù)器的地址" +ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

5.8.7任務(wù)隊列中的 Task 有 3 種典型使用場景

  1. 用戶程序自定義的普通任務(wù) [舉例說明]
  2. 用戶自定義定時任務(wù)
  3. 非當(dāng)前 Reactor 線程調(diào)用 Channel 的各種方法 例如在推送系統(tǒng)的業(yè)務(wù)線程里面,根據(jù)用戶的標(biāo)識,找到對應(yīng)的 Channel 引用,然后調(diào)用 Write 類方法向該用戶推送消息,就會進(jìn)入到這種場景。最終的 Write 會提交到任務(wù)隊列中后被異步消費(fèi)

解決方案 1 用戶程序自定義的普通任務(wù) 阻塞的線程會后執(zhí)行,先往下執(zhí)行

ctx.channel().eventLoop().execute(new Runnable() { 
 @Override 
 public void run() { 
 try {
  Thread.sleep(5 * 1000); 
 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵 2", 
 CharsetUtil.UTF_8)); 
 System.out.println("channel code=" + ctx.channel().hashCode()); 
}
 catch (Exception ex) { 
System.out.println("發(fā)生異常" + ex.getMessage()); 
} } }); 

ctx.channel().eventLoop().execute(new Runnable() { 
@Override 
public void run() {
try {Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵 3", CharsetUtil.UTF_8)); 
System.out.println("channel code=" + ctx.channel().hashCode()); 
} catch (Exception ex) { 
System.out.println("發(fā)生異常" + ex.getMessage());
 } } });
IMG_2238.PNG

解決方案 2 : 用戶自定義定時任務(wù) -》 該任務(wù)是提交到 scheduledTaskQueue 中

ctx.channel().eventLoop().schedule(new Runnable() { 
@Override public void run() { 
try {Thread.sleep(5 * 1000); 
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵 4", CharsetUtil.UTF_8)); 
System.out.println("channel code=" + ctx.channel().hashCode()); 
} catch (Exception ex) { 
System.out.println("發(fā)生異常" + ex.getMessage()); 
} } }, 5, TimeUnit.SECONDS);

5.8.8方案再說明

  1. Netty 抽象出兩組線程池,BossGroup 專門負(fù)責(zé)接收客戶端連接,WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)讀寫操作。
  2. NioEventLoop 表示一個不斷循環(huán)執(zhí)行處理任務(wù)的線程,每個 NioEventLoop 都有一個 selector,用于監(jiān)聽綁定 在其上的 socket 網(wǎng)絡(luò)通道。
  3. NioEventLoop 內(nèi)部采用串行化設(shè)計,從消息的讀取->解碼->處理->編碼->發(fā)送,始終由 IO 線程 NioEventLoop 負(fù)責(zé)
    ? NioEventLoopGroup 下包含多個 NioEventLoop
    ? 每個 NioEventLoop 中包含有一個 Selector,一個 taskQueue
    ? 每個 NioEventLoop 的 Selector 上可以注冊監(jiān)聽多個 NioChannel
    ? 每個 NioChannel 只會綁定在唯一的 NioEventLoop 上
    ? 每個 NioChannel 都綁定有一個自己的 ChannelPipeline
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容