Tomcat源碼學(xué)習(xí)筆記 - Connector組件(二)

前言

上一篇文章講到Poller處理完之后,交給SocketProcessor執(zhí)行處理,這篇就詳細(xì)記錄下這個(gè)處理過程。

SocketProcessor

SocketProcessor實(shí)現(xiàn)Runnable接口,對(duì)外暴露run()方法,內(nèi)部封裝doRun()。

protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            // 這里的 handshake 是用來標(biāo)記https的握手完成情況,
            // 如果是http不需要該握手階段,在從c1處直接置為0
            int handshake = -1;
            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) { // c1
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                               event == SocketEvent.ERROR) {
                        // 如果不能完成TLS握手過程,標(biāo)記握手失敗
                        handshake = -1;
                    } else {
                        // 處理https的SecureNioChannel覆寫了該hanshake()方法
                        // 返回注冊(cè)的SelectionKey
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                ...
            }
            if (handshake == 0) { // 握手完成
                SocketState state = SocketState.OPEN; // 標(biāo)記Socket狀態(tài)
                // 處理該Sockt里的請(qǐng)求
                if (event == null) {
                    // c2
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key); // 否則關(guān)閉通道
                }
            } else if (handshake == -1) { // 握手失敗則關(guān)閉通道
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ) { // TLS會(huì)走到這里
                socketWrapper.registerReadInterest(); // 注冊(cè)讀就緒
            } else if (handshake == SelectionKey.OP_WRITE) {
                socketWrapper.registerWriteInterest(); // 注冊(cè)寫就緒
            }
        } catch (CancelledKeyException cx) {
            ... // 處理異常
        } finally {
            socketWrapper = null;
            event = null;
            if (running && !paused) {
                processorCache.push(this); // 將SocketProcessor放回緩存中
            }
        }
    }
}

在c2處,該方法將包裝好的socketWrapper繼續(xù)傳下去,來看下調(diào)用棧,最終在Http11Processor的service方法中處理,而這個(gè)Http11Processor正是Processor的一個(gè)實(shí)現(xiàn)類,用來真正解析流。

1
Http11Processor

先看一下Http11Processor的構(gòu)造方法。首先調(diào)用抽象父類的構(gòu)造方法,new了一個(gè)Request和Response,用于接收解析結(jié)果和Servlet處理完的內(nèi)容,值得注意的是這里的Request和Response是org.apache.coyote包下的,與org.apache.catalina.connector包下的Request和Response不同,拿Request來說,前者是服務(wù)器請(qǐng)求的一種底層,但高效的表示,同時(shí)它也是GC-free,并且將消耗計(jì)算資源的操作作了延遲處理,可以說是按需再取。說它底層且高效是因?yàn)樗侵苯硬僮鲾?shù)據(jù)流,轉(zhuǎn)換成需要的信息,所以它不適合用戶代碼。而后者是Coyote Request的包裝,為用戶(也就是servlet)提供了一個(gè)高級(jí)視圖(外觀模式)。

并且在c3處可以看到傳入了一個(gè)Adapter,這個(gè)Adapter則是基于coyote的servlet容器中的入口點(diǎn)。也就是說其作用是Connector與Container之間的橋梁。

public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
    super(adapter);
    this.protocol = protocol; // protocol

    httpParser = new HttpParser(protocol.getRelaxedPathChars(),
            protocol.getRelaxedQueryChars());

    inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
            protocol.getRejectIllegalHeaderName(), httpParser);
    request.setInputBuffer(inputBuffer); // 輸入緩沖,用于解析請(qǐng)求頭及傳輸編碼

    outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
    response.setOutputBuffer(outputBuffer);  // 輸出緩沖,寫headers和response主題

    inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new IdentityOutputFilter()); // 添加身份認(rèn)證過濾器
    inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
            protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
            protocol.getMaxSwallowSize()));
    outputBuffer.addFilter(new ChunkedOutputFilter()); // 添加分塊過濾器
    
    // Void輸入輸出過濾器,在嘗試讀取時(shí)返回-1。 與GET,HEAD或類似請(qǐng)求一起使用。
    inputBuffer.addFilter(new VoidInputFilter());
    outputBuffer.addFilter(new VoidOutputFilter()); 

    // 輸入過濾器負(fù)責(zé)讀取和緩沖請(qǐng)求體,以便它不會(huì)干擾客戶端SSL握手消息。
    inputBuffer.addFilter(new BufferedInputFilter());

    //inputBuffer.addFilter(new GzipInputFilter());
    outputBuffer.addFilter(new GzipOutputFilter()); // Gzip

    pluggableFilterIndex = inputBuffer.getFilters().length; // 標(biāo)記過濾器數(shù)量
}

