? 長連接貌似是一個很高深莫測的知識,但是只要你做直播、IM、游戲、彈幕里面的任何一種,或者是你的app想要實時的接收某些消息,你就會要接觸到長連接技術(shù)。本文主要教你如何在客戶端如何使用Socket實現(xiàn)長連接。
Socket背景知識
?? 要做長連接的話,是不能用http協(xié)議來做的,因為http協(xié)議已經(jīng)是應(yīng)用層協(xié)議了,并且http協(xié)議是無狀態(tài)的,而我們要做長連接,肯定是需要在應(yīng)用層封裝自己的業(yè)務(wù),所以就需要基于TCP協(xié)議來做,而基于TCP協(xié)議的話,就要用到Socket了。
Socket是java針對tcp層通信封裝的一套網(wǎng)絡(luò)方案
TCP協(xié)議我們知道,是基于ip(或者域名)和端口對指定機器進行的點對點訪問,他的連接成功有兩個條件,就是對方ip可以到達和端口是開放的
Socket能幫完成TCP三次握手,而應(yīng)用層的頭部信息需要自己去解析,也就是說,自己要制定好協(xié)議,并且要去解析byte
Socket使用方式
Socket看上去不是很好用,因為他是基于java.io來實現(xiàn)的,你要直接跟InputStream和OutputStream打交道,也就是直接跟byte[]打交道,所以用起來并不是這么友好。
下面通過一個簡單的例子,往一臺服務(wù)器發(fā)\01 \00 \00 \00 \00這一串字節(jié),服務(wù)器也返回相同的字節(jié)流,上代碼
? ? public void testSocketChannelBlock() throws Exception {
? ? ? ? final SocketChannel channel = SocketChannel.open(address);
? ? ? ? ByteBuffer output = ByteBuffer.allocate(5);
? ? ? ? output.put((byte) 1);
? ? ? ? output.putInt(0);
? ? ? ? output.flip();
? ? ? ? channel.write(output);
? ? ? ? logger.debug("write complete, start read");
? ? ? ? ByteBuffer input = ByteBuffer.allocate(5);
? ? ? ? int readByte = channel.read(input);
? ? ? ? logger.debug("readByte " + readByte);
? ? ? ? input.flip();
? ? ? ? if (readByte == -1) {
? ? ? ? ? ? logger.debug("readByte == -1, return!");
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? for (int i = 0; i < readByte; i++) {
? ? ? ? ? ? logger.debug("read [" + i + "]:" + input.get());
? ? ? ? }
? ? }
Selector
我們知道,傳統(tǒng)io是阻塞的,也就是說,一個線程只能處理一個io流,也就是一個Socket。有了Selector之后,一個線程就能處理多個SocketChannel。
Selector的原理是,他能接受多個SocketChannel,然后不斷的遍歷每一個Channel的狀態(tài),如果有Channel已經(jīng)ready了,他就能通過他自身提供的方法,通知到線程,讓線程去處理對應(yīng)的業(yè)務(wù)。流程圖如下:

Netty對nio這一套有比較好的封裝,里面就涉及到了Selector,
Netty 優(yōu)點
1.并發(fā)高
2.傳輸快
3.封裝好
(1)Netty為什么并發(fā)高
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)開發(fā)的網(wǎng)絡(luò)通信框架,對比于BIO(Blocking I/O,阻塞IO),他的并發(fā)性能得到了很大提高,兩張圖讓你了解BIO和NIO的區(qū)別:


從這兩圖可以看出,NIO的單線程能處理連接的數(shù)量比BIO要高出很多,而為什么單線程能處理更多的連接呢?原因就是圖二中出現(xiàn)的Selector。
當(dāng)一個連接建立之后,他有兩個步驟要做,第一步是接收完客戶端發(fā)過來的全部數(shù)據(jù),第二步是服務(wù)端處理完請求業(yè)務(wù)之后返回response給客戶端。NIO和BIO的區(qū)別主要是在第一步。
在BIO中,等待客戶端發(fā)數(shù)據(jù)這個過程是阻塞的,這樣就造成了一個線程只能處理一個請求的情況,而機器能支持的最大線程數(shù)是有限的,這就是為什么BIO不能支持高并發(fā)的原因。
而NIO中,當(dāng)一個Socket建立好之后,Thread并不會阻塞去接受這個Socket,而是將這個請求交給Selector,Selector會不斷的去遍歷所有的Socket,一旦有一個Socket建立完成,他會通知Thread,然后Thread處理完數(shù)據(jù)再返回給客戶端——這個過程是不阻塞的,這樣就能讓一個Thread處理更多的請求了。
下面兩張圖是基于BIO的處理流程和netty的處理流程,輔助你理解兩種方式的差別:


