Tomcat請(qǐng)求處理流程

上一篇說(shuō)到了Tomcat的啟動(dòng)流程,那么請(qǐng)求處理的流程是如何完成的呢?

Connector接收請(qǐng)求

Connector是請(qǐng)求接收器,通過(guò)設(shè)置的協(xié)議處理器處理請(qǐng)求,以默認(rèn)的Http11NioProtocol協(xié)議為例,Http11NioProtocol構(gòu)造時(shí)創(chuàng)建了NioEndpoint,用來(lái)處理網(wǎng)絡(luò)請(qǐng)求:

public Http11NioProtocol() {
        super(new NioEndpoint());
    }

Connector啟動(dòng)時(shí),啟動(dòng)了protocolHandler(Http11NioProtocol), protocolHandler啟動(dòng)的時(shí)候又對(duì)endpoint進(jìn)行了啟動(dòng),NioEndpoint啟動(dòng)過(guò)程中綁定了本地端口并在Acceptor線(xiàn)程中監(jiān)聽(tīng)網(wǎng)絡(luò)連接(詳情可見(jiàn)Tomcat啟動(dòng)流程),所以Tomcat接收網(wǎng)絡(luò)請(qǐng)求的起點(diǎn)是從Accetpor線(xiàn)程開(kāi)始的。
Acceptor中會(huì)循環(huán)阻塞調(diào)用serverSock.accept()來(lái)監(jiān)聽(tīng)請(qǐng)求:

......
try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
// Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
......

接收到請(qǐng)求后,通過(guò)setSocketOptions來(lái)處理請(qǐng)求:

protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(bufhandler);
                }
            }
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());

            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            socketWrapper.setSecure(isSSLEnabled());
            poller.register(channel, socketWrapper);
            return true;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            if (socketWrapper == null) {
                destroySocket(socket);
            }
        }
        // Tell to close the socket if needed
        return false;
    }

代碼比較長(zhǎng),最關(guān)鍵的代碼是poller.register(channel, socketWrapper),將請(qǐng)求進(jìn)來(lái)的socket包裝后注冊(cè)到poller中等待讀寫(xiě)事件,這個(gè)地方的socket是非阻塞的,即可以用一個(gè)poller線(xiàn)程處理大量的請(qǐng)求連接,提高系統(tǒng)吞吐量。
接下來(lái)來(lái)到Poller類(lèi):

public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            PollerEvent r = null;
            if (eventCache != null) {
                r = eventCache.pop();
            }
            if (r == null) {
                r = new PollerEvent(socket, OP_REGISTER);
            } else {
                r.reset(socket, OP_REGISTER);
            }
            addEvent(r);
        }

Poller線(xiàn)程run方法:

@Override
        public void run() {
            // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        hasEvents = events();
                       ...
                            keyCount = selector.select(selectorTimeout);
                        ...
                        wakeupCounter.set(0);
                    }
                   ...
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                // Either we timed out or we woke up, process events first
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    processKey(sk, socketWrapper);
                }

代碼進(jìn)行了一些刪減,只保留了主要流程:
1.Poller注冊(cè)socket
Poller將連接上的socket設(shè)置interestOps(SelectionKey.OP_READ),包裝成PollerEvent,interestOps為OP_REGISTER,放入隊(duì)列中。
2.Poller線(xiàn)程從隊(duì)列中取事件,執(zhí)行run方法:

if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
                } catch (Exception x) {
                    log.error(sm.getString("endpoint.nio.registerFail"), x);
                }
            } 

將socket注冊(cè)到Poller的Selector中,ops為SelectionKey.OP_READ,等待對(duì)端發(fā)送數(shù)據(jù)。
3.然后在socket有數(shù)據(jù)可讀時(shí),通過(guò)processKey(sk, socketWrapper)來(lái)進(jìn)行處理。繼續(xù)跟進(jìn)processKey,最終將socket封裝成socketProcessor提交到Executor處理。

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
......

SocketProcessor實(shí)現(xiàn)了Runnable接口,在線(xiàn)程池中執(zhí)行SocketProcessor.doRun,忽略可能的TLS握手到流程,將通過(guò)連接處理器ConnectionHandler來(lái)處理請(qǐng)求:

getHandler().process(socketWrapper, SocketEvent.OPEN_READ);

ConnectHandler又通過(guò)Http11Processor來(lái)處理

