Tomcat是如何處理請求的(上)

概述

Tomcat涉及的內(nèi)容較多,本文主要分析其關(guān)鍵部分的高并發(fā)設(shè)計。

主要內(nèi)容

  1. IO模型概述
  2. Tomcat的IO模型實(shí)現(xiàn)
  3. Tomcat的并發(fā)控制

IO模型概述

從操作系統(tǒng)層面看服務(wù)端的數(shù)據(jù)交互流程:


image

如圖所示,當(dāng)服務(wù)端收到請求后,用戶空間的用戶線程會發(fā)起read調(diào)用,這個read調(diào)用依賴兩個過程:從網(wǎng)卡拷貝數(shù)據(jù)到內(nèi)核空間、從內(nèi)核空間拷貝數(shù)據(jù)到用戶空間,write調(diào)用是反向的兩個步驟。

那在處理這兩個耗時的Copy操作時,用戶線程是占著CPU還是出讓CPU?怎樣可以更高效的處理數(shù)據(jù)?不同IO模型的作用就在這里體現(xiàn)出來了。

同步阻塞IO

用戶線程發(fā)起read調(diào)用后就進(jìn)入阻塞(出讓CPU),等待read執(zhí)行完(數(shù)據(jù)拷貝到用戶空間)后再喚醒。

image

同步非阻塞IO

用戶線程發(fā)起read調(diào)用后不進(jìn)入阻塞,而是持續(xù)不斷的發(fā)起read輪詢,在數(shù)據(jù)拷貝到內(nèi)核空間之前read調(diào)用都返回false,數(shù)據(jù)拷貝到內(nèi)核空間后,read開始阻塞,等待數(shù)據(jù)從內(nèi)核空間拷貝的用戶空間后再喚醒用戶線程。

image

IO多路復(fù)用

用戶線程讀取數(shù)據(jù)分成兩步,先輪詢調(diào)用select詢問數(shù)據(jù)是否已到內(nèi)核,如果到了則調(diào)用read讀取數(shù)據(jù),此時read調(diào)用會等待數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間,這個過程是阻塞的。而這里多路復(fù)用的意思是:<font color="#cc0000">一次select可以得到多個channel的結(jié)果。</font>

image

異步IO

用戶線程在調(diào)用read時注冊一個回調(diào)函數(shù),當(dāng)數(shù)據(jù)到用戶空間后會調(diào)用此回調(diào)通知用戶線程,這個過程用戶線程不阻塞。

image

本文介紹的Tomcat9默認(rèn)是<font color="#cc0000">基于IO多路復(fù)用模型</font>做的的高并發(fā)設(shè)計。

Tomcat的IO模型實(shí)現(xiàn)

假設(shè)我們把Tomcat當(dāng)做黑盒子,按照Spring的方式來處理請求,簡化的流程是這樣的:

image

Tomcat負(fù)責(zé)讀取內(nèi)核的數(shù)據(jù),轉(zhuǎn)換成Servlet對象,然后由Spring框架處理業(yè)務(wù)后通過Response對象寫入返回數(shù)據(jù),Tomcat再將返回數(shù)據(jù)通過內(nèi)核寫入網(wǎng)卡,最后返回到客戶端。

接下來我們將Tomcat這部分放大,看看黑盒子里是怎么處理的。

image

如圖所示,請求的處理分如下幾個步驟:

  1. Tomcat在啟動時會初始化一個ServerSocket用于監(jiān)聽指定端口的IO請求(比如8080)
  2. 接著啟動Acceptor線程,循環(huán)調(diào)用accept方法接收IO請求(TCP連接建立)
  3. 將ServerChannel包裝成PollerEvent,注冊到Poller的event隊列中
  4. Poller線程循環(huán)遍歷event隊列,將Poller關(guān)注的ServerChannel的READ操作注冊到Selector中
  5. 在同一個Poller循環(huán)中,用Selector查詢ServerChannel的狀態(tài),這里一次可以查詢到多個ServerChannel的狀態(tài),即<font color="#cc0000">多路復(fù)用</font>
  6. 將查詢到的SelectionKey對應(yīng)的ServerChannel挨個創(chuàng)建SocketProcessor,SocketProcessor是Runnable的
  7. 將SocketProcessor扔到工作線程進(jìn)行處理
  8. 接下來的協(xié)議解析、Request和Response適配、Servlet處理、業(yè)務(wù)處理等都在SocketProcessor的流程中

