netty 源碼分析 (三) Channel

netty 源碼分析 (三) Channel

sschrodinger

2019/06/25


參考


《Netty 權(quán)威指南》第二版 - 李林鋒 著

JDK version 1.8


Channel 簇


Channel

Channel 是 netty 抽象出來的與網(wǎng)絡(luò) I/O ,該接口實現(xiàn)了所有的 I/O 操作方法和與 netty 框架相關(guān)的方法,如 pipeline() 等方法獲得責(zé)任鏈。Channel 的定義如下:

public interface ChannelOutboundInvoker {

    ChannelFuture bind(SocketAddress localAddress);

    ChannelFuture connect(SocketAddress remoteAddress);

    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    ChannelFuture disconnect();

    ChannelFuture close();

    ChannelFuture deregister();

    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture disconnect(ChannelPromise promise);

    ChannelFuture close(ChannelPromise promise);

    ChannelFuture deregister(ChannelPromise promise);

    ChannelOutboundInvoker read();

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

    ChannelPromise newPromise();

    ChannelProgressivePromise newProgressivePromise();

    ChannelFuture newSucceededFuture();

    ChannelFuture newFailedFuture(Throwable cause);

    ChannelPromise voidPromise();
}

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    ChannelId id();

    EventLoop eventLoop();

    Channel parent();

    ChannelConfig config();

    boolean isOpen();

    boolean isRegistered();

    boolean isActive();

    ChannelMetadata metadata();

    SocketAddress localAddress();

    SocketAddress remoteAddress();

    ChannelFuture closeFuture();

    boolean isWritable();

    long bytesBeforeUnwritable();

    long bytesBeforeWritable();

    Unsafe unsafe();

    ChannelPipeline pipeline();

    ByteBufAllocator alloc();

    @Override
    Channel read();

    @Override
    Channel flush();

    interface Unsafe {
        //...
    }
}

note

  • ChannelOutboundInvoker 接口申明了所有的外向的網(wǎng)絡(luò)操作,即流量向外的網(wǎng)絡(luò)操作。
  • Channel 接口返回的都是 Future 類型的變量及其子類(立即返回),通過 Futureget() 方法或者 isDone() 方法判斷是否執(zhí)行成功。(包括 ChannelFutureChannelPromise、ChannelProgressivePromise 等以 future 或者 promise 結(jié)尾的都是 Future 的子類)。
  • 對于所有的 I/O 操作,都是異步的,所以需要返回 Future 對返回的結(jié)果進行處理。

AbstractChannel 實現(xiàn)了部分的 Channel 接口,AbstractChannel 抽象類包括了如下域:

// 一些靜態(tài)的異常
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ExtendedClosedChannelException(null), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ExtendedClosedChannelException(null), AbstractUnsafe.class, "write(...)");
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ExtendedClosedChannelException(null), AbstractUnsafe.class, "flush0()");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");

// 父 channel
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
// 默認的 pipeline
private final DefaultChannelPipeline pipeline;
// 空的 future,對于 isDone, isSuccess 等方法,都立即返回 false
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);

private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
private Throwable initialCloseCause;

/** Cache for the string representation of this channel */
private boolean strValActive;
    private String strVal;

每一個域都可以實現(xiàn)特定的功能,通過組合,就可以實現(xiàn) Channel 的各種功能。

基本上繼承自 ChannelOutboundInvoker 的方法,都交由 pipeline 處理,如下:

public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);

    
//...
}

同時,該類也增加了一些較為實用的函數(shù)供子類改寫:


protected abstract void doBeginRead() throws Exception;
// 該方法會在 Channel 被注冊到 EventLoop(一個線程)中時被調(diào)用
protected void doRegister() throws Exception {
    // NOOP
}

protected abstract void doBind(SocketAddress localAddress) throws Exception;

protected abstract void doDisconnect() throws Exception;
protected abstract void doClose() throws Exception;

protected void doDeregister() throws Exception {
    // NOOP
}

AbstractNioChannel

