一. AbstractChannel
1.1 構(gòu)造方法
/**
* 創(chuàng)建一個(gè)新實(shí)例。
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 創(chuàng)建
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
* 創(chuàng)建一個(gè)新實(shí)例。
*/
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看出在構(gòu)造方法中,就綁定了這個(gè)通道的四個(gè)成員變量
parent,id,unsafe,pipeline。
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
/**
* 由子類(lèi)來(lái)實(shí)現(xiàn),創(chuàng)建對(duì)應(yīng)的 Unsafe 類(lèi)型實(shí)例
*/
protected abstract AbstractUnsafe newUnsafe();
id和pipeline都是直接創(chuàng)建,默認(rèn)是DefaultChannelId和DefaultChannelPipeline類(lèi)型。newUnsafe()是抽樣方法,有子類(lèi)才能創(chuàng)建對(duì)應(yīng)的Unsafe類(lèi)型實(shí)例。
1.2 ChannelOutboundInvoker 接口方法
Channel還繼承了ChannelOutboundInvoker接口,也就是說(shuō)通道是可以發(fā)送出站IO操作的。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
........
你會(huì)發(fā)現(xiàn)基本上都是調(diào)用 ChannelPipeline 對(duì)應(yīng)的方法。
- 也就是說(shuō)直接調(diào)用通道
Channel的發(fā)送出站IO事件的方法,和調(diào)用管道pipeline()發(fā)送出站IO事件的方法是一樣的。- 根據(jù) DefaultChannelPipeline 的分析,我們知道這些出站
IO事件最后都會(huì)調(diào)用到該通道的Unsafe屬性對(duì)應(yīng)方法進(jìn)行處理。
1.3 抽樣方法
AbstractChannel 還有幾個(gè)需要子類(lèi)實(shí)現(xiàn)抽樣方法,由子類(lèi)提供不同的處理邏輯:
-
AbstractUnsafe newUnsafe()不同類(lèi)型的
Channel有自己特定的Unsafe類(lèi)型。 -
boolean isCompatible(EventLoop loop)判斷給定的事件輪詢(xún)器
EventLoop和當(dāng)前的通道類(lèi)型是不是兼容。每種類(lèi)型的通道Channel都有自己特定的事件輪詢(xún)器。 -
SocketAddress localAddress0()和SocketAddress remoteAddress0()通道綁定的本地地址和通道連接的遠(yuǎn)程地址。
-
void doBind(SocketAddress localAddress)進(jìn)行綁定操作,每種類(lèi)型的通道綁定處理是不一樣的。
-
void doDisconnect()進(jìn)行連接操作。
-
void doClose()進(jìn)行關(guān)閉連接操作。
-
void doBeginRead()將通道設(shè)為開(kāi)始讀操作。
-
void doWrite(ChannelOutboundBuffer in)進(jìn)行寫(xiě)操作。
AbstractChannel 真正重點(diǎn)的操作都是在 AbstractUnsafe 中實(shí)現(xiàn)的啊,下面講解 AbstractUnsafe。
二. AbstractUnsafe 類(lèi)
2.1 成員屬性
// 寫(xiě)緩沖區(qū) ChannelOutboundBuffer
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
// 用于在接收數(shù)據(jù)時(shí)分配緩存區(qū) ByteBuf
private RecvByteBufAllocator.Handle recvHandle;
// 當(dāng)前是否正在刷新數(shù)據(jù),防止重復(fù)刷新數(shù)據(jù)
private boolean inFlush0;
// 如果通道從未被注冊(cè),則為true,否則為false
private boolean neverRegistered = true;
2.2 注冊(cè) register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
// 當(dāng)前通道已經(jīng)注冊(cè),失敗,調(diào)用 promise 的setFailure方法進(jìn)行通知
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// 這個(gè)事件輪詢(xún)器和當(dāng)前通道不兼容,失敗,調(diào)用 promise 的setFailure方法進(jìn)行通知
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
// 當(dāng)前線(xiàn)程就是通道 事件輪詢(xún)器線(xiàn)程,直接調(diào)用 register0 方法
register0(promise);
} else {
try {
// 通過(guò) eventLoop.execute 方法,
// 保證 register0 方法在通道事件輪詢(xún)器線(xiàn)程中調(diào)用
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// 發(fā)生異常,要關(guān)閉通道,并進(jìn)行相關(guān)通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// 檢查通道是否仍然打開(kāi)
// 當(dāng)注冊(cè)操作在 eventLoop 線(xiàn)程之外調(diào)用的話(huà),
// 有可能這時(shí)通道被別的線(xiàn)程關(guān)閉了
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 第一次注冊(cè)
boolean firstRegistration = neverRegistered;
// 調(diào)用 AbstractChannel 的 doRegister 方法,進(jìn)行注冊(cè)操作
doRegister();
neverRegistered = false;
// 設(shè)置 AbstractChannel 的 registered 成員屬性,表示已經(jīng)注冊(cè)
registered = true;
// 確保在通道未注冊(cè)前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也會(huì)被調(diào)用
// 這是必需的,因?yàn)橛脩?hù)可能已經(jīng)通過(guò)ChannelFutureListener中的管道觸發(fā)了事件。
pipeline.invokeHandlerAddedIfNeeded();
// 注冊(cè)成功的通知
safeSetSuccess(promise);
// 發(fā)送注冊(cè) 入站IO事件
pipeline.fireChannelRegistered();
// 只有在通道從未注冊(cè)的情況下才觸發(fā) channelActive 事件。
// 這可以防止在通道被取消注冊(cè)和重新注冊(cè)時(shí)觸發(fā)多個(gè)通道 channelActive 事件。
if (isActive()) {
if (firstRegistration) {
// 第一次注冊(cè)時(shí),才會(huì)發(fā)送 channelActive 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 此通道在注冊(cè)之前,設(shè)置了 autoRead()。
// 這意味著我們需要重新設(shè)置開(kāi)始讀取操作,以便介紹入站數(shù)據(jù)。
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// 發(fā)生異常,要關(guān)閉通道,并進(jìn)行相關(guān)通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
將通道注冊(cè)到事件輪詢(xún)器EventLoop 上:
- 如果當(dāng)前通道已經(jīng)注冊(cè),或者當(dāng)前通道和事件輪詢(xún)器不兼容,那么注冊(cè)失敗,調(diào)用
promise的setFailure方法進(jìn)行通知。- 保證在事件輪詢(xún)器線(xiàn)程調(diào)用實(shí)際注冊(cè)
register0方法。- 調(diào)用
AbstractChannel的doRegister方法,進(jìn)行注冊(cè)操作,發(fā)送注冊(cè)事件。- 如果通道已活躍,第一次注冊(cè)的時(shí)候,就會(huì)發(fā)送
channelActive事件;- 如果不是,那么就可能設(shè)置開(kāi)始讀的操作。
- 如果這期間發(fā)生異常,就關(guān)閉通道,并進(jìn)行相關(guān)通知。
2.3 取消注冊(cè) deregister
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
// 當(dāng)前通道沒(méi)有注冊(cè),那么也表示取消注冊(cè)成功,進(jìn)行成功通知
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
// See:
// https://github.com/netty/netty/issues/4435
// 通過(guò) invokeLater 方法,將 doDeregister() 方法放在下一個(gè)事件輪詢(xún)周期進(jìn)行
invokeLater(new Runnable() {
@Override
public void run() {
try {
// 調(diào)用 AbstractChannel 的 doDeregister 方法,進(jìn)行取消注冊(cè)操作
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
// 如果通道之前是注冊(cè)成功了,
// 這里才發(fā)送取消注冊(cè)的 IO 事件
registered = false;
pipeline.fireChannelUnregistered();
}
// 取消綁定成功通知
safeSetSuccess(promise);
}
}
});
}
重點(diǎn)就是調(diào)用
AbstractChannel的doDeregister方法,進(jìn)行取消注冊(cè)操作。
如果fireChannelInactive == true,將發(fā)送ChannelInactive事件。
2.4 綁定 bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
// 檢查通道是否仍然打開(kāi)
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 調(diào)用 AbstractChannel 的 doBind 方法,進(jìn)行綁定操作
doBind(localAddress);
} catch (Throwable t) {
// 綁定失敗的通知
safeSetFailure(promise, t);
// doBind(localAddress) 方法有可能關(guān)閉這個(gè)通道,
// 就可能需要進(jìn)行關(guān)閉通道的通知
closeIfClosed();
return;
}
// 如果綁定操作后,通道從不活躍變成活躍,就要發(fā)送 ChannelActive 事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// 綁定成功的通知
safeSetSuccess(promise);
}
- 這個(gè)方法邏輯比較簡(jiǎn)單,重點(diǎn)就是調(diào)用
AbstractChannel的doBind方法,進(jìn)行綁定操作。- 如果綁定操作后,通道從不活躍變成活躍,就要發(fā)送
ChannelActive事件。
2.5 取消連接 disconnect
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
// 重置 remoteAddress and localAddress
remoteAddress = null;
localAddress = null;
} catch (Throwable t) {
safeSetFailure(promise, t);
// doDisconnect() 方法有可能關(guān)閉這個(gè)通道,
// 就可能需要進(jìn)行關(guān)閉通道的通知
closeIfClosed();
return;
}
// 如果取消連接后,通道從活躍變成不活躍,就要發(fā)送 ChannelInactive 事件
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
// doDisconnect() 方法有可能關(guān)閉這個(gè)通道,
// 就可能需要進(jìn)行關(guān)閉通道的通知
closeIfClosed();
}
- 這個(gè)方法邏輯比較簡(jiǎn)單,重點(diǎn)就是調(diào)用
AbstractChannel的doDisconnect()方法,進(jìn)行取消連接操作。- 如果取消連接操作成功后,通道從活躍變成不活躍,就要發(fā)送
ChannelInactive事件。
2.6關(guān)閉 close
public void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException =
StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// 如果 promise不是 不可取消的,那么直接返回
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
// closeInitiated == true,已經(jīng)調(diào)用過(guò)關(guān)閉操作了,就要return 返回了。
if (closeFuture.isDone()) {
// 已經(jīng)通道已經(jīng)關(guān)閉了,通知 promise
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// 當(dāng)前通道正在關(guān)閉,那么就添加一個(gè)監(jiān)聽(tīng)器,當(dāng)關(guān)閉成功后,再通知 promise
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
// 保證關(guān)閉方法只調(diào)用一次,不能重復(fù)調(diào)用
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
this.outboundBuffer = null;
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
outboundBuffer.failFlushed(cause, notify);
// 關(guān)閉寫(xiě)緩沖區(qū)
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
outboundBuffer.failFlushed(cause, notify);
// 關(guān)閉寫(xiě)緩沖區(qū)
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
// 如果正在刷新操作,那么就讓 fireChannelInactiveAndDeregister 操作,
// 放到下一個(gè)事件輪詢(xún)周期中處理
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
// 取消注冊(cè)和可能發(fā)送 ChannelInactive 事件
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
方法流程:
- 通過(guò)
closeInitiated成員屬性保證關(guān)閉方法只調(diào)用一次,不能重復(fù)調(diào)用。 - 因?yàn)殛P(guān)閉連接,需要考慮寫(xiě)緩沖區(qū)
ChannelOutboundBuffer中的待寫(xiě)入數(shù)據(jù)的問(wèn)題。 - 通過(guò)
prepareToClose()方法,返回一個(gè)關(guān)閉通道的事件執(zhí)行器。- 如果不為空,那么就在這個(gè)事件執(zhí)行器中進(jìn)行接下來(lái)的關(guān)閉操作。
- 如果為空,那么就在當(dāng)前線(xiàn)程進(jìn)行接下來(lái)的關(guān)閉操作。
- 調(diào)用
doClose0(promise)方法,進(jìn)行關(guān)閉以及操作成功或失敗的相關(guān)通知。 - 處理寫(xiě)緩沖區(qū)
outboundBuffer中的數(shù)據(jù),并關(guān)閉寫(xiě)緩沖區(qū)。 - 最后調(diào)用
fireChannelInactiveAndDeregister方法,取消管道注冊(cè),以及可能會(huì)發(fā)送ChannelInactive事件。如果在
doClose()方法之后,通道從活躍變成不活躍的情況下,才會(huì)發(fā)送ChannelInactive事件。
2.7 shutdownOutput
@UnstableApi
public final void shutdownOutput(final ChannelPromise promise) {
assertEventLoop();
shutdownOutput(promise, null);
}
/**
* 關(guān)閉相應(yīng)通道的輸出部分。
* 例如,這將清理ChannelOutboundBuffer并不再允許任何寫(xiě)操作。
*/
private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
promise.setFailure(new ClosedChannelException());
return;
}
// 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
this.outboundBuffer = null;
final Throwable shutdownCause = cause == null ?
new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
// 進(jìn)行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
// 操作失敗通知
} catch (Throwable err) {
promise.setFailure(err);
} finally {
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {
// 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
// 并發(fā)送用戶(hù)通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {
// 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
// 進(jìn)行 shutdown 操作
doShutdownOutput();
// 操作成功通知
promise.setSuccess();
} catch (Throwable err) {
// 操作失敗通知
promise.setFailure(err);
} finally {
// 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
// 并發(fā)送用戶(hù)通知事件
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
/**
* 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
* 并發(fā)送用戶(hù)通知事件
*/
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
// 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
buffer.failFlushed(cause, false);
// 關(guān)閉寫(xiě)緩沖區(qū)
buffer.close(cause, true);
// 發(fā)送一個(gè)通道 Shutdown 的用戶(hù)通知事件
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
}
shutdown 的方法流程和 close 很像,區(qū)別點(diǎn):
shutdown是調(diào)用AbstractChannel的doShutdownOutput()方法進(jìn)行相關(guān)操作,而close是調(diào)用AbstractChannel的doClose()方法。close最后會(huì)取消注冊(cè),以及可能會(huì)發(fā)送ChannelInactive事件。- 而
shutdown會(huì)發(fā)送一個(gè)ChannelOutputShutdownEvent.INSTANCE用戶(hù)自定義的通知事件。
2.8 強(qiáng)制關(guān)閉 closeForcibly
@Override
public final void closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
你會(huì)發(fā)現(xiàn)只調(diào)用了
AbstractChannel的doClose()方法進(jìn)行關(guān)閉操作,不觸發(fā)任何事件,也不處理寫(xiě)緩沖區(qū)。只可能在某些特殊情況下調(diào)用,例如嘗試注冊(cè)失敗的時(shí)候。
2.9 開(kāi)始讀 beginRead
@Override
public final void beginRead() {
assertEventLoop();
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
調(diào)用
AbstractChannel的doBeginRead()方法設(shè)置通道開(kāi)始讀取數(shù)據(jù)。
2.10 寫(xiě)操作 write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// 寫(xiě)緩沖區(qū)為 null,
try {
// 現(xiàn)在釋放資源,以防止資源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 如果outboundBuffer為空,我們就知道通道被關(guān)閉了,所以立即進(jìn)行失敗通知。
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
int size;
try {
// 進(jìn)行消息的轉(zhuǎn)換,例如將堆緩沖區(qū)變成直接緩沖區(qū)
msg = filterOutboundMessage(msg);
// 估算數(shù)據(jù)的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
// 失敗時(shí)需要釋放資源,以防止資源泄漏
ReferenceCountUtil.release(msg);
} finally {
// 進(jìn)行操作失敗的通知
safeSetFailure(promise, t);
}
return;
}
// 將數(shù)據(jù)添加到寫(xiě)緩沖區(qū) outboundBuffer 中
outboundBuffer.addMessage(msg, size, promise);
}
方法流程
- 先判斷寫(xiě)緩沖區(qū)
outboundBuffer是不是為null,為空說(shuō)明通道已關(guān)閉,進(jìn)行失敗通知。 - 通過(guò)
filterOutboundMessage(msg)方法進(jìn)行數(shù)據(jù)轉(zhuǎn)換,例如將堆緩沖區(qū)變成直接緩沖區(qū)。 - 估算數(shù)據(jù)大小。
- 通過(guò)
outboundBuffer.addMessage(...)方法,將數(shù)據(jù)添加到寫(xiě)緩沖區(qū)outboundBuffer中。 - 如果發(fā)送異常,記得釋放數(shù)據(jù)
msg的引用,防止內(nèi)存泄露,并進(jìn)行操作失敗通知。
2.11 刷新 flush
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 寫(xiě)緩沖區(qū)為空,直接返回
if (outboundBuffer == null) {
return;
}
// 將寫(xiě)緩沖區(qū)中的消息都標(biāo)記成待刷新
outboundBuffer.addFlush();
// 進(jìn)行刷新操作
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// 避免重復(fù)刷新
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 當(dāng)前寫(xiě)緩沖區(qū)沒(méi)有數(shù)據(jù),那么直接返回
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
// 避免重復(fù)刷新
inFlush0 = true;
// 如果通道處于非活動(dòng)狀態(tài),則將所有掛起的寫(xiě)請(qǐng)求標(biāo)記為失敗。
if (!isActive()) {
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
// 刷新操作完成,將 inFlush0重新設(shè)置為 false,以便下次刷新。
inFlush0 = false;
}
return;
}
try {
// 將給定緩沖區(qū)的內(nèi)容刷新到遠(yuǎn)端
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
// 刷新操作完成,將 inFlush0重新設(shè)置為 false,以便下次刷新。
inFlush0 = false;
}
}
- 通過(guò)
inFlush0成員屬性,來(lái)避免重復(fù)刷新。- 如果通道處于非活動(dòng)狀態(tài),則將所有掛起的寫(xiě)請(qǐng)求標(biāo)記為失敗。
- 通過(guò)
AbstractChannel的doWrite(outboundBuffer)方法,將緩沖區(qū)的內(nèi)容刷新到遠(yuǎn)端。
2.12 小結(jié)
對(duì)比 Unsafe 的方法,你會(huì)發(fā)現(xiàn) AbstractUnsafe 中沒(méi)有實(shí)現(xiàn) connect(...) 連接方法。
對(duì)比發(fā)送入站IO事件:
-
ChannelRegistered和ChannelUnregistered-
register方法會(huì)發(fā)送ChannelRegistered事件。 -
deregister方法只有在通道之前已經(jīng)注冊(cè)之后,才會(huì)發(fā)送ChannelUnregistered事件。
-
-
ChannelActive和ChannelInactive- 一般都是通道
Channel從不活躍變成活躍,要發(fā)送ChannelActive事件;可能引起這個(gè)變化的操作有bind和connect操作。 - 通道
Channel從活躍變成不活躍,就要發(fā)送ChannelInactive事件;可能引起這個(gè)變化的操作有disconnect,close和shutdown。 - 最后如果第一次注冊(cè)時(shí),且當(dāng)前通道是活躍狀態(tài),也會(huì)發(fā)送
ChannelActive事件。
- 一般都是通道
三. ChannelOutboundBuffer
在 AbstractChannel.Unsafe 中看到用戶(hù)調(diào)用write(...) 方法寫(xiě)的數(shù)據(jù),會(huì)先添加到寫(xiě)緩沖區(qū) ChannelOutboundBuffer 中,然后調(diào)用 flush() 方法,才將寫(xiě)緩沖區(qū)中的數(shù)據(jù)發(fā)送到遠(yuǎn)端。
3.1 重要成員屬性
// 在鏈表結(jié)構(gòu)中第一個(gè)被刷新的節(jié)點(diǎn)
private Entry flushedEntry;
// 在鏈表結(jié)構(gòu)中第一個(gè)未刷新的節(jié)點(diǎn)
private Entry unflushedEntry;
// 表示鏈表中最后一個(gè)節(jié)點(diǎn)
private Entry tailEntry;
// 等待刷新節(jié)點(diǎn)的數(shù)量
private int flushed;
寫(xiě)緩沖區(qū)通過(guò)鏈表來(lái)儲(chǔ)存數(shù)據(jù)(依靠 Entry.next 來(lái)實(shí)現(xiàn)鏈表),鏈表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
flushedEntry表示第一個(gè)被刷新的節(jié)點(diǎn),在鏈表頭,當(dāng)然也是通過(guò)addFlush()方法設(shè)置的。unflushedEntry表示第一個(gè)未刷新的節(jié)點(diǎn),表示還沒(méi)有被標(biāo)記刷新的第一個(gè)節(jié)點(diǎn)。tailEntry最后一個(gè)節(jié)點(diǎn)。flushed刷新節(jié)點(diǎn)的數(shù)量,這個(gè)屬性很重要,靠它來(lái)標(biāo)記刷新節(jié)點(diǎn),也就是說(shuō)從flushedEntry開(kāi)始,flushed數(shù)量的節(jié)點(diǎn)都被標(biāo)記為刷新節(jié)點(diǎn)了。
3.2 重要方法
3.2.1 添加數(shù)據(jù)
這個(gè)方法一般在 AbstractChannel.AbstractUnsafe 的 write(...) 方法中調(diào)用。
/**
* 將給定的消息 msg 添加到ChannelOutboundBuffer中。
* 一旦消息寫(xiě)入,給定的ChannelPromise將被通知。
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 將給定消息封裝成一個(gè)節(jié)點(diǎn)
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
// 將新消息節(jié)點(diǎn)添加到隊(duì)列尾
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
// 如果未刷新節(jié)點(diǎn)為空,說(shuō)明隊(duì)列節(jié)點(diǎn)都變成刷新節(jié)點(diǎn)了,
// 那么這個(gè)新添加的節(jié)點(diǎn),就是未刷新節(jié)點(diǎn)的頭了。
unflushedEntry = entry;
}
// See https://github.com/netty/netty/issues/1619
// 向未刷新的數(shù)組添加消息后,增加掛起的字節(jié)數(shù)。
incrementPendingOutboundBytes(entry.pendingSize, false);
}
- 先將數(shù)據(jù)
msg封裝成一個(gè)節(jié)點(diǎn)entry,并將節(jié)點(diǎn)添加到鏈表尾。- 如果
unflushedEntry是null,那么這個(gè)節(jié)點(diǎn)就是第一個(gè)未刷新節(jié)點(diǎn)。incrementPendingOutboundBytes(...)方法,增加掛起的字節(jié)數(shù),看是否需要改變通道的 可寫(xiě)屬性。
3.2.2 標(biāo)記刷新
這個(gè)方法一般在 AbstractChannel.AbstractUnsafe 的 flush() 方法中調(diào)用。
/**
* 向此ChannelOutboundBuffer添加刷新。
* 這意味著所有以前添加的消息都被標(biāo)記為刷新,因此您將能夠處理它們。
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
// 未刷新節(jié)點(diǎn)后面的鏈表示新添加的節(jié)點(diǎn)列表,都是要加入到刷新中
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
// 將所有要刷新的節(jié)點(diǎn)變成不可取消的
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
// 掛起消息被取消,所以確保我們釋放內(nèi)存并通知釋放的字節(jié)
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// 節(jié)點(diǎn)都變成已刷新的了,未刷新節(jié)點(diǎn)就設(shè)置為 null
unflushedEntry = null;
}
}
- 將從
unflushedEntry未刷新節(jié)點(diǎn)開(kāi)始到鏈表尾的所有節(jié)點(diǎn)都標(biāo)記為刷新。通過(guò)flushed++來(lái)增加刷新節(jié)點(diǎn)數(shù)量。- 調(diào)用
setUncancellable(...)要寫(xiě)入的節(jié)點(diǎn)是不可取消的,如果設(shè)置失敗,就要取消掛起數(shù)據(jù),并調(diào)用decrementPendingOutboundBytes(...)減少掛起字節(jié)數(shù),看是否需要改變通道的 可寫(xiě)屬性。
3.2.3 刪除節(jié)點(diǎn)
/**
* 將刪除當(dāng)前消息,將其ChannelPromise標(biāo)記為success并返回true。
* 如果在調(diào)用此方法時(shí)不存在刷新的消息,則返回false,表示沒(méi)有準(zhǔn)備好處理的消息。
*/
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return true;
}
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// flushed == 0, 表示所有刷新節(jié)點(diǎn)都被處理了
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
// 將下一個(gè)節(jié)點(diǎn)變成刷新節(jié)點(diǎn)
flushedEntry = e.next;
}
}
當(dāng)緩存區(qū)當(dāng)前刷新節(jié)點(diǎn)數(shù)據(jù)被寫(xiě)入到遠(yuǎn)端了,那么調(diào)用這個(gè)
remove()方法,移除當(dāng)前節(jié)點(diǎn),得到下一個(gè)刷新節(jié)點(diǎn)。