Tomcat Request Processing Analysis (1) - From Socket to Container Components

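Having analyzed Tomcat's startup process and its components, this article turns to how Tomcat handles a request. Recall the end of Tomcat Startup Analysis (6): in the processSocket method of the AbstractEndpoint class, the task submitted to the worker thread pool is a SocketProcessorBase, so that is where this article starts.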

The SocketProcessorBase class

SocketProcessorBase is an abstract class that implements Runnable. Its run method is final and delegates to the abstract doRun method:

public abstract class SocketProcessorBase<S> implements Runnable {
    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }

    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }

    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }

    protected abstract void doRun();
}
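
The effect of the closed check in run can be seen in a small stand-alone sketch. WrapperSketch, ProcessorBaseSketch and SocketTaskSketch are hypothetical names used only for illustration; they are not Tomcat classes, and the wrapper here tracks nothing but an open/closed flag. Assuming a read task closes the socket, a write task queued for the same wrapper should be skipped:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SocketWrapperBase: only tracks open/closed state.
class WrapperSketch {
    private boolean closed;
    boolean isClosed() { return closed; }
    void close() { closed = true; }
}

// Same shape as SocketProcessorBase: run() is final, doRun() is the hook.
abstract class ProcessorBaseSketch implements Runnable {
    protected final WrapperSketch wrapper;

    ProcessorBaseSketch(WrapperSketch wrapper) { this.wrapper = wrapper; }

    @Override
    public final void run() {
        synchronized (wrapper) {          // one event at a time per socket
            if (wrapper.isClosed()) {
                return;                   // an earlier event closed the socket
            }
            doRun();
        }
    }

    protected abstract void doRun();
}

class SocketTaskSketch {
    // Runs a read task, then a write task, and returns the events actually processed.
    static List<String> runReadThenWrite() {
        List<String> processed = new ArrayList<>();
        WrapperSketch wrapper = new WrapperSketch();
        Runnable read = new ProcessorBaseSketch(wrapper) {
            @Override protected void doRun() {
                processed.add("read");
                wrapper.close();          // the first event closes the socket
            }
        };
        Runnable write = new ProcessorBaseSketch(wrapper) {
            @Override protected void doRun() { processed.add("write"); }
        };
        read.run();
        write.run();
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runReadThenWrite());
    }
}
```

Making run final keeps the locking and the closed check in one place, so subclasses only supply the protocol-specific work in doRun.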

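The SocketProcessor returned by NioEndpoint's createSocketProcessor method is an inner class of NioEndpoint. It extends the abstract SocketProcessorBase class and implements doRun. According to its class comment, SocketProcessor plays the same role as Worker.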

/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            int handshake = -1;
            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
  • The handshake variable and the related calls deal with the SSL/TLS handshake and are ignored for now; NioChannel's handshake method simply returns 0;
  • The real processing happens in the getHandler().process call.
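
The way doRun branches on the handshake value can be condensed into a small sketch. HandshakeDispatchSketch and nextStep are hypothetical names, and the returned strings are only labels for the branches in the listing above; the one real API used is java.nio.channels.SelectionKey.

```java
import java.nio.channels.SelectionKey;

class HandshakeDispatchSketch {
    // Maps the handshake result to the branch doRun takes in the listing above.
    static String nextStep(int handshake) {
        if (handshake == 0) {
            return "process";                 // handshake complete: call the handler
        } else if (handshake == -1) {
            return "close";                   // handshake failed: close the socket
        } else if (handshake == SelectionKey.OP_READ) {
            return "registerReadInterest";    // handshake needs more input
        } else if (handshake == SelectionKey.OP_WRITE) {
            return "registerWriteInterest";   // handshake needs to write more
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(nextStep(0));
        System.out.println(nextStep(SelectionKey.OP_WRITE));
    }
}
```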

The getHandler method is defined in the AbstractEndpoint class. The relevant code is:

/**
 * Handling of accepted sockets.
 */
private Handler<S> handler = null;
public void setHandler(Handler<S> handler ) { this.handler = handler; }
public Handler<S> getHandler() { return handler; }

When is the handler assigned? It happens while the Http11NioProtocol object is being created: the parent class constructor sets the endpoint's Handler:

public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}

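The ConnectionHandler class

ConnectionHandler is a static nested class of AbstractProtocol, and its process method is what the code above calls. The method is long, so it is not reproduced here; this article only walks through the code for the most common scenario: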

  • processor = getProtocol().createProcessor();
    createProcessor is an abstract method defined in AbstractProtocol, and AbstractHttp11Protocol implements it:
    @Override
    protected Processor createProcessor() {
        Http11Processor processor = new Http11Processor(getMaxHttpHeaderSize(),
                getAllowHostHeaderMismatch(), getRejectIllegalHeaderName(), getEndpoint(),
                getMaxTrailerSize(), allowedTrailerHeaders, getMaxExtensionSize(),
                getMaxSwallowSize(), httpUpgradeProtocols, getSendReasonPhrase(),
                relaxedPathChars, relaxedQueryChars);
        processor.setAdapter(getAdapter());
        processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
        processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
        processor.setDisableUploadTimeout(getDisableUploadTimeout());
        processor.setCompressionMinSize(getCompressionMinSize());
        processor.setCompression(getCompression());
        processor.setNoCompressionUserAgents(getNoCompressionUserAgents());
        processor.setCompressibleMimeTypes(getCompressibleMimeTypes());
        processor.setRestrictedUserAgents(getRestrictedUserAgents());
        processor.setMaxSavePostSize(getMaxSavePostSize());
        processor.setServer(getServer());
        processor.setServerRemoveAppProvidedValues(getServerRemoveAppProvidedValues());
        return processor;
    }
    
  • state = processor.process(wrapper, status); this line delegates to the Processor's process method to continue handling the request.
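
The two steps above can be sketched as follows. This is a heavily simplified illustration, not the ConnectionHandler code: HandlerSketch and ProcessorSketch are hypothetical names, sockets and states are plain strings, and the per-socket map is an assumption made here to show why a processor need not be created for every event.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class HandlerSketch {
    // Hypothetical stand-in for the Processor interface.
    interface ProcessorSketch {
        String process(String socket, String event);
    }

    private final Map<String, ProcessorSketch> connections = new HashMap<>();
    private final Supplier<ProcessorSketch> factory;
    int created = 0;   // counts factory calls, for demonstration only

    HandlerSketch(Supplier<ProcessorSketch> factory) {
        this.factory = factory;
    }

    String process(String socket, String event) {
        ProcessorSketch processor = connections.get(socket);
        if (processor == null) {
            processor = factory.get();   // plays the role of createProcessor()
            created++;
            connections.put(socket, processor);
        }
        // Delegate to the processor, as in state = processor.process(wrapper, status)
        String state = processor.process(socket, event);
        if ("CLOSED".equals(state)) {
            connections.remove(socket);  // forget processors of closed sockets
        }
        return state;
    }

    public static void main(String[] args) {
        HandlerSketch handler = new HandlerSketch(() -> (s, e) -> "OPEN");
        handler.process("socket-1", "OPEN_READ");
        handler.process("socket-1", "OPEN_READ");
        System.out.println(handler.created);
    }
}
```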

The Processor interface

The Processor interface handles the requests of a protocol. Its class hierarchy is shown in the figure below.

[Figure: Processor class hierarchy]

The process method of Http11Processor is defined in its parent class AbstractProcessorLight. The code is shown below; it goes on to call the abstract service method to handle the request.

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }
        // some code omitted
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);
    return state;
}
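
For the first pass through the loop, where dispatches is still null and state is still CLOSED, the branch selection reduces to a short decision function. The sketch below uses hypothetical names (ProcessDispatchSketch, branchFor) and its own Event enum rather than Tomcat's SocketEvent; the returned strings merely label the branches of the listing above.

```java
class ProcessDispatchSketch {
    // Local stand-in for SocketEvent, limited to the values used in this article.
    enum Event { OPEN_READ, OPEN_WRITE, STOP, DISCONNECT, ERROR }

    // asyncOrUpgrade corresponds to isAsync() || isUpgrade() in the listing.
    static String branchFor(Event status, boolean asyncOrUpgrade) {
        if (status == Event.DISCONNECT) {
            return "wait-for-recycle";   // nothing to do here
        } else if (asyncOrUpgrade) {
            return "dispatch";           // dispatch(status)
        } else if (status == Event.OPEN_WRITE) {
            return "ignore";             // state becomes LONG
        } else if (status == Event.OPEN_READ) {
            return "service";            // the common case for a new request
        }
        return "close";                  // inconsistent event: close the socket
    }

    public static void main(String[] args) {
        System.out.println(branchFor(Event.OPEN_READ, false));
    }
}
```

For an ordinary HTTP request the event is OPEN_READ and the processor is neither async nor upgraded, so the call ends up in service.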

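Http11Processor implements its own service method. It is long, so it is not reproduced here; the key step is the line getAdapter().service(request, response);.
The adapter of Http11Processor is set after the object is created, using the value returned by AbstractProtocol's getAdapter method. AbstractProtocol's adapter is in turn assigned when the Connector is initialized: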

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);
    // some code omitted
}

The CoyoteAdapter class

The Adapter set on the ProtocolHandler is a CoyoteAdapter. Its service method is shown below:

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        // some code omitted
    }
}
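
The first part of service creates the Catalina-level request only once per Coyote request and stores it as a note for later reuse. A minimal sketch of that caching idea follows. All class names are hypothetical stand-ins, and both the slot index and the array size are illustrative values chosen here, not taken from the listing.

```java
class AdapterNotesSketch {
    // Illustrative slot index (assumption); the real constant is defined in Tomcat.
    static final int ADAPTER_NOTES = 1;

    // Hypothetical stand-in for org.apache.coyote.Request: an array of note slots.
    static class CoyoteRequestSketch {
        private final Object[] notes = new Object[32];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    // Hypothetical stand-in for the connector-level Request wrapper.
    static class ServletRequestSketch {
        final CoyoteRequestSketch coyoteRequest;
        ServletRequestSketch(CoyoteRequestSketch coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    // Returns the wrapper stored in the note slot, creating and linking it on first use.
    static ServletRequestSketch wrap(CoyoteRequestSketch req) {
        ServletRequestSketch request = (ServletRequestSketch) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ServletRequestSketch(req);
            req.setNote(ADAPTER_NOTES, request);
        }
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequestSketch req = new CoyoteRequestSketch();
        System.out.println(wrap(req) == wrap(req));
    }
}
```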

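Request preprocessing

The postParseRequest method preprocesses the request: for example, it strips the path parameters introduced by semicolons, decodes the URI, and normalizes it (resolving single-dot and double-dot segments):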

protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response) throws IOException, ServletException {
    // some code omitted
    MessageBytes decodedURI = req.decodedURI();

    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
            * protocol handler. The following assumptions are made:
            * - req.requestURI() has been set to the 'original' non-decoded,
            *   non-normalized URI
            * - req.decodedURI() has been set to the decoded, normalized form
            *   of req.requestURI()
            */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars
                (uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }

    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }

    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;

    while (mapRequired) {
        // This will map the the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // some code omitted
    }
    // some code omitted
}

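Taking the case where the MessageBytes type is T_BYTES as an example:

  • parsePathParameters strips the path parameters introduced by semicolons from the URI;
  • req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI. Decoding here only means replacing each three-character percent escape (%xx) with the single byte given by its two hexadecimal digits;
  • normalize normalizes the URI, resolving "." and ".." in the path;
  • convertURI converts the URI bytes to characters using the Connector's uriEncoding attribute;
  • Note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()). When the Service started, MapperListener registered the Hosts and Contexts inside that Service. When selecting a Context by URI, Mapper's map method compares the URI as decoded by convertURI against each Context's path; see the Mapper class and the Context configuration documentation.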
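
The first four steps can be reproduced in a stand-alone sketch. This is not Tomcat's implementation: UriPreprocessSketch and its methods are hypothetical, the code works on Strings instead of MessageBytes, and it keeps undecoded bytes in an ISO-8859-1 String as a lossless byte view so that normalization runs before character conversion, in the same order as above. Returning null stands in for the 400 responses.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class UriPreprocessSketch {
    // Step 1: drop ";name=value" path parameters from every segment.
    static String stripPathParameters(String uri) {
        StringBuilder out = new StringBuilder();
        boolean skipping = false;
        for (char c : uri.toCharArray()) {
            if (c == ';') skipping = true;
            else if (c == '/') skipping = false;
            if (!skipping) out.append(c);
        }
        return out.toString();
    }

    // Step 2: replace each %xx escape with the byte it encodes.
    static byte[] percentDecode(String uri) {
        byte[] in = uri.getBytes(StandardCharsets.ISO_8859_1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < in.length; i++) {
            if (in[i] != '%') { out.write(in[i]); continue; }
            if (i + 2 >= in.length) throw new IllegalArgumentException("truncated escape");
            int hi = Character.digit((char) in[i + 1], 16);
            int lo = Character.digit((char) in[i + 2], 16);
            if (hi < 0 || lo < 0) throw new IllegalArgumentException("bad escape");
            out.write((hi << 4) | lo);
            i += 2;
        }
        return out.toByteArray();
    }

    // Step 3: resolve "." and ".."; null means the path escapes the root.
    static String normalize(String path) {
        if (!path.startsWith("/")) return null;
        String[] parts = path.split("/", -1);
        Deque<String> stack = new ArrayDeque<>();
        for (int i = 1; i < parts.length; i++) {
            String seg = parts[i];
            if (seg.isEmpty() || seg.equals(".")) continue;
            if (seg.equals("..")) {
                if (stack.isEmpty()) return null;
                stack.removeLast();
            } else {
                stack.addLast(seg);
            }
        }
        String last = parts[parts.length - 1];
        boolean dirLike = last.isEmpty() || last.equals(".") || last.equals("..");
        return "/" + String.join("/", stack) + (dirLike && !stack.isEmpty() ? "/" : "");
    }

    // Steps 1 to 4 in order; uriCharset plays the role of the Connector's URI encoding.
    static String preprocess(String rawUri, Charset uriCharset) {
        byte[] decoded = percentDecode(stripPathParameters(rawUri));
        String normalized = normalize(new String(decoded, StandardCharsets.ISO_8859_1));
        if (normalized == null) return null;
        return new String(normalized.getBytes(StandardCharsets.ISO_8859_1), uriCharset);
    }

    public static void main(String[] args) {
        System.out.println(preprocess("/app;jsessionid=1/a/./b/../index.html",
                StandardCharsets.UTF_8));
    }
}
```

Because the Context is chosen from the decoded and normalized form, a request such as /app;jsessionid=1/a/./b/../index.html is matched as /app/a/index.html in this sketch.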

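Container processing

If the request can be handed to the container's Pipeline, that is, when postParseRequest returns true, the container takes over. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):

  • Connector's getService returns the StandardService;
  • StandardService's getContainer returns the StandardEngine;
  • StandardEngine's getPipeline returns the StandardPipeline associated with it;
  • Tomcat Startup Analysis (7) - Container Components and the Engine Component mentioned that StandardEngine's constructor adds the basic valve StandardEngineValve to its own Pipeline. StandardPipeline's getFirst returns the first valve, which handles the request. Because the basic valve is always the last one in the pipeline, the request eventually reaches the basic valve. The next article continues from there.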
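
The relationship between getFirst and the basic valve can be illustrated with a minimal chain of valves. PipelineSketch, ValveSketch and NamedValve are hypothetical names; the sketch assumes the basic valve is supplied up front and that each valve passes the call on to its successor, and it records valve names in a list instead of handling a request and response.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {
    // Hypothetical stand-in for a valve: does its work, then calls the next one.
    abstract static class ValveSketch {
        ValveSketch next;
        abstract void invoke(List<String> trace);
    }

    // Records its name and forwards the call down the chain.
    static class NamedValve extends ValveSketch {
        private final String name;
        NamedValve(String name) { this.name = name; }
        @Override void invoke(List<String> trace) {
            trace.add(name);
            if (next != null) next.invoke(trace);
        }
    }

    private final ValveSketch basic;   // always the end of the chain
    private ValveSketch first;         // null until a non-basic valve is added

    PipelineSketch(ValveSketch basic) { this.basic = basic; }

    // Inserts the valve just before the basic valve.
    void addValve(ValveSketch valve) {
        if (first == null) {
            first = valve;
        } else {
            ValveSketch current = first;
            while (current.next != basic) current = current.next;
            current.next = valve;
        }
        valve.next = basic;
    }

    // With no extra valves, the basic valve is also the first one.
    ValveSketch getFirst() { return first != null ? first : basic; }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch(new NamedValve("engineBasic"));
        pipeline.addValve(new NamedValve("accessLog"));
        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        System.out.println(trace);
    }
}
```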

References

Apache Tomcat 8 Request Process Flow

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容