該類主要實現(xiàn)了 JDK 中 NIO 的部分功能。

首先來看該類所持有的部分域:

// NIO 可選擇 Channel
private final SelectableChannel ch;
protected final int readInterestOp;
// NIO 感興趣的鍵
volatile SelectionKey selectionKey;
boolean readPending;

/**
 * The future of the current connection attempt.  If not null, subsequent
 * connection attempts will fail.
 */
private ChannelPromise connectPromise;
// 返回一個 ScheduleFuture,相比普通的 Future,多了一個 getDelay 方法 
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;

首先看該類的構(gòu)造函數(shù),如下:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

最關(guān)鍵的一點就是在初始化該類的實例時,會將 channel 設(shè)置成非阻塞模式的。

重點看 doRegister 方法,該方法主要是開啟監(jiān)聽,并將該 Channel 注冊到 EventLoop (一個線程,用于處理事件)。

// return SelectableChannel
protected SelectableChannel javaChannel() {return ch;}

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    // 3.
    for (;;) {
        try {
            // 1.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                // 2.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

第一處 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) 實際上是調(diào)用的 channel.register(selector, 0, object)。

這就是 NIO 中將 channel 注冊到 selector 的步驟,接下來的兩個參數(shù),0 代表對任意事件都不關(guān)心,object 代表 attachment (附件),在該方法中,將他自身作為附件,在以后可以通過 SelectionKey 類的 attachment 方法得到該類自身。

如下是一個 attachment 的樣例代碼:

public class Demo {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        SelectionKey key = channel.register(selector, 0, new Attachment());
        System.out.println(((Attachment)key.attachment()).value);

    }

    public static class Attachment {
        int value = 10;
    }
}

// output
// 10

如果在某次操作中,將 SelectKey 取消,即調(diào)用了 key.cancel() 函數(shù),就會導(dǎo)致在注冊時拋出 CancelledKeyException 異常(),如下:

public class Demo {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        System.out.println("=========== first register =============");
        SelectionKey key = channel.register(selector, 0, new Attachment());
        System.out.println("=========== cancel key =================");
        key.cancel();
        System.out.println("=========== second register ============");
        channel.register(selector, 0, new Attachment());

    }
}
// output
// =========== first register =============
// =========== cancel key =================
// =========== second register ============
// Exception in thread "main" java.nio.channels.CancelledKeyException

原因是因為調(diào)用 key.cancel() 之后,會將 channel 置為無效,需要刷新。可以使用 selectNow 刷新,將原來的 channel 刪除掉。如下:

Selector selector = Selector.open();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
System.out.println("=========== first register =============");
SelectionKey key = channel.register(selector, 0, new Attachment());
System.out.println("=========== cancel key =================");
key.cancel();
System.out.println("=========== select now =================");
selector.selectNow();
System.out.println("=========== second register ============");
channel.register(selector, 0, new Attachment());

如果取消掉也仍然拋出異常,則是 JDK 的 bug,直接拋出。

第三步使用了 selected 變量和一個 for 循環(huán)控制注冊,基本等效于如上的示例代碼。

doDeregister() 方法主要就是調(diào)用了 key.cancel(),略。

doBeginRead() 方法主要實現(xiàn)了切換監(jiān)聽的興趣為讀,表示開始監(jiān)聽讀事件。如下:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    // 判斷 selectionKey 是否有效
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // 增加讀的興趣字
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

對于 doClose() 方法,實現(xiàn)如下:

protected void doClose() throws Exception {
    ChannelPromise promise = connectPromise;
    if (promise != null) {
        // Use tryFailure() instead of setFailure() to avoid the race against cancel().
        promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
        connectPromise = null;
    }

    ScheduledFuture<?> future = connectTimeoutFuture;
    if (future != null) {
        future.cancel(false);
        connectTimeoutFuture = null;
    }
}

connectPromise 代表的是正在進行連接的操作,如果不為空,則證明連接還沒有成功,所以設(shè)定關(guān)閉已經(jīng)關(guān)閉的通道異常,最后調(diào)用 connectTimeoutFuturecancel 函數(shù)關(guān)閉。