public AbstractProcessor(Adapter adapter) {
    this(adapter, new Request(), new Response()); // new了一個(gè)Request、Response
}
protected AbstractProcessor(Adapter adapter, Request coyoteRequest, Response coyoteResponse) {
    this.adapter = adapter; // Adapter c3
    asyncStateMachine = new AsyncStateMachine(this);
    request = coyoteRequest;
    response = coyoteResponse;
    response.setHook(this);
    request.setResponse(response);
    request.setHook(this);
    userDataHelper = new UserDataHelper(getLog());
}

看完基本的構(gòu)造方法,大致能了解這個(gè)Http11Processor的功能便是解析流,然后封裝成Request,最后給Servlet做處理。下面來看下調(diào)用棧中提到的service方法??梢钥吹教幚韘ocket分為幾個(gè)階段,解析——>準(zhǔn)備——>服務(wù)——>結(jié)束輸入、輸出——>KEEPALIVE——>結(jié)束,當(dāng)然如果對(duì)于keepalive的請(qǐng)求來說,會(huì)在前5個(gè)階段循環(huán)。前四個(gè)階段表示了一個(gè)完成的請(qǐng)求處理及響應(yīng)。在c4處可以看到獲取了Adapter,而這個(gè)Adapter的實(shí)現(xiàn)類為CoyoteAdapter,這個(gè)Adapter將Request和Reponse的處理委托給servlet。

    @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
        // 包含有關(guān)請(qǐng)求處理的統(tǒng)計(jì)、管理信息。
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // 解析階段

        setSocketWrapper(socketWrapper); // 初始化I/O設(shè)置,
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);

        keepAlive = true; // 標(biāo)志位
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;
 
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !protocol.isPaused()) {
            try {
                // 從buffer中讀取并解析請(qǐng)求基本信息,比如方法(GET/POST/PUT等等)、URI、協(xié)議等等
                if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(), protocol.getKeepAliveTimeout())) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (protocol.isPaused()) {
                    response.setStatus(503); // 如果協(xié)議解析服務(wù)被暫停,則返回503
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // 每次更新header屬性的數(shù)量限制(可以通過JMX修改)
                    request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                    // 解析Request Headers
                    if (!inputBuffer.parseHeaders()) {
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!protocol.getDisableUploadTimeout()) { socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                    }
                }
            } catch (IOException e) {
                ... // 處理異常
            }
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection"); // 從Header里面取到Connection屬性
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                        Locale.ENGLISH).contains("upgrade");
            }

            if (foundUpgrade) { // 如果包含upgrade,表示協(xié)議需要升級(jí)
                String requestedProtocol = request.getHeader("Upgrade"); // 獲取升級(jí)協(xié)議
                UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol); // 包裝成UpgradeProtocol
                if (upgradeProtocol != null) {
                    if (upgradeProtocol.accept(request)) {
                        response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                        response.setHeader("Connection", "Upgrade"); // 設(shè)置響應(yīng)體
                        response.setHeader("Upgrade", requestedProtocol);
                        action(ActionCode.CLOSE,  null);
                        getAdapter().log(request, response, 0);

                        InternalHttpUpgradeHandler upgradeHandler =
                                upgradeProtocol.getInternalUpgradeHandler(
                                        socketWrapper, getAdapter(), cloneRequest(request));
                        UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                        action(ActionCode.UPGRADE, upgradeToken);
                        return SocketState.UPGRADING;
                    }
                }
            }
            if (getErrorState().isIoAllowed()) {
                // 準(zhǔn)備階段
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    // 主要設(shè)置過濾器,并且解析部分headers
                    // 比如檢查是否是keepAlive、如果是http/1.1,判斷是否包含Expect:100-continue
                    // (用于客戶端在發(fā)送POST數(shù)據(jù)給服務(wù)器前,征詢服務(wù)器情況,
                    // 看服務(wù)器是否處理POST的數(shù)據(jù),常用于大文件post,例如文件上傳)、
                    // 用戶代理user-agent狀況、host等等
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }

            // 如果是長(zhǎng)連接,則需要判斷長(zhǎng)連接是否達(dá)到服務(wù)器上限
            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // 通過Adapter交給Container處理請(qǐng)求,并構(gòu)建Response
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response); // c4
                    if(keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    ... // 處理異常
            }

            // 結(jié)束請(qǐng)求處理
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // 如果這是異步請(qǐng)求,則請(qǐng)求在完成后結(jié)束。
                // 在這種情況下,AsyncContext負(fù)責(zé)調(diào)用endRequest()。
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

            // 確保設(shè)置錯(cuò)誤碼,且更新請(qǐng)求的統(tǒng)計(jì)計(jì)數(shù)
            if (getErrorState().isError()) {
                response.setStatus(500);
            }
            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }

            if (!protocol.getDisableUploadTimeout()) {
                int connectionTimeout = protocol.getConnectionTimeout();
                if(connectionTimeout > 0) {
                    socketWrapper.setReadTimeout(connectionTimeout);
                } else {
                    socketWrapper.setReadTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); // 如果是長(zhǎng)連接繼續(xù)循環(huán)

            sendfileState = processSendfile(socketWrapper);
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
        ...
    }

