前言
上一篇文章講到Poller處理完之后,交給SocketProcessor執(zhí)行處理,這篇就詳細(xì)記錄下這個(gè)處理過程。
SocketProcessor
SocketProcessor實(shí)現(xiàn)Runnable接口,對(duì)外暴露run()方法,內(nèi)部封裝doRun()。
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
// 這里的 handshake 是用來標(biāo)記https的握手完成情況,
// 如果是http不需要該握手階段,在從c1處直接置為0
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) { // c1
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// 如果不能完成TLS握手過程,標(biāo)記握手失敗
handshake = -1;
} else {
// 處理https的SecureNioChannel覆寫了該hanshake()方法
// 返回注冊(cè)的SelectionKey
handshake = socket.handshake(key.isReadable(), key.isWritable());
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
...
}
if (handshake == 0) { // 握手完成
SocketState state = SocketState.OPEN; // 標(biāo)記Socket狀態(tài)
// 處理該Sockt里的請(qǐng)求
if (event == null) {
// c2
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key); // 否則關(guān)閉通道
}
} else if (handshake == -1) { // 握手失敗則關(guān)閉通道
close(socket, key);
} else if (handshake == SelectionKey.OP_READ) { // TLS會(huì)走到這里
socketWrapper.registerReadInterest(); // 注冊(cè)讀就緒
} else if (handshake == SelectionKey.OP_WRITE) {
socketWrapper.registerWriteInterest(); // 注冊(cè)寫就緒
}
} catch (CancelledKeyException cx) {
... // 處理異常
} finally {
socketWrapper = null;
event = null;
if (running && !paused) {
processorCache.push(this); // 將SocketProcessor放回緩存中
}
}
}
}
在c2處,該方法將包裝好的socketWrapper繼續(xù)傳下去,來看下調(diào)用棧,最終在Http11Processor的service方法中處理,而這個(gè)Http11Processor正是Processor的一個(gè)實(shí)現(xiàn)類,用來真正解析流。