該類添加了兩個新方法供子類改寫:

// 連接到服務(wù)器的操作
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;

// 連接完成之后執(zhí)行的操作
protected abstract void doFinishConnect() throws Exception;

AbstractNioByteChannel

AbstractNioByteChannel 實現(xiàn)了 AbstractChanneldoWrite 方法,該方法主要的目的就是將在緩沖區(qū)內(nèi)的數(shù)據(jù)發(fā)送到遠端。

對于一個普通的 NIO 程序,發(fā)送數(shù)據(jù)的方法示例代碼如下:

public class Demo {

    public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put(DATA);
        byteBuffer.flip();
        channel.write(byteBuffer);
    }

    public static final byte DATA = '0';

}

以上代碼會將 '0' 作為數(shù)據(jù)發(fā)送至遠程端。

對于 netty 來說,他的實現(xiàn)代碼如下:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();
    do {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}

ChannelOutboundBuffer 代表的是發(fā)送緩沖區(qū),可以理解為多個 ByteBuf 的一個數(shù)組。

writeSpinCount 代表的是循環(huán)發(fā)送次數(shù),可以理解為一次發(fā)送 ByteBuf 數(shù)組的多少個元素。

note

  • 設(shè)置 writeSpinCount 的主要目的是當循環(huán)發(fā)送時, I/O 線程會一直執(zhí)行寫操作, I/O 線程就不能執(zhí)行其他的操作,如果網(wǎng)絡(luò)阻塞,可能導(dǎo)致線程假死。

每次循環(huán)從 ChannelOutboundBuffer 中讀取一個對象,如果對象為空,說明所有的對象都已經(jīng)發(fā)送成功了,這個時候調(diào)用 clearOpWrite()selectionKey 的寫興趣字去掉,并直接返回。clearOpWrite 如下,主要就是取消寫的興趣字:

protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}

如果消息不為空且循環(huán)次數(shù)仍然大于 0,則調(diào)用 doWriteInternal() 方法發(fā)送緩沖區(qū)的數(shù)據(jù),實現(xiàn)如下:

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            in.remove();
            return 0;
        }

        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        //...
    } else {
            // Should not reach here.
            throw new Error();
    }
    // WRITE_STATUS_SNDBUF_FULL = 2147483647
    return WRITE_STATUS_SNDBUF_FULL;
}

如果 msg 是 ByteBuf 類型:

  • 判斷 buf 是否可讀(是否有數(shù)據(jù)),如果沒有,刪除他執(zhí)行下一次循環(huán)
  • 利用 doWriteBytes(buf) 函數(shù)發(fā)送數(shù)據(jù),返回實際上發(fā)送的字節(jié)數(shù)
  • 如果發(fā)送的字節(jié)數(shù)實際上大于 0,則發(fā)送一個通知(in.progress(localFlushedAmount)
  • 如果發(fā)送完成之后,buf 不可讀,則說明已經(jīng)發(fā)送完,進入下一循環(huán)
  • 如果發(fā)送的字節(jié)數(shù)等于 0,則可能是由于沒有發(fā)送的緩存窗口了,直接返回一個大數(shù)

note

  • in.progress() 實際上是調(diào)用了一個特殊的 Future,該 Future 會在收到消息后就給監(jiān)聽者發(fā)送通知,發(fā)送的通知為發(fā)送了多少字節(jié)的數(shù)據(jù),有點類似于進度條的感覺。
  • 監(jiān)聽者可以根據(jù)通知(進度條信息)做一些特定的事情。

回到 doWrite 函數(shù),如果沒有發(fā)送完所有數(shù)據(jù)就達到了循環(huán)發(fā)送的最高次數(shù),則會調(diào)用 incompleteWrite(writeSpinCount < 0) 函數(shù),設(shè)置 channel 的 OP_WRITE 興趣字、將發(fā)送掛在任務(wù)中等操作。代碼如下:

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
        // use our write quantum. In this case we no longer want to set the write OP because the socket is still
        // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
        // and set the write OP if necessary.
        clearOpWrite();

        // Schedule flush again later so other tasks can be picked up in the meantime
        eventLoop().execute(flushTask);
    }
}