Connector的對(duì)請(qǐng)求數(shù)據(jù)流的處理流程基本已經(jīng)完成,接下去便是交由Container組件去做具體的Servlet處理了。在上一篇博客中提到Tomcat為了高效的處理請(qǐng)求流,做了很多包括數(shù)據(jù)結(jié)構(gòu)(輕量級(jí)的同步棧SynchronizedStack),以及延遲的異常處理等努力。接下來可以看下本篇文章代碼中涉及到的一些數(shù)據(jù)結(jié)構(gòu)等。

MessageBytes

我們?cè)谏瞎?jié)中提到org.apache.coyote包下的Request是一種底層且高效的表示。因?yàn)槲覀儚钠鋐inal域中可以看到其類型都是MessageBytes類型的。這個(gè)類用來表示HTTP消息中的字節(jié)數(shù)組,可以代表請(qǐng)求/響應(yīng)中的全部元素。且字節(jié)、字符之間的轉(zhuǎn)換都是延遲且有緩存機(jī)制,并且是可以回收的。

該對(duì)象可以表示byte [],char []或(子)String。也就是說,在接受到socket傳入的字節(jié)之后并不會(huì)馬上進(jìn)行編碼轉(zhuǎn)換,而是保持byte[]的方式,在用到的時(shí)候再進(jìn)行轉(zhuǎn)換。

private int type = T_NULL;
public static final int T_NULL = 0; // 表示空消息
public static final int T_STR = 1; // 表示字符串類型
public static final int T_BYTES = 2; // 表示字節(jié)數(shù)組類型
public static final int T_CHARS = 3; // 表示字符數(shù)組類型

另外擁有一個(gè)ByteChunk類型和一個(gè)CharChunk類型的對(duì)象,這兩個(gè)類可以看作是ByteBuffer的擴(kuò)展,提供對(duì)字節(jié)/字符數(shù)組的操作。

private final ByteChunk byteC = new ByteChunk();
private final CharChunk charC = new CharChunk();