涉及的主要代碼:

  • Acceptor線程:Acceptor.run->NioEndpoint.setSocketOptions->NioEndpoint.register->Poller.addEvent
  • Poller線程:Poller.run->Poller.events->PollerEvent.run->Poller.processKey->AbstractEndpoint.processSocket

下一節(jié)我們再對部分源碼進(jìn)行分析。

Tomcat的并發(fā)控制

上面分析了Tomcat的IO多路復(fù)用模型實(shí)現(xiàn),在這個實(shí)現(xiàn)中每一個環(huán)節(jié)都有影響并發(fā)的關(guān)鍵控制,先看圖:


image

內(nèi)核-Accept List

TCP三次握手建立連接的過程中,內(nèi)核會為每一個LISTEN狀態(tài)的Socket維護(hù)兩個隊列:

  • SYN隊列:這些連接已收到客戶端的SYN
  • ACCEPT隊列:這些連接已經(jīng)收到客戶端的ACK,完成了三次握手,等待被系統(tǒng)accept調(diào)用取走

Tomcat的Acceptor負(fù)責(zé)從ACCEPT隊列中取走連接,當(dāng)Acceptor處理不過來時,連接就堆積在ACCEPT隊列中,這個隊列長度由<font color="#cc0000">acceptCount(默認(rèn)100)</font>控制,當(dāng)我們嘗試把a(bǔ)cceptCount設(shè)置的很小,發(fā)起并發(fā)請求時,會收到一個Socket Error。

// 初始化服務(wù)端端口監(jiān)聽-NioEndpoint.initServerSocket
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        // 綁定acceptCount,可以通過配置server.tomcat.accept-count調(diào)整大小
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

Acceptor-LimitLatch

并發(fā)處理請求的最大連接控制器,內(nèi)部通過AQS實(shí)現(xiàn)等待。即如果服務(wù)端請求處理不過來,并發(fā)的請求數(shù)量超過了<font color="#cc0000">maxConnections(默認(rèn)8192)</font>,則會進(jìn)入等待。當(dāng)Acceptor線程接收出現(xiàn)異常,Socket被異常關(guān)閉或者SocketProessor處理完后會回收連接數(shù)。

// Acceptor線程執(zhí)行邏輯-Acceptor.run
public void run() {
        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // LimitLatch控制最大并發(fā)連接數(shù),如果達(dá)到最大連接數(shù)則進(jìn)入等待,直到有空余連接數(shù)
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 調(diào)用accept方法從內(nèi)核提取IO請求
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    // 發(fā)生錯誤則回收連接數(shù)
                    endpoint.countDownConnection();
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
            }
        }
        state = AcceptorState.ENDED;
    }

那怎么知道當(dāng)前有多少連接呢?可以用<font color="#cc0000">lsof</font>查看占用8080端口的文件

lsof -i :8080

也可以用<font color="#cc0000">netstat</font>,需要注意的是,查看當(dāng)前正在通信的連接需要篩選ESTABLISHED

netstat -anp | grep 8080 | grep ESTABLISHED

Poller-PollerEvent Queue

是一個Tomcat自己實(shí)現(xiàn)的輕量級的同步隊列<font color="#cc0000">SynchronizedQueue</font>,默認(rèn)大小是128,超過后會自動擴(kuò)容到當(dāng)前的兩倍。主要特點(diǎn)是很輕量,內(nèi)部用了System.copy提升性能,對GC很友好。