if (processor == null) {
                    processor = getProtocol().createProcessor();
                    register(processor);
                }

               ...
                // Associate the processor with the connection
                wrapper.setCurrentProcessor(processor);

                SocketState state = SocketState.CLOSED;
                do {
                    state = processor.process(wrapper, status);

Http11Processor中會(huì)根據(jù)Http協(xié)議來(lái)解析數(shù)據(jù),封裝成request、response模型,并通過(guò)適配器轉(zhuǎn)給Container,然后Connector的任務(wù)就完成了,接下來(lái)的操作由Container來(lái)進(jìn)行。

getAdapter().service(request, response);

Adapter的處理

Connect的主要任務(wù)是接收請(qǐng)求,并在有數(shù)據(jù)讀寫(xiě)時(shí)在線(xiàn)程池中根據(jù)使用的協(xié)議來(lái)解析數(shù)據(jù)并封裝成request、response,然后交給Adapter來(lái)處理。
Adapter的實(shí)現(xiàn)類(lèi)是CoyoteAdapter,顧名思義是一個(gè)適配器,將Connector連接器讀取的數(shù)據(jù)適配到Container容器來(lái)處理。
Adapter主要有兩個(gè)任務(wù)map和轉(zhuǎn)發(fā)。
map在postParseRequest方法中進(jìn)行:

connector.getService().getMapper().map(serverName, decodedURI,
                    version, request.getMappingData());

根據(jù)請(qǐng)求到uri找到目標(biāo)Host、Context、Wrapper。
Mapper對(duì)象存在service中,通過(guò)MapperListener來(lái)監(jiān)聽(tīng)Container到生命周期,動(dòng)態(tài)調(diào)整Mapper中的Container。
通過(guò)Mapper找到目標(biāo)Container后,就可以轉(zhuǎn)發(fā)給Container來(lái)處理了:

connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);

Container的處理流程

Adapter傳給Container的操作是將request、response參數(shù)交給engine的pipeline處理。每一個(gè)級(jí)別的Container都維護(hù)了一個(gè)pipeline,用來(lái)處理請(qǐng)求,這是典型的流水線(xiàn)模式,pipeline中維護(hù)了一個(gè)Valve鏈表,請(qǐng)求在pipeline中處理的過(guò)程就是從第一個(gè)Valve處理到最后一個(gè),Valve可以方便的進(jìn)行配置來(lái)實(shí)現(xiàn)各層容器的擴(kuò)展功能,每個(gè)pipeline的最后一個(gè)Valve是"basic",完成基本的邏輯處理,如Engine的basic將調(diào)用傳給下一級(jí)的Host的pipeline,Host的basic將調(diào)用傳給下一級(jí)的Context的pipeline,Context的basic將調(diào)用傳給下一級(jí)的Wrapper的pipeline,如Engine的basic StandardEngineValve:

public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Select the Host to be used for this Request
        Host host = request.getHost();
        if (host == null) {
            // HTTP 0.9 or HTTP 1.0 request without a host when no default host
            // is defined. This is handled by the CoyoteAdapter.
            return;
        }
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(host.getPipeline().isAsyncSupported());
        }

        // Ask this Host to process this request
        host.getPipeline().getFirst().invoke(request, response);
    }

而Wrapper作為最底層的Container,basic完成最終Servlet的加載、初始化,并組裝filterChain進(jìn)行調(diào)用:
standardWrapperValve.invoke部分代碼

...
if (!unavailable) {
  servlet = wrapper.allocate();
}
...
// Create the filter chain for this request
ApplicationFilterChain filterChain =
  ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
...
filterChain.doFilter(request.getRequest(), response.getResponse());
...

那么什么時(shí)候調(diào)用了Servlet的service來(lái)處理請(qǐng)求呢?在filter鏈的調(diào)用過(guò)程中,鏈中所有的filter處理完成后,后執(zhí)行Servlet.service來(lái)處理請(qǐng)求:

private void internalDoFilter(ServletRequest request,
                                  ServletResponse response)
        throws IOException, ServletException {

        // Call the next filter if there is one
        if (pos < n) {
            ...
            return;
        }

        // We fell off the end of the chain -- call the servlet instance
        //chain處理完成后,執(zhí)行servlet.service
        try {
           ...
            // Use potentially wrapped request from this point
            if ((request instanceof HttpServletRequest) &&
                    (response instanceof HttpServletResponse) &&
                    Globals.IS_SECURITY_ENABLED ) {
               ...
            } else {
                servlet.service(request, response);
            }

至此,tomcat已經(jīng)完成了從接受連接請(qǐng)求到找到目標(biāo)servlet來(lái)處理請(qǐng)求的流程,實(shí)現(xiàn)了作為Servlet容器的功能。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容