基于Netty源代碼版本:netty-all-4.1.33.Final
什么是心跳機(jī)制?
心跳說的是在客戶端和服務(wù)端在互相建立ESTABLISH狀態(tài)的時(shí)候,如何通過發(fā)送一個(gè)最簡(jiǎn)單的包來保持連接的存活,還有監(jiān)控另一邊服務(wù)的可用性等。
心跳機(jī)制
- 心跳是在TCP長(zhǎng)連接中,客戶端和服務(wù)端定時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)包通知對(duì)方自己還在線,保證連接的有效性的一種機(jī)制
- 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性
心跳實(shí)現(xiàn)
- 使用TCP協(xié)議層的Keeplive機(jī)制,但是該機(jī)制默認(rèn)的心跳時(shí)間是2小時(shí),依賴操作系統(tǒng)實(shí)現(xiàn)不夠靈活;
- 應(yīng)用層實(shí)現(xiàn)自定義心跳機(jī)制,比如基于Netty的IdleStateHandler實(shí)現(xiàn)心跳機(jī)制;
心跳包的作用
?;?br> Q:為什么說心跳機(jī)制能保持連接的存活,它是集群中或長(zhǎng)連接中最為有效避免網(wǎng)絡(luò)中斷的一個(gè)重要的保障措施?
A:之所以說是“避免網(wǎng)絡(luò)中斷的一個(gè)重要保障措施”,原因是:我們得知公網(wǎng)IP是一個(gè)寶貴的資源,一旦某一連接長(zhǎng)時(shí)間的占用并且不發(fā)數(shù)據(jù),這怎能對(duì)得起網(wǎng)絡(luò)給此連接分配公網(wǎng)IP,這簡(jiǎn)直是對(duì)網(wǎng)絡(luò)資源最大的浪費(fèi),所以基本上所有的NAT路由器都會(huì)定時(shí)的清除那些長(zhǎng)時(shí)間沒有數(shù)據(jù)傳輸?shù)挠成浔眄?xiàng)。一是回收IP資源,二是釋放NAT路由器本身內(nèi)存的資源,這樣問題就來了,連接被從中間斷開了,雙發(fā)還都不曉得對(duì)方已經(jīng)連通不了了,還會(huì)繼續(xù)發(fā)數(shù)據(jù),這樣會(huì)有兩個(gè)結(jié)果:a) 發(fā)方會(huì)收到NAT路由器的RST包,導(dǎo)致發(fā)方知道連接已中斷;b) 發(fā)方?jīng)]有收到任何NAT的回執(zhí),NAT只是簡(jiǎn)單的drop相應(yīng)的數(shù)據(jù)包
通常我們測(cè)試得出的是第二種情況會(huì)多些,就是客戶端是不知道自己應(yīng)經(jīng)連接斷開了,所以這時(shí)候心跳就可以和NAT建立關(guān)聯(lián)了,只要我們?cè)贜AT認(rèn)為合理連接的時(shí)間內(nèi)發(fā)送心跳數(shù)據(jù)包,這樣NAT會(huì)繼續(xù)keep連接的IP映射表項(xiàng)不被移除,達(dá)到了連接不會(huì)被中斷的目的。檢測(cè)另一端服務(wù)是否可用
TCP的斷開可能有時(shí)候是不能瞬時(shí)探知的,甚至是不能探知的,也可能有很長(zhǎng)時(shí)間的延遲,如果前端沒有正常的斷開TCP連接,四次握手沒有發(fā)起,服務(wù)端無(wú)從得知客戶端的掉線,這個(gè)時(shí)候我們就需要心跳包來檢測(cè)另一端服務(wù)是否還存活可用。
基于TCP的keepalive機(jī)制實(shí)現(xiàn)
基于TCP的keepalive機(jī)制,由具體的TCP協(xié)議棧來實(shí)現(xiàn)長(zhǎng)連接的維持。如在netty中可以在創(chuàng)建channel的時(shí)候,指定SO_KEEPALIVE參數(shù)來實(shí)現(xiàn):

