netty http codec實現(xiàn)

測試demo和啟動

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
 
public class HttpServer {
 
    private final int port;
 
    public HttpServer(int port) {
        this.port = port;
    }
 
    public static void main(String[] args) throws Exception {
        new HttpServer(9004).start();
    }
 
    public void start() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        b.group(group).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws Exception {
                        System.out.println("initChannel ch:" + ch);
                        ch.pipeline()
                                .addLast("decoder", new HttpRequestDecoder())   // 1
                                .addLast("encoder", new HttpResponseEncoder())  // 2
                                .addLast("aggregator", new HttpObjectAggregator(512 * 1024))    // 3
                                .addLast("handler", new HttpHandler());        // 4
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128) // determining the number of connections queued
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
 
        b.bind(port).sync();
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
 
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
 
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { // 1
 
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        System.out.println("class:" + msg.getClass().getName());
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("test".getBytes())); // 2
 
        HttpHeaders heads = response.headers();
        heads.add(CONTENT_TYPE, "text/plain; charset=UTF-8");
        heads.add(CONTENT_LENGTH, response.content().readableBytes()); // 3
        heads.add(CONNECTION, "keep-alive");
        ctx.write(response);
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
        super.channelReadComplete(ctx);
        ctx.flush(); // 4
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        if (null != cause) cause.printStackTrace();
        if (null != ctx) ctx.close();
    }
}
package main
 
import (
    "fmt"
    "net"
    "time"
)
 
func main() {
    c, err := net.Dial("tcp", "localhost:9004")
    if err != nil {
        fmt.Println(err)
        return
    }
 
    go func() {
        for {
            buf := make([]byte, 100)
            n, err := c.Read(buf)
            if err != nil {
                fmt.Print(err)
                return
            }
            fmt.Println(string(buf[:n]))
        }
    }()
     
    c.Write([]byte("GET /"))
    time.Sleep(200 * time.Millisecond)
    c.Write([]byte(" HTTP/1.1\r\n"))
    c.Write([]byte("Host: localhost:9004\r\n"))
    time.Sleep(5 * time.Second)
    c.Write([]byte("Accept: */*\r\n"))
    time.Sleep(200 * time.Millisecond)
    c.Write([]byte("Content-Length: 0\r\n"))
    c.Write([]byte("\r\n"))
    time.Sleep(500 * time.Millisecond)
     
    time.Sleep(5 * time.Second)
}

啟動服務端。
啟動客戶端?;騮elnet手動進行http協(xié)議文本拼接。

執(zhí)行結(jié)果

由執(zhí)行結(jié)果可看出:將http協(xié)議報文隨意拆分上傳,中間有不同程度的sleep,服務端仍然可以正確處理。

分析過程

函數(shù)調(diào)用棧信息:


image.png

代碼注釋:
io.netty.handler.codec.ByteToMessageDecoder

   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {   //1. channel接收到io事件,傳入byteBuf??赡苁峭暾膆ttp協(xié)議請求,也可能是不完整;header可能不足行或跨行,即協(xié)議的語義不夠完整。
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                //2. cumulation為null,創(chuàng)建可進行累計的ByteBuf。
                if (first) {
                    cumulation = data; 
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
                            || cumulation.refCnt() > 1) {
                        // Expand cumulation (by replace it) when either there is not more room in the buffer
                        // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                        // duplicate().retain().
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/2327
                        // - https://github.com/netty/netty/issues/1764
                        expandCumulation(ctx, data.readableBytes());
                    }
                    //3. cumulation不為null,即表明當前有不完整的http語義的報文,需要追加當前IO事件讀取到的byteBuf。
                    //比如:上一個byteBuf中是"GET /", http codec 發(fā)現(xiàn)請求行不完整,需要繼續(xù)等待新IO事件,接收新的byteBuf,進行累加,再次觸發(fā)http codec的解析。
                    cumulation.writeBytes(data);
                    data.release();
                }
                //4. 進行http解碼操作。
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //5. 如果cumulation不為null且沒有可讀數(shù)據(jù),就釋放cumulation。即當cumulation中沒有不完整的http語義就可以釋放cumulation,有不完整http語義就不能釋放。
                //這里需要注意,不完整的http語義不是完整的請求,比如:Host: localhost:9004是完整的一個請求頭,是完整的http語義,但不是完整請求。
                //因此http codec 解析后會將key value存到對應的反序列化的message對象中,該cumulation可以釋放了,不需要等待完整請求接收后才釋放數(shù)據(jù)。
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