經(jīng)過分析,參數(shù) setOpWirite 只在 doWriteInternal 函數(shù)返回 WRITE_STATUS_SNDBUF_FULL 時才會為 true,說明出現(xiàn)了 send buff full,不可寫,則設(shè)置寫監(jiān)聽的套接字(setOpWrite()),直到可寫。其他情況下,可以繼續(xù)發(fā)送,但是為了給其他任務(wù)執(zhí)行的機會,就將其加入到線程池的 execute 隊列中,待稍后執(zhí)行。

note

  • eventLoop().execute(flushTask) 可以等效為 executor.execute(() -> doWrite(in)), 其中 executor 就是一個普通的線程池。即一段時間后繼續(xù)執(zhí)行發(fā)送函數(shù)。

除了 doWriteBytes()需要子類實現(xiàn),該類也增加其他函數(shù)供子類實現(xiàn):

protected abstract int doReadBytes(ByteBuf buf) throws Exception;

NioSocketChannel

該類是客戶端 Channel 的最終實現(xiàn)類。

主要實現(xiàn)了 doReadBytes、doConnect、doBinddoFinishConnect、doDisconnectdoClose、doWriteBytes 等方法。

doBind 方法實現(xiàn)如下:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    doBind0(localAddress);
}
private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {
    try {
        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws IOException {
                socketChannel.bind(address);
                return null;
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

最終是調(diào)用的 JDK 的 channel.bind(InetAddress) 方法,略。

doConnect 的實現(xiàn)如下:

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        // 等效于 channel.connect(remoteAddress)
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

對于一個非阻塞的 channel(AbstractChannel 在初始化時就將 cannel 設(shè)置成了非阻塞),如果調(diào)用 channel.connect() 方法,其返回值有三種情況:

  • 如果連接立即建立,則返回 true
  • 如果連接暫時沒有建立(服務(wù)端沒有返回 ACK 應(yīng)答等,連接結(jié)果不確定),則返回 false
  • 如果連接失敗,直接拋出 I/O 異常

對于連接操作,第一步是綁定端口,即第一個 if 語句。

接下來連接至遠端,即 SocketUtils.connect(javaChannel(), remoteAddress)。

如果返回 true,表明連接成功,返回成功。

如果返回 false,表明還未成功,在該 channel 中加入連接完成的興趣字。

如果拋出了異常,會執(zhí)行 finally 語句的 doClose 方法,關(guān)閉連接。

note

  • 如果對興趣字增加了連接完成的監(jiān)聽
  • 如果 selector 返回,表示連接通道連接就緒或者發(fā)生了錯誤。要使用finishConnect判斷下,如果連接失敗,會拋出異常

一般來說,NIO 處理就緒 OP_CONNECT 的通用代碼如下:

if (key.isValid() && key.isConnectable()) {
    SocketChannel ch = (SocketChannel) key.channel();
    if (ch.finishConnect()) {
        // Connect successfully
        // key.interestOps(SelectionKey.OP_READ);
} else {
    // Connect failed
}

doFinishConnectdoDiconnect、doClosedoWriteBytes、doReadBytes 如下:

// 與 channel 原生的 finishConnect 不同,這直接拋出異常
@Override
protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

@Override
protected void doDisconnect() throws Exception {
    doClose();
}

@Override
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close();
}

@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    // 直接向 channel 寫數(shù)據(jù)
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    // 設(shè)置一個足夠大的緩沖區(qū)讀數(shù)據(jù)
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    // 讀數(shù)據(jù),并將數(shù)據(jù)保存在 byteBuf 中
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

對于 服務(wù)器端的 channel 來說,還有 AbstractNioMessageChannelNioServerSocketChannel 兩個類,暫時跳過。

==綜上,對于 Channel 簇來說,除了由 Channel 接口提供的函數(shù),交由 pipeline 實現(xiàn)之外,還提供了各種 doXXX 命名的幫助函數(shù)。==


Unsafe 簇


Unsafe 類是 Channel 的內(nèi)部類,只允許非用戶線程訪問。該接口的定義如下:

interface Unsafe {
    // 主要在讀數(shù)據(jù)時生成 ByteBuf
    RecvByteBufAllocator.Handle recvBufAllocHandle();

    SocketAddress localAddress();

    SocketAddress remoteAddress();

    // 在某個 eventLoop (線程)上注冊該 channel 的 promise,一旦完成,通知 ChannelFuture
    void register(EventLoop eventLoop, ChannelPromise promise);

    void bind(SocketAddress localAddress, ChannelPromise promise);

    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    void disconnect(ChannelPromise promise);

    void close(ChannelPromise promise);

    void closeForcibly();

    void deregister(ChannelPromise promise);

    // 計劃一個讀取操作,該操作將填充Channel管道中第一個ChannelInbindingHandler的入站緩沖區(qū)。如果已有掛起的讀取操作,則此方法不執(zhí)行任何操作。
    void beginRead();

    void write(Object msg, ChannelPromise promise);

    void flush();

    // 一個占位的 promise
    ChannelPromise voidPromise();

    // 返回發(fā)送緩沖
    ChannelOutboundBuffer outboundBuffer();
}

AbstractUnsafe

AbstractUnsafe 是所有 Unsafe 類的抽象類,他定義了如下實用方法:

// 是否被注冊并且在當前 eventLoop 中
private void assertEventLoop() {
    assert !registered || eventLoop.inEventLoop();
}

先看 register 方法,如下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

首先對當前狀態(tài)進行一些判斷

  • 如果已經(jīng)注冊,拋出異常
  • 如果不兼容,拋出異常

最后,判斷是否是當前線程在執(zhí)行 eventLoop

  • 是的話,就在當前線程注冊
  • 否則,將注冊封裝到一個 task 中,交由線程池運行

以下是注冊實現(xiàn)的主要邏輯,如下:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

該函數(shù)的主要邏輯是調(diào)用 Channel 提供的 doRegister 方法,將 Channel 注冊到 selector 中。

如果 doRegister 沒有拋出異常,則說明注冊到了 selector 上,則將 ChannelPromise 的狀態(tài)設(shè)置為 success

設(shè)置成功后,向 pipeline 發(fā)送 fireChannelRegistered() 事件,供上層處理。

然后,判斷當前 Channel 的狀態(tài),即 isActive 方法。該方法會在 Channel 是打開并且已經(jīng)連接的情況下返回 true。

在激活狀態(tài)下,如果是第一次注冊,即向 pipline 發(fā)送一個 fireChannelActive 事件。

如果不是第一次注冊,說明之前有注冊過,如果是自動讀取的配置,則會進行讀?。?code>beginRead() 方法)。

beginRead 方法主要是調(diào)用 doBeginRead 方法設(shè)置讀的感興趣字。

write 方法主要是用于向發(fā)送緩沖中增加元素。

@Override
public final void write(Object msg, ChannelPromise promise) {
    // ...
    outboundBuffer.addMessage(msg, size, promise);
}

flush 方法主要是將發(fā)送緩沖的數(shù)據(jù)發(fā)送出去,代碼如下:

public final void flush() {
    // ...
    // 標識之前的數(shù)據(jù)都需要發(fā)送
    outboundBuffer.addFlush();
    // 發(fā)送核心邏輯
    flush0();
}
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }
    //...
    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        // ...
    }

    try {
        // 調(diào)用 Channel 的 doWrite 發(fā)送數(shù)據(jù)
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        // ...
    } finally {
        inFlush0 = false;
    }
}