存在的問題:Netty只能控制SO_KEEPALIVE這個(gè)參數(shù),其他參數(shù),則需要從系統(tǒng)的sysctl中讀取,其中比較關(guān)鍵的是tcp_keepalive_time,發(fā)送心跳包檢測(cè)的時(shí)間間隔,默認(rèn)為7200s,即空閑后,每2小時(shí)檢測(cè)一次。如果客戶端在這2小時(shí)內(nèi)斷開了,那么服務(wù)端也要維護(hù)這個(gè)連接2小時(shí),浪費(fèi)服務(wù)端資源;另外就是對(duì)于需要實(shí)時(shí)傳輸數(shù)據(jù)的場(chǎng)景,客戶端斷開了,服務(wù)端也要2小時(shí)后才能發(fā)現(xiàn)。服務(wù)端發(fā)送心跳檢測(cè),具體可能出現(xiàn)的情況如下:
- (1)連接正常:客戶端仍然存在,網(wǎng)絡(luò)連接狀況良好。此時(shí)客戶端會(huì)返回一個(gè) ACK 。 服務(wù)端接收到ACK后重置計(jì)時(shí)器,在2小時(shí)后再發(fā)送探測(cè)。如果2小時(shí)內(nèi)連接上有數(shù)據(jù)傳輸,那么在該時(shí)間基礎(chǔ)上向后推延2個(gè)小時(shí);
- (2)連接斷開:客戶端異常關(guān)閉,或是網(wǎng)絡(luò)斷開。在這兩種情況下,客戶端都不會(huì)響應(yīng)。服務(wù)器沒有收到對(duì)其發(fā)出探測(cè)的響應(yīng),并且在一定時(shí)間(系統(tǒng)默認(rèn)為 1000 ms )后重復(fù)發(fā)送 keep-alive packet ,并且重復(fù)發(fā)送一定次數(shù)。
- (3)客戶端曾經(jīng)崩潰,但已經(jīng)重啟:這種情況下,服務(wù)器將會(huì)收到對(duì)其存活探測(cè)的響應(yīng),但該響應(yīng)是一個(gè)復(fù)位,從而引起服務(wù)器對(duì)連接的終止。
基于Netty的IdleStateHandler實(shí)現(xiàn)
什么是 IdleStateHandler
當(dāng)連接的空閑時(shí)間(讀或者寫)太長(zhǎng)時(shí),將會(huì)觸發(fā)一個(gè) IdleStateEvent 事件。然后,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。
如何使用?
IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中。然后在自己的 handler 中重寫 userEventTriggered 方法,當(dāng)發(fā)生空閑事件(讀或者寫),就會(huì)觸發(fā)這個(gè)方法,并傳入具體事件。
這時(shí),你可以通過 Context 對(duì)象嘗試向目標(biāo) Socekt 寫入數(shù)據(jù),并設(shè)置一個(gè) 監(jiān)聽器,如果發(fā)送失敗就關(guān)閉 Socket (Netty 準(zhǔn)備了一個(gè) ChannelFutureListener.CLOSE_ON_FAILURE 監(jiān)聽器用來實(shí)現(xiàn)關(guān)閉 Socket 邏輯)。
這樣,就實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的心跳服務(wù)。
先通過一段代碼來學(xué)習(xí)下IdleStateHandler的用法:
MyServerHandler:(負(fù)責(zé)監(jiān)測(cè)通道的各種狀態(tài)并處理空閑事件IdleStateEvent)
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "讀空閑";
break;
case WRITER_IDLE:
eventType = "寫空閑";
break;
case ALL_IDLE:
eventType = "讀寫空閑";
break;
}
System.out.println(ctx.channel().remoteAddress() + "超時(shí)事件:" + eventType);
ctx.channel().close();
}else{
super.userEventTriggered(ctx, evt);
}
}
}
服務(wù)器代碼:
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(20,40,60, TimeUnit.SECONDS));
pipeline.addLast(new MyServerHandler());
}
}
客戶端代碼:
public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyClientInitializer());
Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
for(;;){
channel.writeAndFlush(reader.readLine() + "\r\n");
}
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
源碼分析
構(gòu)造方法
IdleStateHandler構(gòu)造器
- readerIdleTime讀空閑超時(shí)時(shí)間設(shè)定,如果channelRead()方法超過readerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;
- writerIdleTime寫空閑超時(shí)時(shí)間設(shè)定,如果write()方法超過writerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;
- allIdleTime所有類型的空閑超時(shí)時(shí)間設(shè)定,包括讀空閑和寫空閑;
- unit時(shí)間單位,包括時(shí)分秒等;
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
該類有 3 個(gè)構(gòu)造方法,主要對(duì)一下 4 個(gè)屬性賦值:
private final boolean observeOutput;// 是否考慮出站時(shí)較慢的情況。默認(rèn)值是false(不考慮)。
private final long readerIdleTimeNanos; // 讀事件空閑時(shí)間,0 則禁用事件
private final long writerIdleTimeNanos;// 寫事件空閑時(shí)間,0 則禁用事件
private final long allIdleTimeNanos; //讀或?qū)懣臻e時(shí)間,0 則禁用事件
可以分別控制讀,寫,讀寫超時(shí)的時(shí)間,單位為秒,如果是0表示不檢測(cè),所以如果全是0,則相當(dāng)于沒添加這個(gè)IdleStateHandler,連接是個(gè)普通的短連接。
handlerAdded 方法
IdleStateHandler是在創(chuàng)建IdleStateHandler實(shí)例并添加到ChannelPipeline時(shí)添加定時(shí)任務(wù)來進(jìn)行定時(shí)檢測(cè)的,具體在initialize(ctx)方法實(shí)現(xiàn);同時(shí)在從ChannelPipeline移除或Channel關(guān)閉時(shí),移除這個(gè)定時(shí)檢測(cè),具體在destroy()實(shí)現(xiàn)
public class IdleStateHandler extends ChannelDuplexHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy();
}
}
initialize
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
// 這里的 schedule 方法會(huì)調(diào)用 eventLoop 的 schedule 方法,將定時(shí)任務(wù)添加進(jìn)隊(duì)列中
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
只要給定的參數(shù)大于0,就創(chuàng)建一個(gè)定時(shí)任務(wù),每個(gè)事件都創(chuàng)建。同時(shí),將 state 狀態(tài)設(shè)置為 1,防止重復(fù)初始化。調(diào)用 initOutputChanged 方法,初始化 “監(jiān)控出站數(shù)據(jù)屬性”,代碼如下:
private void initOutputChanged(ChannelHandlerContext ctx) {
if (observeOutput) {
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// 記錄了出站緩沖區(qū)相關(guān)的數(shù)據(jù),buf 對(duì)象的 hash 碼,和 buf 的剩余緩沖字節(jié)數(shù)
if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
}
}
}
讀事件的 run 方法
代碼如下:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 用于取消任務(wù) promise
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 再次提交任務(wù)
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 觸發(fā)用戶 handler use
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
nextDelay的初始化值為超時(shí)秒數(shù)readerIdleTimeNanos,如果檢測(cè)的時(shí)候沒有正在讀,且計(jì)算多久沒讀了:nextDelay -= 當(dāng)前時(shí)間 - 上次讀取時(shí)間,如果小于0,說明左邊的readerIdleTimeNanos小于空閑時(shí)間(當(dāng)前時(shí)間 - 上次讀取時(shí)間)了,則超時(shí)了
則創(chuàng)建IdleStateEvent事件,IdleState枚舉值為READER_IDLE,然后調(diào)用channelIdle方法分發(fā)給下一個(gè)ChannelInboundHandler,通常由用戶自定義一個(gè)ChannelInboundHandler來捕獲并處理
總的來說,每次讀取操作都會(huì)記錄一個(gè)時(shí)間,定時(shí)任務(wù)時(shí)間到了,會(huì)計(jì)算當(dāng)前時(shí)間和最后一次讀的時(shí)間的間隔,如果間隔超過了設(shè)置的時(shí)間,就觸發(fā) UserEventTriggered 方法。就是這么簡(jiǎn)單。
寫事件的 run 方法
寫任務(wù)的邏輯基本和讀任務(wù)的邏輯一樣,唯一不同的就是有一個(gè)針對(duì) 出站較慢數(shù)據(jù)的判斷。
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
//如果這個(gè)方法返回 true,就不執(zhí)行觸發(fā)事件操作了,即使時(shí)間到了??纯丛摲椒▽?shí)現(xiàn):
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
如果這個(gè)方法返回 true,就不執(zhí)行觸發(fā)事件操作了,即使時(shí)間到了??纯丛摲椒▽?shí)現(xiàn):
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
// 如果最后一次寫的時(shí)間和上一次記錄的時(shí)間不一樣,說明寫操作進(jìn)行過了,則更新此值
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
// 但如果,在這個(gè)方法的調(diào)用間隙修改的,就仍然不觸發(fā)事件
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// 如果出站區(qū)有數(shù)據(jù)
if (buf != null) {
// 拿到出站緩沖區(qū)的 對(duì)象 hashcode
int messageHashCode = System.identityHashCode(buf.current());
// 拿到這個(gè) 緩沖區(qū)的 所有字節(jié)
long pendingWriteBytes = buf.totalPendingWriteBytes();
// 如果和之前的不相等,或者字節(jié)數(shù)不同,說明,輸出有變化,將 "最后一個(gè)緩沖區(qū)引用" 和 “剩余字節(jié)數(shù)” 刷新
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
// 如果寫操作沒有進(jìn)行過,則任務(wù)寫的慢,不觸發(fā)空閑事件
if (!first) {
return true;
}
}
}
}
return false;
}
- 1、如果用戶沒有設(shè)置了需要觀察出站情況。就返回 false,繼續(xù)執(zhí)行事件。
- 2、反之,繼續(xù)向下, 如果最后一次寫的時(shí)間和上一次記錄的時(shí)間不一樣,說明寫操作剛剛做過了,則更新此值,但仍然需要判斷這個(gè) first 的值,如果這個(gè)值還是 false,說明在這個(gè)寫事件是在兩個(gè)方法調(diào)用間隙完成的 / 或者是第一次訪問這個(gè)方法,就仍然不觸發(fā)事件。
- 3、如果不滿足上面的條件,就取出緩沖區(qū)對(duì)象,如果緩沖區(qū)沒對(duì)象了,說明沒有發(fā)生寫的很慢的事件,就觸發(fā)空閑事件。反之,記錄當(dāng)前緩沖區(qū)對(duì)象的 hashcode 和 剩余字節(jié)數(shù),再和之前的比較,如果任意一個(gè)不相等,說明數(shù)據(jù)在變化,或者說數(shù)據(jù)在慢慢的寫出去。那么就更新這兩個(gè)值,留在下一次判斷。
- 4、繼續(xù)判斷 first ,如果是 fasle,說明這是第二次調(diào)用,就不用觸發(fā)空閑事件了。
所有事件的 run 方法
這個(gè)類叫做 AllIdleTimeoutTask ,表示這個(gè)監(jiān)控著所有的事件。當(dāng)讀寫事件發(fā)生時(shí),都會(huì)記錄。代碼邏輯和寫事件的的基本一致,除了這里:
private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
// 當(dāng)前時(shí)間減去 最后一次寫或讀 的時(shí)間 ,若大于0,說明超時(shí)了
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
這里的時(shí)間計(jì)算是取讀寫事件中的最大值來的。然后像寫事件一樣,判斷是否發(fā)生了寫的慢的情況。最后調(diào)用 ctx.fireUserEventTriggered(evt) 方法。
通常這個(gè)使用的是最多的。構(gòu)造方法一般是:
pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
讀寫都是 0 表示禁用,30 表示 30 秒內(nèi)沒有任務(wù)讀寫事件發(fā)生,就觸發(fā)事件。注意,當(dāng)不是 0 的時(shí)候,這三個(gè)任務(wù)會(huì)重疊。
- 心跳檢測(cè)也是一種Handler,在啟動(dòng)時(shí)添加到ChannelPipeline管道中,當(dāng)有讀寫操作時(shí)消息在其中傳遞;
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
- IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}
- channelActive()方法調(diào)用Initialize()方法,根據(jù)配置的readerIdleTime,WriteIdleTIme等超時(shí)事件參數(shù)往任務(wù)隊(duì)列taskQueue中添加定時(shí)任務(wù)task ;
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
- 定時(shí)任務(wù)添加到對(duì)應(yīng)線程EventLoopExecutor對(duì)應(yīng)的任務(wù)隊(duì)列taskQueue中,在對(duì)應(yīng)線程的run()方法中循環(huán)執(zhí)行
- 用當(dāng)前時(shí)間減去最后一次channelRead方法調(diào)用的時(shí)間判斷是否空閑超時(shí);
- 如果空閑超時(shí)則創(chuàng)建空閑超時(shí)事件并傳遞到channelPipeline中;
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
- 在管道中傳遞調(diào)用自定義的userEventTrigger()方法
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
總結(jié):
- IdleStateHandler心跳檢測(cè)主要是通過向線程任務(wù)隊(duì)列中添加定時(shí)任務(wù),判斷channelRead()方法或write()方法是否調(diào)用空閑超時(shí),如果超時(shí)則觸發(fā)超時(shí)事件執(zhí)行自定義userEventTrigger()方法;
- Netty通過IdleStateHandler實(shí)現(xiàn)最常見的心跳機(jī)制不是一種雙向心跳的PING-PONG模式,而是客戶端發(fā)送心跳數(shù)據(jù)包,服務(wù)端接收心跳但不回復(fù),因?yàn)槿绻?wù)端同時(shí)有上千個(gè)連接,心跳的回復(fù)需要消耗大量網(wǎng)絡(luò)資源;如果服務(wù)端一段時(shí)間內(nèi)沒有收到客戶端的心跳數(shù)據(jù)包則認(rèn)為客戶端已經(jīng)下線,將通道關(guān)閉避免資源的浪費(fèi);在這種心跳模式下服務(wù)端可以感知客戶端的存活情況,無(wú)論是宕機(jī)的正常下線還是網(wǎng)絡(luò)問題的非正常下線,服務(wù)端都能感知到,而客戶端不能感知到服務(wù)端的非正常下線;
- 要想實(shí)現(xiàn)客戶端感知服務(wù)端的存活情況,需要進(jìn)行雙向的心跳;Netty中的channelInactive()方法是通過Socket連接關(guān)閉時(shí)揮手?jǐn)?shù)據(jù)包觸發(fā)的,因此可以通過channelInactive()方法感知正常的下線情況,但是因?yàn)榫W(wǎng)絡(luò)異常等非正常下線則無(wú)法感知;
參考:
https://www.cnblogs.com/java-chen-hao/p/11453198.html