在Http11Processor的service方法處理請(qǐng)求數(shù)據(jù)的時(shí)候,經(jīng)常用到MessageBytes的setBytes()方法,該方法將指定的字節(jié)數(shù)組存入緩沖區(qū)byteC里面。

    public void setBytes(byte[] b, int off, int len) {
        byteC.setBytes(b, off, len);
        type = T_BYTES;
        hasStrValue = false;
        hasHashCode = false;
        hasLongValue = false;
    }

既然說MessageBytes提供了一個(gè)延遲機(jī)制,在沒有轉(zhuǎn)換時(shí),請(qǐng)求數(shù)據(jù)一直以字節(jié)形式存儲(chǔ),直到調(diào)用toString()才去轉(zhuǎn)換為字符串。

    @Override
    public String toString() {
        if (hasStrValue) {
            return strValue;
        }
        switch (type) {
            case T_CHARS:
                strValue = charC.toString(); // 字符
                hasStrValue = true;
                return strValue;
            case T_BYTES:
                strValue = byteC.toString(); // 字節(jié)
                hasStrValue = true;
                return strValue;
        }
        return null;
    }

以字節(jié)形式為例,調(diào)用了ByteChunk的toString()方法。這個(gè)方法又調(diào)用的StringCache的toString(),這里便用到了緩存機(jī)制,這個(gè)StringCache的toString()大概就是先判斷緩存數(shù)組是否存在,如果不存在,則調(diào)用toStringInternal()方法,轉(zhuǎn)換好了之后新建緩存并放入數(shù)據(jù),如果緩存存在,則先嘗試從緩存中取,取不到的話就調(diào)用ByteChunk的toStringInternal()方法,從調(diào)用棧中也可以看到,代碼比較長(zhǎng)就不貼了。

    public String toString() {
        if (isNull()) {
            return null;
        } else if (end - start == 0) {
            return "";
        }
        return StringCache.toString(this);
    }
2

那么這個(gè)toStringInternal做了什么事來看一下,正是根據(jù)偏移量和待提取長(zhǎng)度進(jìn)行編碼提取轉(zhuǎn)換。如果直接使用new String(byte[], int, int, Charset),將會(huì)先copy整個(gè)byte數(shù)組,這很影響性能,所以先通過decode將數(shù)組的一部分編碼成CharBuffer,然后在調(diào)用new String,非常精髓的一個(gè)處理方式,可謂是費(fèi)盡心思地提高web服務(wù)器的性能。

    public String toStringInternal() {
        if (charset == null) {
            charset = DEFAULT_CHARSET;
        }
        CharBuffer cb = charset.decode(ByteBuffer.wrap(buff, start, end - start));
        return new String(cb.array(), cb.arrayOffset(), cb.length());
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)自陳明乾的博客,可能有一定更新。 轉(zhuǎn)原文聲明:原創(chuàng)作品,允許轉(zhuǎn)載,轉(zhuǎn)載時(shí)請(qǐng)務(wù)必以超鏈接形式標(biāo)明文章 原始出處 、...
    C86guli閱讀 4,884評(píng)論 6 72
  • 一. Java基礎(chǔ)部分.................................................
    wy_sure閱讀 4,017評(píng)論 0 11
  • IOC 控制反轉(zhuǎn)容器控制程序?qū)ο笾g的關(guān)系,而不是傳統(tǒng)實(shí)現(xiàn)中,有程序代碼之間控制,又名依賴注入。All 類的創(chuàng)建,...
    irckwk1閱讀 1,097評(píng)論 0 0
  • 在一個(gè)方法內(nèi)部定義的變量都存儲(chǔ)在棧中,當(dāng)這個(gè)函數(shù)運(yùn)行結(jié)束后,其對(duì)應(yīng)的棧就會(huì)被回收,此時(shí),在其方法體中定義的變量將不...
    Y了個(gè)J閱讀 4,576評(píng)論 1 14
  • 概念 Java Web,是基于Java語言實(shí)現(xiàn)web服務(wù)的技術(shù)總和。介于現(xiàn)在Java在web客戶端應(yīng)用的比較少,我...
    胥垣閱讀 1,544評(píng)論 0 8

友情鏈接更多精彩內(nèi)容