io.netty.handler.codec.ReplayingDecoder

@Override
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        //1. 接收byteBuf,即上文中的cumulation。
        replayable.setCumulation(in);
        try {
            while (in.isReadable()) {
                int oldReaderIndex = checkpoint = in.readerIndex();
                int outSize = out.size();
                S oldState = state;
                int oldInputLength = in.readableBytes();
                try {
                    //2. 發(fā)起codec解析
                    decode(ctx, replayable, out);

                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }

                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes() && oldState == state) {
                            throw new DecoderException(
                                    StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                    "data or change its state if it did not decode anything.");
                        } else {
                            // Previous data has been discarded or caused state transition.
                            // Probably it is reading on.
                            continue;
                        }
                    }
                } catch (Signal replay) {
                    //3. codec解析中發(fā)現(xiàn)有不完整的http語義,會拋出replay信號異常,用于下一次“重播”
                    replay.expect(REPLAY);

                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }

                    // Return to the checkpoint (or oldPosition) and retry. 回退checkpoint,即重置byteBuf讀取指針。
                    int checkpoint = this.checkpoint;
                    if (checkpoint >= 0) {
                        in.readerIndex(checkpoint);
                    } else {
                        // Called by cleanup() - no need to maintain the readerIndex
                        // anymore because the buffer has been released already.
                    }
                    break;
                }

                if (oldReaderIndex == in.readerIndex() && oldState == state) {
                    throw new DecoderException(
                           StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                           "or change its state if it decoded something.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

io.netty.handler.codec.http.HttpObjectDecoder

@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        //1. 根據(jù)當前狀態(tài)嘗試完成當前語義的解析。
        switch (state()) {
        //2. 接收http請求前,先跳過io buffer中的空白行或者其他控制符。比如在telnet端口后不斷進行換行回車操作,該邏輯進行過濾。
        //3. 沒有錯誤,更新state為READ_INITIAL,更新讀取指針為最新。
        case SKIP_CONTROL_CHARS: {
            try {
                skipControlCharacters(buffer);
                checkpoint(State.READ_INITIAL);
            } finally {
                checkpoint();
            }
            // fall-through
        }
        //3. 上一個case沒有break,fallthrough。
        //解析請求行信息。如果滿足 METHOD URL VERSION的格式,則初始化請求對象,便于后續(xù)數(shù)據(jù)的反序列化,更新state為READ_HEADER,更新讀取指針為最新。
        //如果請求行不完整,lineParser.parse(buffer)會拋出REPLAY異常
        case READ_INITIAL: try {
            String[] initialLine = splitInitialLine(lineParser.parse(buffer));
            if (initialLine.length < 3) {
                // Invalid initial line - ignore.
                checkpoint(State.SKIP_CONTROL_CHARS);
                return;
            }

            message = createMessage(initialLine);
            checkpoint(State.READ_HEADER);
            // fall-through
        } catch (Exception e) {
            out.add(invalidMessage(e));
            return;
        }
        //4. readHeaders負責解析header,遇到兩次連續(xù)\r\n,認為header結(jié)束,更新state狀態(tài)為下一個狀態(tài)。
        //根據(jù)body長度下一個狀態(tài)可能是READ_CHUNK_SIZE、READ_FIXED_LENGTH_CONTENT。
        //如果header數(shù)據(jù)是不完整的語義,則會拋出REPLAY異常,狀態(tài)保持不變,等待新IO事件,再次進入此邏輯。
        case READ_HEADER: try {
            State nextState = readHeaders(buffer);
            checkpoint(nextState);
            switch (nextState) {
            case SKIP_CONTROL_CHARS:
                // fast-path
                // No content is expected.
                out.add(message);
                out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                reset();
                return;
            case READ_CHUNK_SIZE:
                if (!chunkedSupported) {
                    throw new IllegalArgumentException("Chunked messages not supported");
                }
                // Chunked encoding - generate HttpMessage first.  HttpChunks will follow.
                out.add(message);
                return;
            default:
                //5. 如果body長度為空,直接返回發(fā)序列化好的message對象。
                long contentLength = contentLength();
                if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
                    out.add(message);
                    out.add(LastHttpContent.EMPTY_LAST_CONTENT);
                    reset();
                    return;
                }
    ...略
    }

readHeaders

private State readHeaders(ByteBuf buffer) {
        final HttpMessage message = this.message;
        final HttpHeaders headers = message.headers();

        //1. 不斷循環(huán)解析header。
        //如果行數(shù)據(jù)長度大于0,完成key value拆分,則解析下一行數(shù)據(jù);
        //如果行數(shù)據(jù)長度等于0,退出循環(huán)。認為header已經(jīng)全部接收處理。
        //如果行數(shù)據(jù)不是完整的語義,在headerParser.parse(buffer)中會拋出REPLAY異常。
        AppendableCharSequence line = headerParser.parse(buffer);
        if (line.length() > 0) {
            do {
                char firstChar = line.charAt(0);
                if (name != null && (firstChar == ' ' || firstChar == '\t')) {
                    StringBuilder buf = new StringBuilder(value.length() + line.length() + 1);
                    buf.append(value);
                    buf.append(' ');
                    buf.append(line.toString().trim());
                    value = buf.toString();
                } else {
                    if (name != null) {
                        headers.add(name, value);
                    }
                    splitHeader(line);
                }

                line = headerParser.parse(buffer);
            } while (line.length() > 0);
        }

        // Add the last header.
        if (name != null) {
            headers.add(name, value);
        }
        // reset name and value fields
        name = null;
        value = null;

        //2. 解析完所有header,判斷body狀態(tài)。
        State nextState;

        if (isContentAlwaysEmpty(message)) {
            HttpHeaders.removeTransferEncodingChunked(message);
            nextState = State.SKIP_CONTROL_CHARS;
        } else if (HttpHeaders.isTransferEncodingChunked(message)) {
            nextState = State.READ_CHUNK_SIZE;
        } else if (contentLength() >= 0) {
            nextState = State.READ_FIXED_LENGTH_CONTENT;
        } else {
            nextState = State.READ_VARIABLE_LENGTH_CONTENT;
        }
        return nextState;
    }

parser的全部邏輯

public AppendableCharSequence parse(ByteBuf buffer) {
    seq.reset();
    //1. 遍歷buf每個字節(jié),直到遇到\r\n,并返回索引值。如果沒有拋出異常,則更新readerIndex為下一個索引。即保證從下一個header開始讀取。
    int i = buffer.forEachByte(this);
    buffer.readerIndex(i + 1);

    // Call checkpoint to make sure the readerIndex is updated correctly
    checkpoint();
    return seq;
}

@Override
public int forEachByte(ByteBufProcessor processor) {
    //2. 索引值如果<0,即沒有查找到\r\n,拋出REPLAY異常。
    int ret = buffer.forEachByte(processor);
    if (ret < 0) {
        throw REPLAY;
    } else {
        return ret;
    }
}

@Override
public int forEachByte(ByteBufProcessor processor) {
    int index = readerIndex;
    int length = writerIndex - index;
    ensureAccessible();
    //3. length是當前待讀取的長度。
    return forEachByteAsc0(index, length, processor);
}

private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {
    if (processor == null) {
        throw new NullPointerException("processor");
    }
    //4. 待讀取長度為0,無可讀取數(shù)據(jù),返回-1。如果是http協(xié)議的空白行,\r\n,長度是2。不會滿足當前條件。
    if (length == 0) {
        return -1;
    }

    final int endIndex = index + length;
    int i = index;
    try {
        do {
            //5. 循環(huán)遍歷每個字節(jié)。不是目標結(jié)束符,index++,判斷下一個。
            if (processor.process(_getByte(i))) {
                i ++;
            } else {
                return i;
            }
        } while (i < endIndex);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    return -1;
}

@Override
public boolean process(byte value) throws Exception {
    char nextByte = (char) value;
    if (nextByte == HttpConstants.CR) {
        return true;
    }
    //6. 讀到\n行終止符,返回false,終止上層遍歷。
    if (nextByte == HttpConstants.LF) {
        return false;
    }
    if (size >= maxLength) {
        // TODO: Respond with Bad Request and discard the traffic
        //    or close the connection.
        //       No need to notify the upstream handlers - just log.
        //       If decoding a response, just throw an exception.
        throw newException(maxLength);
    }
    size ++;
    seq.append(nextByte);
    return true;
}

圖示說明

http parser design.png
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容