AbstractNioUnsafe

AbstractNioUnsafe 主要實現(xiàn)了 connect 方法,如下:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // ...

    try {
        // 1. 
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        // 2.
        if (doConnect(remoteAddress, localAddress)) {
            // 3.
            fulfillConnectPromise(promise, wasActive);
        } else {
            // 4. 
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }    
}

主要分成了 4 個步驟。

  • 判斷是否已經(jīng)存在正在連接的操作,如果存在,拋出異常
  • 調(diào)用 doConnect 進行連接,如果成功,返回 true,如果沒有立即獲得鏈接,返回 false
  • 如果立即獲得了鏈接,調(diào)用 fullfilConnectionPromise 方法。該方法的主要目的就是將 promise 的狀態(tài)設(shè)置為 sueccess,同時,如果是第一次激活,發(fā)出 fireChannelActive 事件
  • 如果沒有立即連接成功,會將等待的任務(wù)加到事件監(jiān)聽中

對于步驟四
首先構(gòu)建了一個超時處理的任務(wù),如果到 time out 的時間,這個任務(wù)都沒有被取消,說明超超時了,就會執(zhí)行超時之后的邏輯。如下:

Runnable runnable = new Runnable() {
    @Override
    public void run() {
        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
        ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
        if (connectPromise != null && connectPromise.tryFailure(cause)) {
            close(voidPromise());
        }
    }
}

