新連接接入

入口

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ...
        // 服務器收到Accept事件,來到這里準備新連接的處理
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            ...
        }
    ...
}

接下來開始接入新連接,read會將拿到的新連接存在本地的readBuf中。

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // doReadMessages拿底層accept的socketchannel
                // 將拿到的channel收集到本地readBuf
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
                // 判斷是否還可以繼續(xù)接入新連接
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        // 遍歷上面收集到的連接channel,逐個觸發(fā)fireChannelRead
        // 而這里觸發(fā)的pipeline的handler是服務端啟動的時候默認加載的ServerBootstrapAcceptor
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        ...
    } finally {
       ...
    }
}

組裝SocketChannel

protected int doReadMessages(List<Object> buf) throws Exception {
    // 拿到底層NIO的SocketChannel
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            // 封裝SocketChannel為Netty自定義的NioSocketChannel
            // 并將this也就是NioServerSocketChannel作為parent
            // 加到本地保存,并返回1,表示成功拿到
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        ...
    }
    // 否則沒有連接,返回0
    return 0;
}

Channel分類

1563634925318.png

NioSocketChannel結(jié)構

1563633366967.png

創(chuàng)建NioSocketChannel

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

AbstractNioByteChannel

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    // 將OP_READ往上傳,說明新連接建立后,服務器開始接收READ事件
    super(parent, ch, SelectionKey.OP_READ);
}

AbstractNioChannel

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    // 保存NIO的SocketChannel
    this.ch = ch;
    // 保存READ事件碼
    this.readInterestOp = readInterestOp;
    try {
        // 設置NIO非阻塞模式
        ch.configureBlocking(false);
    } catch (IOException e) {
       ...
    }
}

AbstractChannel

protected AbstractChannel(Channel parent) {
    // NioServerSocketChannel
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

config

public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
    super(channel);
    if (javaSocket == null) {
        throw new NullPointerException("javaSocket");
    }
    this.javaSocket = javaSocket;

    // Enable TCP_NODELAY by default if possible.
    if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
        try {
            // 設置TcpNoDelay,不會攢包,有的話立即發(fā)送
            setTcpNoDelay(true);
        } catch (Exception e) {
            // Ignore.
        }
    }
}

ServerBootstrapAcceptor

fireChannelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // childHandler添加到子連接的pipeline中,也就是ChannelInitializer
    // 這里作為pipeline的起手式,將該ChannelInitializer作為該pipeline的第一個執(zhí)行者
    child.pipeline().addLast(childHandler);

    // 設置options到客戶端channel的config
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue()))
            ...
        ...
    }

    // 設置attrs
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 將NioSocketChannel注冊到NioEventLoopGroup上的一個EventLoop上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    ...
}

register

public ChannelFuture register(Channel channel) {
    // next就是拿到下一個EventLoop,NioEventLoopGroup的chooser來滾動整個EventLoop數(shù)組
    // 繼續(xù)往下探
    return next().register(channel);
}
public ChannelFuture register(Channel channel) {
    // 對channel進行簡單封裝成Promise
    return register(new DefaultChannelPromise(channel, this));
}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    // 當前是boss發(fā)起,那么這里必然進不去
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 這里新建線程運行,并跟eventLoop綁定,之后就由該線程與channel進行互動
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}
private void register0(ChannelPromise promise) {
    try {

        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                ...
                beginRead();
            }
        }
    } catch (Throwable t) {
       ...
    }
}

doRegister

 protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 終于看到NIO的調(diào)用,將selector與channel綁定
                // 至此底層注冊完成
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }

invokeHandlerAddedIfNeeded

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // 接著去調(diào)用callHandlerAddedForAllHandlers
        callHandlerAddedForAllHandlers();
    }
}
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    
    // 那么這個pendingHandlerCallbackHead到底是哪里來的,要記得上面boss在fireChannelRead的時候會
    // addLast一個ChannelInitializer么?
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        // 那么這里的execute
        task.execute();
        task = task.next;
    }
}
ChannelInitializer
addLast
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { 
    ...
    // 將ChannelInitializer保證成nexCtx
    newCtx = newContext(group, filterName(name, handler), handler);
    ...
    // 如果是客戶端channel還沒有注冊到這個EventLoop上,調(diào)用callHandlerCallbackLater,將              // ChannelInitializer與pendingHandlerCallbackHead進行綁定
    // 實際上這個handler將作為客戶端pipeline的head來處理,且初始化完成后會remove掉自己
    if (!registered) {
        newCtx.setAddPending();
        callHandlerCallbackLater(newCtx, true);
        return this;
    }
    ...
}
execute
void execute() {
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        // 最終會調(diào)用callHandlerAdded0
        callHandlerAdded0(ctx);
    } else {
        try {
            executor.execute(this);
        } catch (RejectedExecutionException e) {
           ...
        }
    }
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // 會最終會執(zhí)行Initializer的initChannel
            initChannel(ctx);
        }
    }
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            // 這里真正開始執(zhí)行用戶自定義的Initializer的initChannel方法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
           ...
        } finally {
            // 責任鏈初始化完成后會移除該Initializer,這樣整個客戶端pipeline的責任鏈初始化完成
            remove(ctx);
        }
        return true;
    }
    return false;
}

fireChannelRegistered

public ChannelHandlerContext fireChannelRegistered() {
    // 遍歷pipeline的handler執(zhí)行invokeChannelRegistered
    invokeChannelRegistered(findContextInbound());
    return this;
}
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        // 拿到下一個handler
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // 執(zhí)行channelRegistered方法
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

fireChannelActive

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 將ChannelActive事件在pipeline上繼續(xù)往后傳播,也就是執(zhí)行各個鏈上的channelActive方法
    ctx.fireChannelActive();
    // 開始正式read
    readIfIsAutoRead();
}
read
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        // 這里會在底層注冊給selector注冊通道的read事件
        channel.read();
    }
}
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 前面register的時候,沒有注冊任何事件,也就是0,這里開始注冊read,說明前期準備工作已經(jīng)完成
        // 可以正式開始接收客戶端發(fā)來的數(shù)據(jù)了
        // 而這里的readInterestOp是創(chuàng)建NioSocketChannel的時候指定的OP_READ事件
        // NIO的代碼,做ready注冊
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容