Netty筆記4-如何實(shí)現(xiàn)長(zhǎng)連接

? 前面三章介紹了Netty的一些基本用法,這一章介紹怎么使用Netty來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的長(zhǎng)連接demo。

關(guān)于長(zhǎng)連接的背景知識(shí),可以參考《如何使用Socket實(shí)現(xiàn)長(zhǎng)連接》

? 一個(gè)簡(jiǎn)單的長(zhǎng)連接demo分為以下幾個(gè)步驟:

長(zhǎng)連接流程
  1. 創(chuàng)建連接(Channel)
  2. 發(fā)心跳包
  3. 發(fā)消息,并通知其他用戶(hù)
  4. 一段時(shí)間沒(méi)收到心跳包或者用戶(hù)主動(dòng)關(guān)閉之后關(guān)閉連接

? 看似簡(jiǎn)單的步驟,里面有兩個(gè)技術(shù)難點(diǎn):

  1. 如何保存已創(chuàng)建的Channel

    這里我們是將Channel放在一個(gè)Map中,以Channel.hashCode()作為key

    其實(shí)這樣做有一個(gè)劣勢(shì),就是不適合水平擴(kuò)展,每個(gè)機(jī)器都有一個(gè)連接數(shù)的上線,如果需要實(shí)現(xiàn)多用戶(hù)實(shí)時(shí)在線,對(duì)機(jī)器的數(shù)量要求會(huì)很高,在這里我們不多做討論,不同的業(yè)務(wù)場(chǎng)景,設(shè)計(jì)方案也是不同的,可以在長(zhǎng)連接方案和客戶(hù)端輪詢(xún)方案中進(jìn)行選擇。

  2. 如何自動(dòng)關(guān)閉沒(méi)有心跳的連接

    Netty有一個(gè)比較好的Feature,就是ScheduledFuture,他可以通過(guò)ChannelHandlerContext.executor().schedule()創(chuàng)建,支持延時(shí)提交,也支持取消任務(wù),這就給我們心跳包的自動(dòng)關(guān)閉提供了一個(gè)很好的實(shí)現(xiàn)方案。

開(kāi)始動(dòng)手

? 首先,我們需要用一個(gè)JavaBean來(lái)封裝通信的協(xié)議內(nèi)容,在這里我們只需要三個(gè)數(shù)據(jù)就行了:

  1. type : byte,表示消息的類(lèi)型,有心跳類(lèi)型和內(nèi)容類(lèi)型
  2. length : int,表示消息的長(zhǎng)度
  3. content : String,表示消息的內(nèi)容(心跳包在這里沒(méi)有內(nèi)容)

? 然后,因?yàn)槲覀冃枰獙hannel和ScheduledFuture緩存在Map里面,所以需要將兩個(gè)對(duì)象組合成一個(gè)JavaBean。

? 接著,需要完成輸入輸出流的解析和轉(zhuǎn)換,我們需要重寫(xiě)Decoder和Encoder,具體可以參考Netty筆記3-Decoder和Encoder。

? 最后,就是需要完成ChannelHandler了,代碼如下:

package com.dz.netty.live;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Created by RoyDeng on 17/7/20.
 */
public class LiveHandler extends SimpleChannelInboundHandler<LiveMessage> { // 1