// 注冊PollerEvent到SynchronizedQueue-NioEndpoint.register
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    PollerEvent r = null;
    if (eventCache != null) {
        r = eventCache.pop();
    }
    if (r == null) {
        // 包裝PollerEvent
        r = new PollerEvent(socket, OP_REGISTER);
    } else {
        r.reset(socket, OP_REGISTER);
    }
    // 往Poller.events(SynchronizedQueue)里加入PollerEvent
    addEvent(r);
}
//往Selector注冊感興趣的事件-PollerEvent.run
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // 往Selector注冊感興趣的數(shù)據(jù)可讀事件
            socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    }
}
// Poller線程執(zhí)行邏輯-Poller.run
public void run() {
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                // events()方法內(nèi)部會遍歷PollerEvent隊列,并往Selector注冊事件
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // If we are here, means we have other stuff to do
                    // Do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    // 當(dāng)數(shù)據(jù)可讀時,selector會從readylist中返回可讀的個數(shù),一次可以查詢到多個Channel的數(shù)據(jù)狀態(tài)
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
        } catch (Throwable x) {
        }

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (iterator != null && iterator.hasNext()) {
            // 輪詢數(shù)據(jù)準(zhǔn)備好的Channel
            SelectionKey sk = iterator.next();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (socketWrapper == null) {
                iterator.remove();
            } else {
                iterator.remove();
            // 執(zhí)行數(shù)據(jù)解析和后續(xù)的業(yè)務(wù)處理
                processKey(sk, socketWrapper);
            }
        }
    }
}

Execotor-ThreadPoolExecutor、TaskQueue

為了性能最大化,Tomcat擴(kuò)展了默認(rèn)線程池策略和線程池隊列。

<font color="#cc0000">當(dāng)線程池達(dá)到最大數(shù)量時,不會立即執(zhí)行拒絕,而是再次嘗試向任務(wù)隊列添加任務(wù),如果還是添加不進(jìn)去才執(zhí)行拒絕</font>:

// 線程池擴(kuò)展-org.apache.tomcat.util.threads.ThreadPoolExecutor.execute
public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // 當(dāng)達(dá)到最大線程數(shù)時,會嘗試把任務(wù)放到隊列,如果還是放不進(jìn)去才會拒絕
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }

Tomcat用的線程池隊列是<font color="#cc0000">LinkedBlockingQueue</font>,默認(rèn)是用的無界模式,于是造成一個問題:當(dāng)線程數(shù)達(dá)到核心線程數(shù)后,任務(wù)可以無限往隊列添加,就不會再創(chuàng)建新線程了。Tomcat在往隊列添加任務(wù)的邏輯中增加了<font color="#cc0000">maximumPoolSize</font>的干預(yù),<font color="#cc0000">使得在線程數(shù)未達(dá)到maximumPoolSize時任務(wù)添加不進(jìn)去</font>,進(jìn)而可以新建線程。

// 線程池隊列擴(kuò)展-TaskQueue.offer
public boolean offer(Runnable o) {
    // 此方法被調(diào)用說明:當(dāng)前線程數(shù)已經(jīng)大于核心線程數(shù)  
    if (parent==null) return super.offer(o);

    // 線程數(shù)等于最大線程數(shù)時,不能再創(chuàng)建新線程,將任務(wù)放入隊列
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

    // 當(dāng)前線程數(shù)大于核心線程數(shù),并且小于最大線程數(shù)

    //  如果任務(wù)數(shù)小于線程數(shù),表名有空閑線程,不需要新建,放入隊列
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);

    //  如果任務(wù)數(shù)大于線程數(shù),且線程數(shù)小于最大線程數(shù),此時應(yīng)該創(chuàng)建新線程
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        
    // 其他情況放入隊列
    return super.offer(o);
}

總結(jié)

  1. IO模型介紹
  2. Tomcat實(shí)現(xiàn)的IO多路復(fù)用
  3. Acceptor、Poller、Executor
  4. Accept List、LimitLatch、SynchronizedQueue、線程池擴(kuò)展
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容