除了BIO和NIO之外,還有一些其他的IO模型,下面這張圖就表示了五種IO模型的處理流程:

BIO,同步阻塞IO,阻塞整個步驟,如果連接少,他的延遲是最低的,因為一個線程只處理一個連接,適用于少連接且延遲低的場景,比如說數(shù)據(jù)庫連接。
NIO,同步非阻塞IO,阻塞業(yè)務(wù)處理但不阻塞數(shù)據(jù)接收,適用于高并發(fā)且處理簡單的場景,比如聊天軟件。
多路復(fù)用IO,他的兩個步驟處理是分開的,也就是說,一個連接可能他的數(shù)據(jù)接收是線程a完成的,數(shù)據(jù)處理是線程b完成的,他比BIO能處理更多請求。
信號驅(qū)動IO,這種IO模型主要用在嵌入式開發(fā),不參與討論。
異步IO,他的數(shù)據(jù)請求和數(shù)據(jù)處理都是異步的,數(shù)據(jù)請求一次返回一次,適用于長連接的業(yè)務(wù)場景。
(2)Netty為什么傳輸快
Netty的傳輸快其實也是依賴了NIO的一個特性——零拷貝。我們知道,Java的內(nèi)存有堆內(nèi)存、棧內(nèi)存和字符串常量池等等,其中堆內(nèi)存是占用內(nèi)存空間最大的一塊,也是Java對象存放的地方,一般我們的數(shù)據(jù)如果需要從IO讀取到堆內(nèi)存,中間需要經(jīng)過Socket緩沖區(qū),也就是說一個數(shù)據(jù)會被拷貝兩次才能到達他的的終點,如果數(shù)據(jù)量大,就會造成不必要的資源浪費。
Netty針對這種情況,使用了NIO中的另一大特性——零拷貝,當(dāng)他需要接收數(shù)據(jù)的時候,他會在堆內(nèi)存之外開辟一塊內(nèi)存,數(shù)據(jù)就直接從IO讀到了那塊內(nèi)存中去,在netty里面通過ByteBuf可以直接對這些數(shù)據(jù)進行直接操作,從而加快了傳輸速度。


