轉自:https://blog.csdn.net/u013967175/article/details/78591810
基礎
- 心跳機制
- 心跳是在TCP長連接中,客戶端和服務端定時向對方發(fā)送數(shù)據(jù)包通知對方自己還在線,保證連接的有效性的一種機制
- 在服務器和客戶端之間一定時間內沒有數(shù)據(jù)交互時, 即處于 idle 狀態(tài)時, 客戶端或服務器會發(fā)送一個特殊的數(shù)據(jù)包給對方, 當接收方收到這個數(shù)據(jù)報文后, 也立即發(fā)送一個特殊的數(shù)據(jù)報文, 回應發(fā)送方, 此即一個 PING-PONG 交互. 自然地, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性.
- 心跳實現(xiàn)
- 使用TCP協(xié)議層的Keeplive機制,但是該機制默認的心跳時間是2小時,依賴操作系統(tǒng)實現(xiàn)不夠靈活;
- 應用層實現(xiàn)自定義心跳機制,比如Netty實現(xiàn)心跳機制
IdleStateHandler心跳檢測實例
服務端
- 服務端添加IdleStateHandler心跳檢測處理器,并添加自定義處理Handler類實現(xiàn)userEventTriggered()方法作為超時事件的邏輯處理;
- 設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,如果五秒內ChannelRead()方法未被調用則觸發(fā)一次userEventTrigger()方法
ServerBootstrap b= new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new HeartBeatServerHandler());
}
});
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現(xiàn)其userEventTriggered()方法,在出現(xiàn)超時事件時會被觸發(fā),包括讀空閑超時或者寫空閑超時;
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
private int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("已經5秒未收到客戶端的消息了!");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.READER_IDLE){
lossConnectCount++;
if (lossConnectCount>2){
System.out.println("關閉這個不活躍通道!");
ctx.channel().close();
}
}
}else {
super.userEventTriggered(ctx,evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lossConnectCount = 0;
System.out.println("client says: "+msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端
- 客戶端添加IdleStateHandler心跳檢測處理器,并添加自定義處理Handler類實現(xiàn)userEventTriggered()方法作為超時事件的邏輯處理;
- 設定IdleStateHandler心跳檢測每四秒進行一次寫檢測,如果四秒內write()方法未被調用則觸發(fā)一次userEventTrigger()方法,實現(xiàn)客戶端每四秒向服務端發(fā)送一次消息;
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new HeartBeatClientHandler());
}
});
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現(xiàn)自定義userEventTrigger()方法,如果出現(xiàn)超時時間就會被觸發(fā),包括讀空閑超時或者寫空閑超時;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("客戶端循環(huán)心跳監(jiān)測發(fā)送: "+new Date());
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.WRITER_IDLE){
if (curTime<beatTime){
curTime++;
ctx.writeAndFlush("biubiu");
}
}
}
}
IdleStateHandler源碼分析
-
IdleStateHandler構造器
readerIdleTime讀空閑超時時間設定,如果channelRead()方法超過readerIdleTime時間未被調用則會觸發(fā)超時事件調用userEventTrigger()方法;
writerIdleTime寫空閑超時時間設定,如果write()方法超過writerIdleTime時間未被調用則會觸發(fā)超時事件調用userEventTrigger()方法;
allIdleTime所有類型的空閑超時時間設定,包括讀空閑和寫空閑;
unit時間單位,包括時分秒等;
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
- 心跳檢測也是一種Handler,在啟動時添加到ChannelPipeline管道中,當有讀寫操作時消息在其中傳遞;
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
- IdleStateHandler的channelActive()方法在socket通道建立時被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}
- channelActive()方法調用Initialize()方法,根據(jù)配置的readerIdleTime,WriteIdleTIme等超時事件參數(shù)往任務隊列taskQueue中添加定時任務task ;
private void initialize(ChannelHandlerContext ctx) {
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
-
定時任務添加到對應線程EventLoopExecutor對應的任務隊列taskQueue中,在對應線程的run()方法中循環(huán)執(zhí)行
用當前時間減去最后一次channelRead方法調用的時間判斷是否空閑超時;
如果空閑超時則創(chuàng)建空閑超時事件并傳遞到channelPipeline中;
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
- 在管道中傳遞調用自定義的userEventTrigger()方法
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
總結
IdleStateHandler心跳檢測主要是通過向線程任務隊列中添加定時任務,判斷channelRead()方法或write()方法是否調用空閑超時,如果超時則觸發(fā)超時事件執(zhí)行自定義userEventTrigger()方法;
Netty通過IdleStateHandler實現(xiàn)最常見的心跳機制不是一種雙向心跳的PING-PONG模式,而是客戶端發(fā)送心跳數(shù)據(jù)包,服務端接收心跳但不回復,因為如果服務端同時有上千個連接,心跳的回復需要消耗大量網絡資源;如果服務端一段時間內沒有收到客戶端的心跳數(shù)據(jù)包則認為客戶端已經下線,將通道關閉避免資源的浪費;在這種心跳模式下服務端可以感知客戶端的存活情況,無論是宕機的正常下線還是網絡問題的非正常下線,服務端都能感知到,而客戶端不能感知到服務端的非正常下線;
要想實現(xiàn)客戶端感知服務端的存活情況,需要進行雙向的心跳;Netty中的channelInactive()方法是通過Socket連接關閉時揮手數(shù)據(jù)包觸發(fā)的,因此可以通過channelInactive()方法感知正常的下線情況,但是因為網絡異常等非正常下線則無法感知;