測試demo和啟動
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
/**
 * Minimal Netty HTTP server used as the demo target: it binds to a port and installs an
 * HTTP decoder/encoder/aggregator pipeline that ends in {@code HttpHandler}.
 */
public class HttpServer {
    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind when {@link #start()} is called
     */
    public HttpServer(int port) {
        this.port = port;
    }

    /** Starts the demo server on port 9004. */
    public static void main(String[] args) throws Exception {
        new HttpServer(9004).start();
    }

    /**
     * Configures the pipeline and binds the listening socket, returning once the bind completed.
     * If the server cannot be started (for example the port is already in use) the event loop
     * group is shut down so that its threads are not left behind.
     *
     * @throws Exception if configuring or binding the server fails
     */
    public void start() throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        boolean bound = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            System.out.println("initChannel ch:" + ch);
                            ch.pipeline()
                                    // inbound: bytes -> HTTP request parts
                                    .addLast("decoder", new HttpRequestDecoder())
                                    // outbound: HTTP response -> bytes
                                    .addLast("encoder", new HttpResponseEncoder())
                                    // merges request parts into one full request (limit 512 KiB)
                                    .addLast("aggregator", new HttpObjectAggregator(512 * 1024))
                                    // application logic
                                    .addLast("handler", new HttpHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // determining the number of connections queued
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            b.bind(port).sync();
            bound = true;
        } finally {
            // Only tear down on failure; after a successful bind the group keeps serving.
            if (!bound) {
                group.shutdownGracefully();
            }
        }
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { // 1
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
System.out.println("class:" + msg.getClass().getName());
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer("test".getBytes())); // 2
HttpHeaders heads = response.headers();
heads.add(CONTENT_TYPE, "text/plain; charset=UTF-8");
heads.add(CONTENT_LENGTH, response.content().readableBytes()); // 3
heads.add(CONNECTION, "keep-alive");
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
super.channelReadComplete(ctx);
ctx.flush(); // 4
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught");
if (null != cause) cause.printStackTrace();
if (null != ctx) ctx.close();
}
}
package main
import (
"fmt"
"net"
"time"
)
// main connects to the demo server on localhost:9004 and sends one HTTP request
// split into arbitrary fragments with pauses in between, printing whatever the
// server sends back. It demonstrates that the server reassembles a fragmented request.
func main() {
	c, err := net.Dial("tcp", "localhost:9004")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the socket when the demo finishes.
	defer c.Close()

	// Reader: print every chunk received until the connection fails or is closed.
	go func() {
		buf := make([]byte, 100)
		for {
			n, err := c.Read(buf)
			if err != nil {
				fmt.Print(err)
				return
			}
			fmt.Println(string(buf[:n]))
		}
	}()

	// Request fragments in send order, each followed by the pause to wait before
	// the next fragment. The last pause leaves time for the response to arrive.
	steps := []struct {
		payload string
		pause   time.Duration
	}{
		{"GET /", 200 * time.Millisecond},
		{" HTTP/1.1\r\n", 0},
		{"Host: localhost:9004\r\n", 5 * time.Second},
		{"Accept: */*\r\n", 200 * time.Millisecond},
		{"Content-Length: 0\r\n", 0},
		{"\r\n", 500*time.Millisecond + 5*time.Second},
	}
	for _, s := range steps {
		// Stop at the first failed write instead of silently continuing.
		if _, err := c.Write([]byte(s.payload)); err != nil {
			fmt.Println(err)
			return
		}
		time.Sleep(s.pause)
	}
}
啟動服務端。
啟動客戶端。或用telnet手動進行http協議文本拼接。
執行結果
由執行結果可看出:將http協議報文隨意拆分上傳,中間有不同程度的sleep,服務端仍然可以正確處理。
分析過程
函數調用棧信息:

image.png
代碼注釋:
io.netty.handler.codec.ByteToMessageDecoder
/**
 * Handles one inbound message: ByteBuf data is appended to {@code cumulation}, decoded via
 * {@code callDecode}, and every decoded object is fired downstream; any other message type is
 * forwarded unchanged.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) { // 1. The channel received an I/O event and hands in a ByteBuf.
// The buffer may hold a complete HTTP request or only part of one; a header may be shorter
// than a full line or span several reads, i.e. the protocol unit is not yet complete.
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 2. No cumulation yet: the incoming buffer itself becomes the accumulation buffer.
if (first) {
cumulation = data;
} else {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
expandCumulation(ctx, data.readableBytes());
}
// 3. cumulation is not null, so bytes of an incomplete HTTP unit are still pending and the
// ByteBuf read by this I/O event must be appended to them.
// Example: the previous ByteBuf held "GET /"; the HTTP codec found the request line incomplete,
// so it waits for the next I/O event, appends the new ByteBuf and runs the HTTP parsing again.
cumulation.writeBytes(data);
data.release();
}
// 4. Run the HTTP decoding.
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// 5. If cumulation is not null and has no readable bytes left, release it. In other words, the
// buffer can be released once it holds no incomplete HTTP unit, and must be kept while it does.
// Note (article author): an "incomplete HTTP unit" is not the same as an incomplete request.
// For example "Host: localhost:9004" is a complete header line but not a complete request.
// The codec stores the parsed key/value in the message object being built, so the cumulation
// can be released right away without waiting for the whole request to arrive.
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
io.netty.handler.codec.ReplayingDecoder
/**
 * Repeatedly calls {@code decode} while {@code in} has readable bytes. A {@code REPLAY} signal
 * thrown by {@code decode} rewinds the reader index to the last checkpoint and ends the loop.
 */
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 1. Receives the ByteBuf, i.e. the cumulation buffer passed in by the caller.
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
S oldState = state;
int oldInputLength = in.readableBytes();
try {
// 2. Start the codec parsing.
decode(ctx, replayable, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
"data or change its state if it did not decode anything.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (Signal replay) {
// 3. When the codec runs out of data in the middle of an HTTP unit it throws the REPLAY
// signal, so that decoding can be "replayed" on the next read.
replay.expect(REPLAY);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
// Return to the checkpoint (or oldPosition) and retry, i.e. reset the ByteBuf reader index to the checkpoint.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
"or change its state if it decoded something.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
io.netty.handler.codec.http.HttpObjectDecoder
// State-machine step of the HTTP decoder. NOTE: this excerpt is truncated by the article
// author (see the elision marker near the end); only the states shown here are documented.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
// 1. Try to finish parsing the current protocol unit according to the current state.
switch (state()) {
// 2. Before reading an HTTP request, skip blank lines and other control characters in the I/O buffer,
// e.g. someone pressing Enter repeatedly in a telnet session; this branch filters them out.
// 3. If nothing went wrong, set the state to READ_INITIAL and move the checkpoint to the current read position.
case SKIP_CONTROL_CHARS: {
try {
skipControlCharacters(buffer);
checkpoint(State.READ_INITIAL);
} finally {
checkpoint();
}
// fall-through
}
// 3. The previous case has no break, so execution falls through to here.
// Parse the request line. If it matches the "METHOD URL VERSION" form, create the message object that
// later data is deserialized into, set the state to READ_HEADER and move the checkpoint forward.
// If the request line is incomplete, lineParser.parse(buffer) throws the REPLAY signal.
case READ_INITIAL: try {
String[] initialLine = splitInitialLine(lineParser.parse(buffer));
if (initialLine.length < 3) {
// Invalid initial line - ignore.
checkpoint(State.SKIP_CONTROL_CHARS);
return;
}
message = createMessage(initialLine);
checkpoint(State.READ_HEADER);
// fall-through
} catch (Exception e) {
out.add(invalidMessage(e));
return;
}
// 4. readHeaders parses the headers; an empty line (two consecutive \r\n) ends the header section and
// the state advances to the next state.
// Depending on the body, the next state may be e.g. READ_CHUNK_SIZE or READ_FIXED_LENGTH_CONTENT.
// If the header data ends in the middle of a line, REPLAY is thrown, the state stays unchanged, and
// this branch is entered again when the next I/O event delivers more data.
case READ_HEADER: try {
State nextState = readHeaders(buffer);
checkpoint(nextState);
switch (nextState) {
case SKIP_CONTROL_CHARS:
// fast-path
// No content is expected.
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
reset();
return;
case READ_CHUNK_SIZE:
if (!chunkedSupported) {
throw new IllegalArgumentException("Chunked messages not supported");
}
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
out.add(message);
return;
default:
// 5. If the body is empty, emit the deserialized message object right away.
long contentLength = contentLength();
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
reset();
return;
}
...略
}
readHeaders
/**
 * Parses header lines from {@code buffer} into the current message's headers and returns the
 * decoder state to enter next, chosen from how the message body is delimited.
 */
private State readHeaders(ByteBuf buffer) {
final HttpMessage message = this.message;
final HttpHeaders headers = message.headers();
// 1. Parse header lines in a loop.
// If a line has length > 0, split it into key and value, then parse the next line;
// if a line has length 0, leave the loop: all headers have been received and processed.
// If the data ends in the middle of a line, headerParser.parse(buffer) throws the REPLAY signal.
AppendableCharSequence line = headerParser.parse(buffer);
if (line.length() > 0) {
do {
char firstChar = line.charAt(0);
// A line starting with a space or tab continues the previous header's value.
if (name != null && (firstChar == ' ' || firstChar == '\t')) {
StringBuilder buf = new StringBuilder(value.length() + line.length() + 1);
buf.append(value);
buf.append(' ');
buf.append(line.toString().trim());
value = buf.toString();
} else {
if (name != null) {
headers.add(name, value);
}
splitHeader(line);
}
line = headerParser.parse(buffer);
} while (line.length() > 0);
}
// Add the last header.
if (name != null) {
headers.add(name, value);
}
// reset name and value fields
name = null;
value = null;
// 2. All headers are parsed; decide how the body has to be read.
State nextState;
if (isContentAlwaysEmpty(message)) {
HttpHeaders.removeTransferEncodingChunked(message);
nextState = State.SKIP_CONTROL_CHARS;
} else if (HttpHeaders.isTransferEncodingChunked(message)) {
nextState = State.READ_CHUNK_SIZE;
} else if (contentLength() >= 0) {
nextState = State.READ_FIXED_LENGTH_CONTENT;
} else {
nextState = State.READ_VARIABLE_LENGTH_CONTENT;
}
return nextState;
}
parser的全部邏輯
/**
 * Reads one line from {@code buffer} into {@code seq} and returns it, advancing the reader
 * index past the byte at which the scan stopped.
 */
public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
// 1. Scan the buffer byte by byte until the end-of-line byte is reached and get its index.
// If no exception was thrown, move readerIndex to the following index, so that the next read
// starts at the beginning of the next header line.
int i = buffer.forEachByte(this);
buffer.readerIndex(i + 1);
// Call checkpoint to make sure the readerIndex is updated correctly
checkpoint();
return seq;
}
// Delegates the scan to the wrapped buffer and turns a "not found" result into a REPLAY signal.
@Override
public int forEachByte(ByteBufProcessor processor) {
// 2. A negative index means the processor never stopped (no line terminator found in the
// available bytes), so the REPLAY signal is thrown.
int ret = buffer.forEachByte(processor);
if (ret < 0) {
throw REPLAY;
} else {
return ret;
}
}
// Scans the bytes between readerIndex and writerIndex with the given processor.
@Override
public int forEachByte(ByteBufProcessor processor) {
int index = readerIndex;
int length = writerIndex - index;
ensureAccessible();
// 3. length is the number of bytes currently waiting to be read.
return forEachByteAsc0(index, length, processor);
}
/**
 * Feeds bytes to {@code processor} in ascending index order, starting at {@code index}.
 *
 * @return the index at which the processor returned {@code false}, or -1 if it never did
 *         (including when {@code length} is 0)
 */
private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
// 4. Nothing to read when the length is 0: return -1. An HTTP blank line ("\r\n") has length 2,
// so it does not hit this condition.
if (length == 0) {
return -1;
}
final int endIndex = index + length;
int i = index;
try {
do {
// 5. Visit each byte in turn; if it is not the terminator, advance the index and check the next one.
if (processor.process(_getByte(i))) {
i ++;
} else {
return i;
}
} while (i < endIndex);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
return -1;
}
/**
 * Per-byte callback of the line scan: skips CR, stops at LF, and appends any other byte to
 * {@code seq}, throwing once {@code size} has reached {@code maxLength}.
 *
 * @return {@code true} to continue scanning, {@code false} to stop at this byte
 */
@Override
public boolean process(byte value) throws Exception {
char nextByte = (char) value;
// CR is skipped without being appended.
if (nextByte == HttpConstants.CR) {
return true;
}
// 6. On the "\n" line terminator return false, which stops the caller's iteration.
if (nextByte == HttpConstants.LF) {
return false;
}
if (size >= maxLength) {
// TODO: Respond with Bad Request and discard the traffic
// or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw newException(maxLength);
}
size ++;
seq.append(nextByte);
return true;
}
圖示說明

http parser design.png