(3)為什么說Netty封裝好?
阻塞I/O
public class PlainOioServer {
? ? public void serve(int port) throws IOException {
? ? ? ? final ServerSocket socket = new ServerSocket(port);? ? //1
? ? ? ? try {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? final Socket clientSocket = socket.accept();? ? //2
? ? ? ? ? ? ? ? System.out.println("Accepted connection from " + clientSocket);
? ? ? ? ? ? ? ? new Thread(new Runnable() {? ? ? ? ? ? ? ? ? ? ? ? //3
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? ? ? ? ? OutputStream out;
? ? ? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? ? ? out = clientSocket.getOutputStream();
? ? ? ? ? ? ? ? ? ? ? ? ? ? out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));? ? ? ? ? ? ? ? ? ? ? ? ? ? //4
? ? ? ? ? ? ? ? ? ? ? ? ? ? out.flush();
? ? ? ? ? ? ? ? ? ? ? ? ? ? clientSocket.close();? ? ? ? ? ? ? ? //5
? ? ? ? ? ? ? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? clientSocket.close();
? ? ? ? ? ? ? ? ? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // ignore on close
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }).start();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //6
? ? ? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
非阻塞IO
public class PlainNioServer {
? ? public void serve(int port) throws IOException {
? ? ? ? ServerSocketChannel serverChannel = ServerSocketChannel.open();
? ? ? ? serverChannel.configureBlocking(false);
? ? ? ? ServerSocket ss = serverChannel.socket();
? ? ? ? InetSocketAddress address = new InetSocketAddress(port);
? ? ? ? ss.bind(address);? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //1
? ? ? ? Selector selector = Selector.open();? ? ? ? ? ? ? ? ? ? ? ? //2
? ? ? ? serverChannel.register(selector, SelectionKey.OP_ACCEPT);? ? //3
? ? ? ? final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
? ? ? ? for (;;) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? selector.select();? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //4
? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ex.printStackTrace();
? ? ? ? ? ? ? ? // handle exception
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? ? ? Set<SelectionKey> readyKeys = selector.selectedKeys();? ? //5
? ? ? ? ? ? Iterator<SelectionKey> iterator = readyKeys.iterator();
? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? SelectionKey key = iterator.next();
? ? ? ? ? ? ? ? iterator.remove();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? if (key.isAcceptable()) {? ? ? ? ? ? ? ? //6
? ? ? ? ? ? ? ? ? ? ? ? ServerSocketChannel server =
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (ServerSocketChannel)key.channel();
? ? ? ? ? ? ? ? ? ? ? ? SocketChannel client = server.accept();
? ? ? ? ? ? ? ? ? ? ? ? client.configureBlocking(false);
? ? ? ? ? ? ? ? ? ? ? ? client.register(selector, SelectionKey.OP_WRITE |
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? SelectionKey.OP_READ, msg.duplicate());? ? //7
? ? ? ? ? ? ? ? ? ? ? ? System.out.println(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "Accepted connection from " + client);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? if (key.isWritable()) {? ? ? ? ? ? ? ? //8
? ? ? ? ? ? ? ? ? ? ? ? SocketChannel client =
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (SocketChannel)key.channel();
? ? ? ? ? ? ? ? ? ? ? ? ByteBuffer buffer =
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (ByteBuffer)key.attachment();
? ? ? ? ? ? ? ? ? ? ? ? while (buffer.hasRemaining()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (client.write(buffer) == 0) {? ? ? ? //9
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? client.close();? ? ? ? ? ? ? ? ? ? //10
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } catch (IOException ex) {
? ? ? ? ? ? ? ? ? ? key.cancel();
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? key.channel().close();
? ? ? ? ? ? ? ? ? ? } catch (IOException cex) {
? ? ? ? ? ? ? ? ? ? ? ? // 在關(guān)閉時忽略
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
Netty
public class NettyOioServer {
? ? public void server(int port) throws Exception {
? ? ? ? final ByteBuf buf = Unpooled.unreleasableBuffer(
? ? ? ? ? ? ? ? Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
? ? ? ? EventLoopGroup group = new OioEventLoopGroup();
? ? ? ? try {
? ? ? ? ? ? ServerBootstrap b = new ServerBootstrap();? ? ? ? //1
? ? ? ? ? ? b.group(group)? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //2
? ? ? ? ? ? .channel(OioServerSocketChannel.class)
? ? ? ? ? ? .localAddress(new InetSocketAddress(port))
? ? ? ? ? ? .childHandler(new ChannelInitializer<SocketChannel>() {//3
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void initChannel(SocketChannel ch)
? ? ? ? ? ? ? ? ? ? throws Exception {
? ? ? ? ? ? ? ? ? ? ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {? ? ? ? ? ? //4
? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? public void channelActive(ChannelHandlerContext ctx) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? ? ? ChannelFuture f = b.bind().sync();? //6
? ? ? ? ? ? f.channel().closeFuture().sync();
? ? ? ? } finally {
? ? ? ? ? ? group.shutdownGracefully().sync();? ? ? ? //7
? ? ? ? }
? ? }
}
Channel
數(shù)據(jù)傳輸流,與channel相關(guān)的概念有以下四個,上一張圖讓你了解netty里面的Channel。

Channel,表示一個連接,可以理解為每一個請求,就是一個Channel。
ChannelHandler,核心處理業(yè)務(wù)就在這里,用于處理業(yè)務(wù)請求。
ChannelHandlerContext,用于傳輸業(yè)務(wù)數(shù)據(jù)。
ChannelPipeline,用于保存處理過程需要用到的ChannelHandler和ChannelHandlerContext。
ByteBuf
ByteBuf是一個存儲字節(jié)的容器,最大特點就是使用方便,它既有自己的讀索引和寫索引,方便你對整段字節(jié)緩存進行讀寫,也支持get/set,方便你對其中每一個字節(jié)進行讀寫,他的數(shù)據(jù)結(jié)構(gòu)如下圖所示:

Heap Buffer 堆緩沖區(qū)
堆緩沖區(qū)是ByteBuf最常用的模式,他將數(shù)據(jù)存儲在堆空間。
Direct Buffer 直接緩沖區(qū)
直接緩沖區(qū)是ByteBuf的另外一種常用模式,他的內(nèi)存分配都不發(fā)生在堆,jdk1.4引入的nio的ByteBuffer類允許jvm通過本地方法調(diào)用分配內(nèi)存,這樣做有兩個好處
通過免去中間交換的內(nèi)存拷貝, 提升IO處理速度; 直接緩沖區(qū)的內(nèi)容可以駐留在垃圾回收掃描的堆區(qū)以外。
DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的內(nèi)存, GC對此”無能為力”,也就意味著規(guī)避了在高負載下頻繁的GC過程對應(yīng)用線程的中斷影響.
Composite Buffer 復(fù)合緩沖區(qū)
復(fù)合緩沖區(qū)相當(dāng)于多個不同ByteBuf的視圖,這是netty提供的,jdk不提供這樣的功能。
Codec
Netty中的編碼/解碼器,通過他你能完成字節(jié)與pojo、pojo與pojo的相互轉(zhuǎn)換,從而達到自定義協(xié)議的目的。
在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。
認(rèn)識Http請求
在動手寫Netty框架之前,我們先要了解http請求的組成,如下圖:

HTTP Request 第一部分是包含的頭信息
HttpContent 里面包含的是數(shù)據(jù),可以后續(xù)有多個 HttpContent 部分
LastHttpContent 標(biāo)記是 HTTP request 的結(jié)束,同時可能包含頭的尾部信息
完整的 HTTP request,由1,2,3組成

HTTP response 第一部分是包含的頭信息
HttpContent 里面包含的是數(shù)據(jù),可以后續(xù)有多個 HttpContent 部分
LastHttpContent 標(biāo)記是 HTTP response 的結(jié)束,同時可能包含頭的尾部信息
完整的 HTTP response,由1,2,3組成
從request的介紹我們可以看出來,一次http請求并不是通過一次對話完成的,他中間可能有很次的連接。通過上一章我們隊netty的了解,每一次對話都會建立一個channel,并且一個ChannelInboundHandler一般是不會同時去處理多個Channel的。
如何在一個Channel里面處理一次完整的Http請求?這就要用到我們上圖提到的FullHttpRequest,我們只需要在使用netty處理channel的時候,只處理消息是FullHttpRequest的Channel,這樣我們就能在一個ChannelHandler中處理一個完整的Http請求了。
什么是Decoder和Encoder
?? 在學(xué)習(xí)Decoder和Encoder之前,首先要了解他們在具體是個什么東西。在Netty里面,有四個核心概念,這個在第一篇文章提到的,他們的分別是:
Channel,一個客戶端與服務(wù)器通信的通道
ChannelHandler,業(yè)務(wù)邏輯處理器,分為ChannelInboundHandler和ChannelOutboundHandler
ChannelInboundHandler,輸入數(shù)據(jù)處理器
ChannelOutboundHandler,輸出業(yè)務(wù)處理器
通常情況下,業(yè)務(wù)邏輯都是存在于ChannelHandler之中
ChannelPipeline,用于存放ChannelHandler的容器
ChannelContext,通信管道的上下文
他們之間的交流流程如下圖:

他們的交互流程是:
事件傳遞給 ChannelPipeline 的第一個 ChannelHandler
ChannelHandler 通過關(guān)聯(lián)的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個
ChannelHandler 通過關(guān)聯(lián)的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的 下一個
而我們本文所需要詳細講的Decoder和Encoder,他們分別就是ChannelInboundHandler和ChannelOutboundHandler,分別用于在數(shù)據(jù)流進來的時候?qū)⒆止?jié)碼轉(zhuǎn)換為消息對象和數(shù)據(jù)流出去的時候?qū)⑾ο筠D(zhuǎn)換為字節(jié)碼。
Encoder
?? Encoder最重要的實現(xiàn)類是MessageToByteEncoder<T>,這個類的作用就是將消息實體T從對象轉(zhuǎn)換成byte,寫入到ByteBuf,然后再丟給剩下的ChannelOutboundHandler傳給客戶端,流程圖如下:

encode方法是繼承MessageToByteEncoder唯一需要重寫的方法,可見其簡單程度。也是因為Encoder相比于Decoder更為簡單,在這里也不多做贅述,直接上代碼:
public class ShortToByteEncoder extends? ?MessageToByteEncoder<Short> {? //1
? ? @Override
? ? public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)
? ? ? ? ? ? throws Exception {
? ? ? ? out.writeShort(msg);? //2
? ? }
}
Decoder
?? 和Encoder一樣,decoder就是在服務(wù)端收到數(shù)據(jù)的時候,將字節(jié)流轉(zhuǎn)換為實體對象Message。但是和Encoder的處理邏輯不一樣,數(shù)據(jù)傳到服務(wù)端有可能不是一次請求就能完成的,中間可能需要經(jīng)過幾次數(shù)據(jù)傳輸,并且每一次傳輸傳多少數(shù)據(jù)也是不確定的,所以它有兩個重要方法:
decode和decodeLast的不同之處,在于他們的調(diào)用時機不同,正如描述所說,decodeLast只有在Channel的生命周期結(jié)束之前會調(diào)用一次,默認(rèn)是調(diào)用decode方法。
?? 同樣是ToInteger的解碼器,他的代碼如下:
public class ToIntegerDecoder extends ByteToMessageDecoder { //1
? ? @Override
? ? public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
? ? ? ? ? ? throws Exception {
? ? ? ? if (in.readableBytes() >= 4) {? //2
? ? ? ? ? ? out.add(in.readInt());? //3
? ? ? ? }
? ? }
}
從這段代碼可以看出,因為不知道這次請求發(fā)過來多少數(shù)據(jù),所以每次都要判斷byte長度夠不夠4,如果你的數(shù)據(jù)長度更長,且不固定的話,這里的邏輯會變得非常復(fù)雜。所以在這里介紹另一個我們常用的解碼器 :ReplayingDecoder。
ReplayingDecoder
?? ReplayingDecoder 是 byte-to-message 解碼的一種特殊的抽象基類,讀取緩沖區(qū)的數(shù)據(jù)之前需要檢查緩沖區(qū)是否有足夠的字節(jié),使用ReplayingDecoder就無需自己檢查;若ByteBuf中有足夠的字節(jié),則會正常讀取;若沒有足夠的字節(jié)則會停止解碼。
?? RelayingDecoder在使用的時候需要搞清楚的兩個方法是checkpoint(S s)和state(),其中checkpoint的參數(shù)S,代表的是ReplayingDecoder所處的狀態(tài),一般是枚舉類型。RelayingDecoder是一個有狀態(tài)的Handler,狀態(tài)表示的是它目前讀取到了哪一步,checkpoint(S s)是設(shè)置當(dāng)前的狀態(tài),state()是獲取當(dāng)前的狀態(tài)。
?? 在這里我們模擬一個簡單的Decoder,假設(shè)每個包包含length:int和content:String兩個數(shù)據(jù),其中l(wèi)ength可以為0,代表一個空包,大于0的時候代表content的長度。代碼如下:
public class LiveDecoder extends ReplayingDecoder<LiveDecoder.LiveState> { //1
? ? public enum LiveState { //2
? ? ? ? LENGTH,
? ? ? ? CONTENT
? ? }
? ? private LiveMessage message = new LiveMessage();
? ? public LiveDecoder() {
? ? ? ? super(LiveState.LENGTH); // 3
? ? }
? ? @Override
? ? protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
? ? ? ? switch (state()) { // 4
? ? ? ? ? ? case LENGTH:
? ? ? ? ? ? ? ? int length = byteBuf.readInt();
? ? ? ? ? ? ? ? if (length > 0) {
? ? ? ? ? ? ? ? ? ? checkpoint(LiveState.CONTENT); // 5
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? list.add(message); // 6
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? case CONTENT:
? ? ? ? ? ? ? ? byte[] bytes = new byte[message.getLength()];
? ? ? ? ? ? ? ? byteBuf.readBytes(bytes);
? ? ? ? ? ? ? ? String content = new String(bytes);
? ? ? ? ? ? ? ? message.setContent(content);
? ? ? ? ? ? ? ? list.add(message);
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? default:
? ? ? ? ? ? ? ? throw new IllegalStateException("invalid state:" + state());
? ? ? ? }
? ? }
}
繼承ReplayingDecoder,泛型LiveState,用來表示當(dāng)前讀取的狀態(tài)
描述LiveState,有讀取長度和讀取內(nèi)容兩個狀態(tài)
初始化的時候設(shè)置為讀取長度的狀態(tài)
讀取的時候通過state()方法來確定當(dāng)前處于什么狀態(tài)
如果讀取出來的長度大于0,則設(shè)置為讀取內(nèi)容狀態(tài),下一次讀取的時候則從這個位置開始
讀取完成,往結(jié)果里面放解析好的數(shù)據(jù)
?? 以上就是ReplayingDecoder的使用方法,他比ByteToMessageDecoder更加靈活,能夠通過巧妙的方式來處理復(fù)雜的業(yè)務(wù)邏輯,但是也是因為這個原因,使得ReplayingDecoder帶有一定的局限性:
不是所有的標(biāo)準(zhǔn) ByteBuf 操作都被支持,如果調(diào)用一個不支持的操作會拋出 UnreplayableOperationException
ReplayingDecoder 略慢于 ByteToMessageDecoder
所以,如果不引入過多的復(fù)雜性 使用 ByteToMessageDecoder 。否則,使用ReplayingDecoder。
MessageToMessage
?? Encoder和Decoder除了能完成Byte和Message的相互轉(zhuǎn)換之外,為了處理復(fù)雜的業(yè)務(wù)邏輯,還能幫助使用者完成Message和Message的相互轉(zhuǎn)換,我們熟悉的Http協(xié)議的處理,其中就用到了很多MessageToMessage的派生類。
Netty如何實現(xiàn)長連接
一個簡單的長連接demo分為以下幾個步驟:

1.創(chuàng)建連接(Channel)
2.發(fā)心跳包
3.發(fā)消息,并通知其他用戶
4.一段時間沒收到心跳包或者用戶主動關(guān)閉之后關(guān)閉連接
? 看似簡單的步驟,里面有兩個技術(shù)難點:
1.如何保存已創(chuàng)建的Channel
這里我們是將Channel放在一個Map中,以Channel.hashCode()作為key
其實這樣做有一個劣勢,就是不適合水平擴展,每個機器都有一個連接數(shù)的上線,如果需要實現(xiàn)多用戶實時在線,對機器的數(shù)量要求會很高,在這里我們不多做討論,不同的業(yè)務(wù)場景,設(shè)計方案也是不同的,可以在長連接方案和客戶端輪詢方案中進行選擇。
2.如何自動關(guān)閉沒有心跳的連接
Netty有一個比較好的Feature,就是ScheduledFuture,他可以通過ChannelHandlerContext.executor().schedule()創(chuàng)建,支持延時提交,也支持取消任務(wù),這就給我們心跳包的自動關(guān)閉提供了一個很好的實現(xiàn)方案。
開始動手
?? 首先,我們需要用一個JavaBean來封裝通信的協(xié)議內(nèi)容,在這里我們只需要三個數(shù)據(jù)就行了:
1.type : byte,表示消息的類型,有心跳類型和內(nèi)容類型
2.length : int,表示消息的長度
3.content : String,表示消息的內(nèi)容(心跳包在這里沒有內(nèi)容)
?? 然后,因為我們需要將Channel和ScheduledFuture緩存在Map里面,所以需要將兩個對象組合成一個JavaBean。
?? 接著,需要完成輸入輸出流的解析和轉(zhuǎn)換,我們需要重寫Decoder和Encoder,
服務(wù)端:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by RoyDeng on 17/7/20.
*/
public class LiveHandler extends SimpleChannelInboundHandler<LiveMessage> { // 1
? ? private static Map<Integer, LiveChannelCache> channelCache = new HashMap<>();
? ? private Logger logger = LoggerFactory.getLogger(LiveHandler.class);
? ? @Override
? ? protected void channelRead0(ChannelHandlerContext ctx, LiveMessage msg) throws Exception {
? ? ? ? Channel channel = ctx.channel();
? ? ? ? final int hashCode = channel.hashCode();
? ? ? ? System.out.println("channel hashCode:" + hashCode + " msg:" + msg + " cache:" + channelCache.size());
? ? ? ? if (!channelCache.containsKey(hashCode)) {
? ? ? ? ? ? System.out.println("channelCache.containsKey(hashCode), put key:" + hashCode);
? ? ? ? ? ? channel.closeFuture().addListener(future -> {
? ? ? ? ? ? ? ? System.out.println("channel close, remove key:" + hashCode);
? ? ? ? ? ? ? ? channelCache.remove(hashCode);
? ? ? ? ? ? });
? ? ? ? ? ? ScheduledFuture scheduledFuture = ctx.executor().schedule(
? ? ? ? ? ? ? ? ? ? () -> {
? ? ? ? ? ? ? ? ? ? ? ? System.out.println("schedule runs, close channel:" + hashCode);
? ? ? ? ? ? ? ? ? ? ? ? channel.close();
? ? ? ? ? ? ? ? ? ? }, 10, TimeUnit.SECONDS);
? ? ? ? ? ? channelCache.put(hashCode, new LiveChannelCache(channel, scheduledFuture));
? ? ? ? }
? ? ? ? switch (msg.getType()) {
? ? ? ? ? ? case LiveMessage.TYPE_HEART: {
? ? ? ? ? ? ? ? LiveChannelCache cache = channelCache.get(hashCode);
? ? ? ? ? ? ? ? ScheduledFuture scheduledFuture = ctx.executor().schedule(
? ? ? ? ? ? ? ? ? ? ? ? () -> channel.close(), 5, TimeUnit.SECONDS);
? ? ? ? ? ? ? ? cache.getScheduledFuture().cancel(true);
? ? ? ? ? ? ? ? cache.setScheduledFuture(scheduledFuture);
? ? ? ? ? ? ? ? ctx.channel().writeAndFlush(msg);
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? ? ? case LiveMessage.TYPE_MESSAGE: {
? ? ? ? ? ? ? ? channelCache.entrySet().stream().forEach(entry -> {
? ? ? ? ? ? ? ? ? ? Channel otherChannel = entry.getValue().getChannel();
? ? ? ? ? ? ? ? ? ? otherChannel.writeAndFlush(msg);
? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? @Override
? ? public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
? ? ? ? logger.debug("channelReadComplete");
? ? ? ? super.channelReadComplete(ctx);
? ? }
? ? @Override
? ? public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
? ? ? ? logger.debug("exceptionCaught");
? ? ? ? if(null != cause) cause.printStackTrace();
? ? ? ? if(null != ctx) ctx.close();
? ? }
}
2.客戶端
package com.dz.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Scanner;
public class LongConnection {
? ? private Logger logger = LoggerFactory.getLogger(LongConnection.class);
? ? String host = "localhost";
? ? int port = 8080;
? ? public void testLongConnection() throws Exception {
? ? ? ? logger.debug("start");
? ? ? ? final Socket socket = new Socket();
? ? ? ? socket.connect(new InetSocketAddress(host, port));
? ? ? ? Scanner scanner = new Scanner(System.in);
? ? ? ? new Thread(() -> {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? byte[] input = new byte[64];
? ? ? ? ? ? ? ? ? ? int readByte = socket.getInputStream().read(input);
? ? ? ? ? ? ? ? ? ? logger.debug("readByte " + readByte);
? ? ? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }).start();
? ? ? ? int code;
? ? ? ? while (true) {
? ? ? ? ? ? code = scanner.nextInt();
? ? ? ? ? ? logger.debug("input code:" + code);
? ? ? ? ? ? if (code == 0) {
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? } else if (code == 1) {
? ? ? ? ? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(5);
? ? ? ? ? ? ? ? byteBuffer.put((byte) 1);
? ? ? ? ? ? ? ? byteBuffer.putInt(0);
? ? ? ? ? ? ? ? socket.getOutputStream().write(byteBuffer.array());
? ? ? ? ? ? ? ? logger.debug("write heart finish!");
? ? ? ? ? ? } else if (code == 2) {
? ? ? ? ? ? ? ? byte[] content = ("hello, I'm" + hashCode()).getBytes();
? ? ? ? ? ? ? ? ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
? ? ? ? ? ? ? ? byteBuffer.put((byte) 2);
? ? ? ? ? ? ? ? byteBuffer.putInt(content.length);
? ? ? ? ? ? ? ? byteBuffer.put(content);
? ? ? ? ? ? ? ? socket.getOutputStream().write(byteBuffer.array());
? ? ? ? ? ? ? ? logger.debug("write content finish!");
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? socket.close();
? ? }
? ? // 因為Junit不支持用戶輸入,所以用main的方式來執(zhí)行用例
? ? public static void main(String[] args) throws Exception {
? ? ? ? new LongConnection().testLongConn();
? ? }
}