connectTimeoutFuture = eventLoop().schedule(runnable, , connectTimeoutMillis, TimeUnit.MILLISECONDS);

那么如果在超時之前連接成功,就需要取消該任務(wù)。netty 的做法是添加一個事件監(jiān)聽,當監(jiān)聽到連接成功時,取消該任務(wù),如下:

promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
            close(voidPromise());
        }
    });

finishConnect 代碼如下:

@Override
public final void finishConnect() {
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.

    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
        // See https://github.com/netty/netty/issues/1770
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

AbstractNioByteUnsafe

AbstractNioByteUnsafe 主要實現(xiàn)了 read 方法。實現(xiàn)如下:

@Override
public final void read() {
    // ...

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 分配一個 ByteBuf 用于接收數(shù)據(jù)
            byteBuf = allocHandle.allocate(allocator);
            // 接受數(shù)據(jù)
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            // 獲得最近添加的字節(jié)數(shù)
            if (allocHandle.lastBytesRead() <= 0) {
                // 如果字節(jié)數(shù)小于等于 0,代表什么都沒有收到,釋放緩沖區(qū)
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // 如果字節(jié)數(shù)小于 0,說明已經(jīng)沒有可讀數(shù)據(jù),將readingPending(嘗試讀)設(shè)為 false
                            readPending = false;
                }
                // 返回
                break;
            }

            // 如果有讀到數(shù)據(jù),則對讀到的數(shù)據(jù)數(shù)進行計數(shù) + 1
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 觸發(fā)可讀事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        // 讀取完成后觸發(fā)讀完成事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

==綜上,Unsafe 封裝了一系列對 I/O 的操作方法,可以通過這些方法,操作 I/O。==

如:某線程可以的調(diào)用調(diào)用鏈可能如下:

public void demo() {
    ChannelPromise promise;
    EventLoop eventLoop;
    // 注冊到 selector
    unsafe.register(eventLoop, promise);
    // 連接到遠程地址
    unsafe.connect(remoteAddress, localAddress, promise);
    // 發(fā)送數(shù)據(jù)
    unsfafe.write();
    // 接受數(shù)據(jù)
    unsafe.read();
}

總結(jié)


Channel 接口和 Unsafe 接口實現(xiàn)了用于 I/O 操作的大部分方法,方便 netty 其他線程對 I/O 進行讀寫,大大降低了工作難度。Channel 操作的特點如下:

  • 所有的 Channel 接口的操作都是異步的:返回一個 Future 對象用于后期處理或者直接返回空,在函數(shù)內(nèi)部設(shè)置 Future 對象
  • 對于 NIO 的空輪詢 bug,使用重建的方式對 Selector 進行重建
  • 可打斷的發(fā)送方式,在發(fā)送一定時間后會主動退出,給其他操作運行的機會(doWrite() 方法)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容