上一篇說(shuō)到了Tomcat的啟動(dòng)流程,那么請(qǐng)求處理的流程是如何完成的呢?
Connector接收請(qǐng)求
Connector是請(qǐng)求接收器,通過(guò)設(shè)置的協(xié)議處理器處理請(qǐng)求,以默認(rèn)的Http11NioProtocol協(xié)議為例,Http11NioProtocol構(gòu)造時(shí)創(chuàng)建了NioEndpoint,用來(lái)處理網(wǎng)絡(luò)請(qǐng)求:
public Http11NioProtocol() {
super(new NioEndpoint());
}
Connector啟動(dòng)時(shí),啟動(dòng)了protocolHandler(Http11NioProtocol), protocolHandler啟動(dòng)的時(shí)候又對(duì)endpoint進(jìn)行了啟動(dòng),NioEndpoint啟動(dòng)過(guò)程中綁定了本地端口并在Acceptor線(xiàn)程中監(jiān)聽(tīng)網(wǎng)絡(luò)連接(詳情可見(jiàn)Tomcat啟動(dòng)流程),所以Tomcat接收網(wǎng)絡(luò)請(qǐng)求的起點(diǎn)是從Accetpor線(xiàn)程開(kāi)始的。
Acceptor中會(huì)循環(huán)阻塞調(diào)用serverSock.accept()來(lái)監(jiān)聽(tīng)請(qǐng)求:
......
try {
// Accept the next incoming connection from the server
// socket
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
......
接收到請(qǐng)求后,通過(guò)setSocketOptions來(lái)處理請(qǐng)求:
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, selectorPool, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
socketProperties.setProperties(socket.socket());
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
socketWrapper.setSecure(isSSLEnabled());
poller.register(channel, socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}
代碼比較長(zhǎng),最關(guān)鍵的代碼是poller.register(channel, socketWrapper),將請(qǐng)求進(jìn)來(lái)的socket包裝后注冊(cè)到poller中等待讀寫(xiě)事件,這個(gè)地方的socket是非阻塞的,即可以用一個(gè)poller線(xiàn)程處理大量的請(qǐng)求連接,提高系統(tǒng)吞吐量。
接下來(lái)來(lái)到Poller類(lèi):
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
Poller線(xiàn)程run方法:
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
...
keyCount = selector.select(selectorTimeout);
...
wakeupCounter.set(0);
}
...
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
processKey(sk, socketWrapper);
}
代碼進(jìn)行了一些刪減,只保留了主要流程:
1.Poller注冊(cè)socket
Poller將連接上的socket設(shè)置interestOps(SelectionKey.OP_READ),包裝成PollerEvent,interestOps為OP_REGISTER,放入隊(duì)列中。
2.Poller線(xiàn)程從隊(duì)列中取事件,執(zhí)行run方法:
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
}
將socket注冊(cè)到Poller的Selector中,ops為SelectionKey.OP_READ,等待對(duì)端發(fā)送數(shù)據(jù)。
3.然后在socket有數(shù)據(jù)可讀時(shí),通過(guò)processKey(sk, socketWrapper)來(lái)進(jìn)行處理。繼續(xù)跟進(jìn)processKey,最終將socket封裝成socketProcessor提交到Executor處理。
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
......
SocketProcessor實(shí)現(xiàn)了Runnable接口,在線(xiàn)程池中執(zhí)行SocketProcessor.doRun,忽略可能的TLS握手到流程,將通過(guò)連接處理器ConnectionHandler來(lái)處理請(qǐng)求:
getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
ConnectHandler又通過(guò)Http11Processor來(lái)處理
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
...
// Associate the processor with the connection
wrapper.setCurrentProcessor(processor);
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
Http11Processor中會(huì)根據(jù)Http協(xié)議來(lái)解析數(shù)據(jù),封裝成request、response模型,并通過(guò)適配器轉(zhuǎn)給Container,然后Connector的任務(wù)就完成了,接下來(lái)的操作由Container來(lái)進(jìn)行。
getAdapter().service(request, response);
Adapter的處理
Connect的主要任務(wù)是接收請(qǐng)求,并在有數(shù)據(jù)讀寫(xiě)時(shí)在線(xiàn)程池中根據(jù)使用的協(xié)議來(lái)解析數(shù)據(jù)并封裝成request、response,然后交給Adapter來(lái)處理。
Adapter的實(shí)現(xiàn)類(lèi)是CoyoteAdapter,顧名思義是一個(gè)適配器,將Connector連接器讀取的數(shù)據(jù)適配到Container容器來(lái)處理。
Adapter主要有兩個(gè)任務(wù)map和轉(zhuǎn)發(fā)。
map在postParseRequest方法中進(jìn)行:
connector.getService().getMapper().map(serverName, decodedURI,
version, request.getMappingData());
根據(jù)請(qǐng)求到uri找到目標(biāo)Host、Context、Wrapper。
Mapper對(duì)象存在service中,通過(guò)MapperListener來(lái)監(jiān)聽(tīng)Container到生命周期,動(dòng)態(tài)調(diào)整Mapper中的Container。
通過(guò)Mapper找到目標(biāo)Container后,就可以轉(zhuǎn)發(fā)給Container來(lái)處理了:
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Container的處理流程
Adapter傳給Container的操作是將request、response參數(shù)交給engine的pipeline處理。每一個(gè)級(jí)別的Container都維護(hù)了一個(gè)pipeline,用來(lái)處理請(qǐng)求,這是典型的流水線(xiàn)模式,pipeline中維護(hù)了一個(gè)Valve鏈表,請(qǐng)求在pipeline中處理的過(guò)程就是從第一個(gè)Valve處理到最后一個(gè),Valve可以方便的進(jìn)行配置來(lái)實(shí)現(xiàn)各層容器的擴(kuò)展功能,每個(gè)pipeline的最后一個(gè)Valve是"basic",完成基本的邏輯處理,如Engine的basic將調(diào)用傳給下一級(jí)的Host的pipeline,Host的basic將調(diào)用傳給下一級(jí)的Context的pipeline,Context的basic將調(diào)用傳給下一級(jí)的Wrapper的pipeline,如Engine的basic StandardEngineValve:
public final void invoke(Request request, Response response)
throws IOException, ServletException {
// Select the Host to be used for this Request
Host host = request.getHost();
if (host == null) {
// HTTP 0.9 or HTTP 1.0 request without a host when no default host
// is defined. This is handled by the CoyoteAdapter.
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}
// Ask this Host to process this request
host.getPipeline().getFirst().invoke(request, response);
}
而Wrapper作為最底層的Container,basic完成最終Servlet的加載、初始化,并組裝filterChain進(jìn)行調(diào)用:
standardWrapperValve.invoke部分代碼
...
if (!unavailable) {
servlet = wrapper.allocate();
}
...
// Create the filter chain for this request
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
...
filterChain.doFilter(request.getRequest(), response.getResponse());
...
那么什么時(shí)候調(diào)用了Servlet的service來(lái)處理請(qǐng)求呢?在filter鏈的調(diào)用過(guò)程中,鏈中所有的filter處理完成后,后執(zhí)行Servlet.service來(lái)處理請(qǐng)求:
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
// Call the next filter if there is one
if (pos < n) {
...
return;
}
// We fell off the end of the chain -- call the servlet instance
//chain處理完成后,執(zhí)行servlet.service
try {
...
// Use potentially wrapped request from this point
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
...
} else {
servlet.service(request, response);
}
至此,tomcat已經(jīng)完成了從接受連接請(qǐng)求到找到目標(biāo)servlet來(lái)處理請(qǐng)求的流程,實(shí)現(xiàn)了作為Servlet容器的功能。