Http11Processor
先看一下Http11Processor的構(gòu)造方法。首先調(diào)用抽象父類的構(gòu)造方法,new了一個(gè)Request和Response,用于接收解析結(jié)果和Servlet處理完的內(nèi)容,值得注意的是這里的Request和Response是org.apache.coyote包下的,與org.apache.catalina.connector包下的Request和Response不同,拿Request來說,前者是服務(wù)器請(qǐng)求的一種底層,但高效的表示,同時(shí)它也是GC-free,并且將消耗計(jì)算資源的操作作了延遲處理,可以說是按需再取。說它底層且高效是因?yàn)樗侵苯硬僮鲾?shù)據(jù)流,轉(zhuǎn)換成需要的信息,所以它不適合用戶代碼。而后者是Coyote Request的包裝,為用戶(也就是servlet)提供了一個(gè)高級(jí)視圖(外觀模式)。
并且在c3處可以看到傳入了一個(gè)Adapter,這個(gè)Adapter則是基于coyote的servlet容器中的入口點(diǎn)。也就是說其作用是Connector與Container之間的橋梁。
public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
super(adapter);
this.protocol = protocol; // protocol
httpParser = new HttpParser(protocol.getRelaxedPathChars(),
protocol.getRelaxedQueryChars());
inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
protocol.getRejectIllegalHeaderName(), httpParser);
request.setInputBuffer(inputBuffer); // 輸入緩沖,用于解析請(qǐng)求頭及傳輸編碼
outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
response.setOutputBuffer(outputBuffer); // 輸出緩沖,寫headers和response主題
inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new IdentityOutputFilter()); // 添加身份認(rèn)證過濾器
inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new ChunkedOutputFilter()); // 添加分塊過濾器
// Void輸入輸出過濾器,在嘗試讀取時(shí)返回-1。 與GET,HEAD或類似請(qǐng)求一起使用。
inputBuffer.addFilter(new VoidInputFilter());
outputBuffer.addFilter(new VoidOutputFilter());
// 輸入過濾器負(fù)責(zé)讀取和緩沖請(qǐng)求體,以便它不會(huì)干擾客戶端SSL握手消息。
inputBuffer.addFilter(new BufferedInputFilter());
//inputBuffer.addFilter(new GzipInputFilter());
outputBuffer.addFilter(new GzipOutputFilter()); // Gzip
pluggableFilterIndex = inputBuffer.getFilters().length; // 標(biāo)記過濾器數(shù)量
}
public AbstractProcessor(Adapter adapter) {
this(adapter, new Request(), new Response()); // new了一個(gè)Request、Response
}
protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
this.adapter = adapter; // Adapter c3
asyncStateMachine = new AsyncStateMachine(this);
request = coyoteRequest;
response = coyoteResponse;
response.setHook(this);
request.setResponse(response);
request.setHook(this);
userDataHelper = new UserDataHelper(getLog());
}
看完基本的構(gòu)造方法,大致能了解這個(gè)Http11Processor的功能便是解析流,然后封裝成Request,最后給Servlet做處理。下面來看下調(diào)用棧中提到的service方法??梢钥吹教幚韘ocket分為幾個(gè)階段,解析——>準(zhǔn)備——>服務(wù)——>結(jié)束輸入、輸出——>KEEPALIVE——>結(jié)束,當(dāng)然如果對(duì)于keepalive的請(qǐng)求來說,會(huì)在前5個(gè)階段循環(huán)。前四個(gè)階段表示了一個(gè)完成的請(qǐng)求處理及響應(yīng)。在c4處可以看到獲取了Adapter,而這個(gè)Adapter的實(shí)現(xiàn)類為CoyoteAdapter,這個(gè)Adapter將Request和Reponse的處理委托給servlet。
@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
// 包含有關(guān)請(qǐng)求處理的統(tǒng)計(jì)、管理信息。
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // 解析階段
setSocketWrapper(socketWrapper); // 初始化I/O設(shè)置,
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
keepAlive = true; // 標(biāo)志位
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !protocol.isPaused()) {
try {
// 從buffer中讀取并解析請(qǐng)求基本信息,比如方法(GET/POST/PUT等等)、URI、協(xié)議等等
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(), protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (protocol.isPaused()) {
response.setStatus(503); // 如果協(xié)議解析服務(wù)被暫停,則返回503
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// 每次更新header屬性的數(shù)量限制(可以通過JMX修改)
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
// 解析Request Headers
if (!inputBuffer.parseHeaders()) {
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) { socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}
} catch (IOException e) {
... // 處理異常
}
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection"); // 從Header里面取到Connection屬性
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}
if (foundUpgrade) { // 如果包含upgrade,表示協(xié)議需要升級(jí)
String requestedProtocol = request.getHeader("Upgrade"); // 獲取升級(jí)協(xié)議
UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol); // 包裝成UpgradeProtocol
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade"); // 設(shè)置響應(yīng)體
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);
InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
socketWrapper, getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}
if (getErrorState().isIoAllowed()) {
// 準(zhǔn)備階段
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
// 主要設(shè)置過濾器,并且解析部分headers
// 比如檢查是否是keepAlive、如果是http/1.1,判斷是否包含Expect:100-continue
// (用于客戶端在發(fā)送POST數(shù)據(jù)給服務(wù)器前,征詢服務(wù)器情況,
// 看服務(wù)器是否處理POST的數(shù)據(jù),常用于大文件post,例如文件上傳)、
// 用戶代理user-agent狀況、host等等
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}
// 如果是長(zhǎng)連接,則需要判斷長(zhǎng)連接是否達(dá)到服務(wù)器上限
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// 通過Adapter交給Container處理請(qǐng)求,并構(gòu)建Response
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response); // c4
if(keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
... // 處理異常
}
// 結(jié)束請(qǐng)求處理
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// 如果這是異步請(qǐng)求,則請(qǐng)求在完成后結(jié)束。
// 在這種情況下,AsyncContext負(fù)責(zé)調(diào)用endRequest()。
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// 確保設(shè)置錯(cuò)誤碼,且更新請(qǐng)求的統(tǒng)計(jì)計(jì)數(shù)
if (getErrorState().isError()) {
response.setStatus(500);
}
if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}
if (!protocol.getDisableUploadTimeout()) {
int connectionTimeout = protocol.getConnectionTimeout();
if(connectionTimeout > 0) {
socketWrapper.setReadTimeout(connectionTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); // 如果是長(zhǎng)連接繼續(xù)循環(huán)
sendfileState = processSendfile(socketWrapper);
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
...
}
Connector的對(duì)請(qǐng)求數(shù)據(jù)流的處理流程基本已經(jīng)完成,接下去便是交由Container組件去做具體的Servlet處理了。在上一篇博客中提到Tomcat為了高效的處理請(qǐng)求流,做了很多包括數(shù)據(jù)結(jié)構(gòu)(輕量級(jí)的同步棧SynchronizedStack),以及延遲的異常處理等努力。接下來可以看下本篇文章代碼中涉及到的一些數(shù)據(jù)結(jié)構(gòu)等。
MessageBytes
我們?cè)谏瞎?jié)中提到org.apache.coyote包下的Request是一種底層且高效的表示。因?yàn)槲覀儚钠鋐inal域中可以看到其類型都是MessageBytes類型的。這個(gè)類用來表示HTTP消息中的字節(jié)數(shù)組,可以代表請(qǐng)求/響應(yīng)中的全部元素。且字節(jié)、字符之間的轉(zhuǎn)換都是延遲且有緩存機(jī)制,并且是可以回收的。
該對(duì)象可以表示byte [],char []或(子)String。也就是說,在接受到socket傳入的字節(jié)之后并不會(huì)馬上進(jìn)行編碼轉(zhuǎn)換,而是保持byte[]的方式,在用到的時(shí)候再進(jìn)行轉(zhuǎn)換。
private int type = T_NULL;
public static final int T_NULL = 0; // 表示空消息
public static final int T_STR = 1; // 表示字符串類型
public static final int T_BYTES = 2; // 表示字節(jié)數(shù)組類型
public static final int T_CHARS = 3; // 表示字符數(shù)組類型
另外擁有一個(gè)ByteChunk類型和一個(gè)CharChunk類型的對(duì)象,這兩個(gè)類可以看作是ByteBuffer的擴(kuò)展,提供對(duì)字節(jié)/字符數(shù)組的操作。
private final ByteChunk byteC = new ByteChunk();
private final CharChunk charC = new CharChunk();
在Http11Processor的service方法處理請(qǐng)求數(shù)據(jù)的時(shí)候,經(jīng)常用到MessageBytes的setBytes()方法,該方法將指定的字節(jié)數(shù)組存入緩沖區(qū)byteC里面。
public void setBytes(byte[] b, int off, int len) {
byteC.setBytes(b, off, len);
type = T_BYTES;
hasStrValue = false;
hasHashCode = false;
hasLongValue = false;
}
既然說MessageBytes提供了一個(gè)延遲機(jī)制,在沒有轉(zhuǎn)換時(shí),請(qǐng)求數(shù)據(jù)一直以字節(jié)形式存儲(chǔ),直到調(diào)用toString()才去轉(zhuǎn)換為字符串。
@Override
public String toString() {
if (hasStrValue) {
return strValue;
}
switch (type) {
case T_CHARS:
strValue = charC.toString(); // 字符
hasStrValue = true;
return strValue;
case T_BYTES:
strValue = byteC.toString(); // 字節(jié)
hasStrValue = true;
return strValue;
}
return null;
}
以字節(jié)形式為例,調(diào)用了ByteChunk的toString()方法。這個(gè)方法又調(diào)用的StringCache的toString(),這里便用到了緩存機(jī)制,這個(gè)StringCache的toString()大概就是先判斷緩存數(shù)組是否存在,如果不存在,則調(diào)用toStringInternal()方法,轉(zhuǎn)換好了之后新建緩存并放入數(shù)據(jù),如果緩存存在,則先嘗試從緩存中取,取不到的話就調(diào)用ByteChunk的toStringInternal()方法,從調(diào)用棧中也可以看到,代碼比較長(zhǎng)就不貼了。
public String toString() {
if (isNull()) {
return null;
} else if (end - start == 0) {
return "";
}
return StringCache.toString(this);
}

那么這個(gè)toStringInternal做了什么事來看一下,正是根據(jù)偏移量和待提取長(zhǎng)度進(jìn)行編碼提取轉(zhuǎn)換。如果直接使用new String(byte[], int, int, Charset),將會(huì)先copy整個(gè)byte數(shù)組,這很影響性能,所以先通過decode將數(shù)組的一部分編碼成CharBuffer,然后在調(diào)用new String,非常精髓的一個(gè)處理方式,可謂是費(fèi)盡心思地提高web服務(wù)器的性能。
public String toStringInternal() {
if (charset == null) {
charset = DEFAULT_CHARSET;
}
CharBuffer cb = charset.decode(ByteBuffer.wrap(buff, start, end - start));
return new String(cb.array(), cb.arrayOffset(), cb.length());
}