    private static Map<Integer, LiveChannelCache> channelCache = new HashMap<>();
    private Logger logger = LoggerFactory.getLogger(LiveHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LiveMessage msg) throws Exception {
        Channel channel = ctx.channel();
        final int hashCode = channel.hashCode();
        System.out.println("channel hashCode:" + hashCode + " msg:" + msg + " cache:" + channelCache.size());

        if (!channelCache.containsKey(hashCode)) {
            System.out.println("channelCache.containsKey(hashCode), put key:" + hashCode);
            channel.closeFuture().addListener(future -> {
                System.out.println("channel close, remove key:" + hashCode);
                channelCache.remove(hashCode);
            });
            ScheduledFuture scheduledFuture = ctx.executor().schedule(
                    () -> {
                        System.out.println("schedule runs, close channel:" + hashCode);
                        channel.close();
                    }, 10, TimeUnit.SECONDS);
            channelCache.put(hashCode, new LiveChannelCache(channel, scheduledFuture));
        }

        switch (msg.getType()) {
            case LiveMessage.TYPE_HEART: {
                LiveChannelCache cache = channelCache.get(hashCode);
                ScheduledFuture scheduledFuture = ctx.executor().schedule(
                        () -> channel.close(), 5, TimeUnit.SECONDS);
                cache.getScheduledFuture().cancel(true);
                cache.setScheduledFuture(scheduledFuture);
                ctx.channel().writeAndFlush(msg);
                break;
            }
            case LiveMessage.TYPE_MESSAGE: {
                channelCache.entrySet().stream().forEach(entry -> {
                    Channel otherChannel = entry.getValue().getChannel();
                    otherChannel.writeAndFlush(msg);
                });
                break;
            }
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.debug("channelReadComplete");
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.debug("exceptionCaught");
        if(null != cause) cause.printStackTrace();
        if(null != ctx) ctx.close();
    }
}

? 寫(xiě)完服務(wù)端之后,我們需要有客戶(hù)端連接來(lái)測(cè)試這個(gè)項(xiàng)目,教程參考如何使用Socket在客戶(hù)端實(shí)現(xiàn)長(zhǎng)連接,代碼如下:

package com.dz.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Scanner;

/**
 * Created by RoyDeng on 18/2/3.
 */
public class LongConnTest {

    private Logger logger = LoggerFactory.getLogger(LongConnTest.class);

    String host = "localhost";
    int port = 8080;

    public void testLongConn() throws Exception {
        logger.debug("start");
        final Socket socket = new Socket();
        socket.connect(new InetSocketAddress(host, port));
        Scanner scanner = new Scanner(System.in);
        new Thread(() -> {
            while (true) {
                try {
                    byte[] input = new byte[64];
                    int readByte = socket.getInputStream().read(input);
                    logger.debug("readByte " + readByte);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        int code;
        while (true) {
            code = scanner.nextInt();
            logger.debug("input code:" + code);
            if (code == 0) {
                break;
            } else if (code == 1) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(5);
                byteBuffer.put((byte) 1);
                byteBuffer.putInt(0);
                socket.getOutputStream().write(byteBuffer.array());
                logger.debug("write heart finish!");
            } else if (code == 2) {
                byte[] content = ("hello, I'm" + hashCode()).getBytes();
                ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
                byteBuffer.put((byte) 2);
                byteBuffer.putInt(content.length);
                byteBuffer.put(content);
                socket.getOutputStream().write(byteBuffer.array());
                logger.debug("write content finish!");
            }
        }
        socket.close();
    }

    // 因?yàn)镴unit不支持用戶(hù)輸入,所以用main的方式來(lái)執(zhí)行用例
    public static void main(String[] args) throws Exception {
        new LongConnTest().testLongConn();
    }
}

運(yùn)行main方法之后,輸入1表示發(fā)心跳包,輸入2表示發(fā)content,5秒內(nèi)不輸入1則服務(wù)端會(huì)自動(dòng)斷開(kāi)連接。

結(jié)語(yǔ)

? 本項(xiàng)目是我一直想研究的一塊,這個(gè)demo比較簡(jiǎn)單,不能投入到生產(chǎn)環(huán)境中去,因?yàn)椴荒軕?yīng)付連接數(shù)大的應(yīng)用場(chǎng)景。本項(xiàng)目已經(jīng)上傳至github,地址:https://github.com/dzr1990/helloNetty/tree/master,如果有問(wèn)題歡迎評(píng)論留言!

前三篇筆記的快捷通道:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • netty常用API學(xué)習(xí) netty簡(jiǎn)介 Netty是基于Java NIO的網(wǎng)絡(luò)應(yīng)用框架. Netty是一個(gè)NIO...
    花丶小偉閱讀 6,118評(píng)論 0 20
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 6,203評(píng)論 0 13
  • 6.2 Channel實(shí)現(xiàn) ![Netty_Channel類(lèi)圖][2] Channel的類(lèi)圖比較清晰。我們主要分析...
    Hypercube閱讀 8,673評(píng)論 6 19
  • @我覺(jué)得人還是得多看看毒雞湯 每次堅(jiān)持不下去的時(shí)候就想想那些更加不容易的人 我媽媽每天都告訴我 吃的苦中苦 方...
    木水呀閱讀 259評(píng)論 0 0

友情鏈接更多精彩內(nèi)容