Tomcat學(xué)習(xí)筆記之NIO處理分析(一)

前言

在前面[Tomcat學(xué)習(xí)筆記之啟動分析(Connector)(七)]一文中,介紹了Connector容器的初始化與啟動,這里以NioEndpoint為例,詳細分析一下請求處理流程。

組件結(jié)構(gòu)圖

先來看一下整個Connector組件的結(jié)構(gòu)圖:


Connector組件的結(jié)構(gòu)圖

在之前的文章中,已經(jīng)介紹了Acceptor、Poller、Worker 等核心組件的初始化過程。下面就這些核心組件一個一個來看。
備注:這個圖是Tomcat7.0版本的,所以Acceptor還在NioEndpoint中,而在9.0版本,Acceptor已經(jīng)單獨提出去了。

NioEndpoint介紹

  • 主要屬性
    /**
     * 線程安全的非阻塞selector池
     */
    private NioSelectorPool selectorPool = new NioSelectorPool();
    /**
     * Server socket "pointer".
     */
    private volatile ServerSocketChannel serverSock = null;
    private volatile CountDownLatch stopLatch = null;
    /**
     * PollerEvent緩存
     */
    private SynchronizedStack<PollerEvent> eventCache;
    /**
     * NioChannel緩存,每個NioChannel持有一部分Bytebuffer(一般2個,SSL有4個),
     */
    private SynchronizedStack<NioChannel> nioChannels;
/**
     * poller線程的默認優(yōu)先級
     */
    private int pollerThreadPriority = Thread.NORM_PRIORITY;
/**
     * Poller線程數(shù),最多2個
     */
    private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());
/**
     * selector.select()超時時間
     */
    private long selectorTimeout = 1000;
 /**
     * The socket pollers.
     */
    private Poller[] pollers = null;

其他方法介紹后面會說明。

Acceptor接受請求

Acceptor在7.0版本屬于NioEndpoint內(nèi)部類,9.0版本引入了Nio2Endpoit,所以Acceptor被提出來了。

public void run() {

        int errorDelay = 0;

        // 循環(huán),知道收到shutdown命令
        while (endpoint.isRunning()) {

            // 如果endpoint暫停,循環(huán)
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!endpoint.isRunning()) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //如果達到最大連接,等待
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    //Acceptor獲取socket
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    //設(shè)置socket,注冊到pollerevent隊列中,如果失敗,關(guān)閉scoket
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                        log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //1. socket設(shè)置為非阻塞
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            //2. 設(shè)置Socket參數(shù)值(從server.xml的Connector節(jié)點上獲取參數(shù)值)比如Socket發(fā)送、接收的緩存大小、心跳檢測等
            socketProperties.setProperties(sock);
            //3. 從NioChannel的緩存棧中取出一個NioChannel,NioChannel是SocketChannel的一個的包裝類
            NioChannel channel = nioChannels.pop();
            //4. 緩存隊列中沒有則新建一個NioChannel,并將SocketChannel關(guān)聯(lián)到從緩存隊列中獲取的NioChannel上來
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            //5. 注冊到Poller中
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

主要流程如下:

  • 獲取請求;
  • 將socket設(shè)置為非阻塞的;
  • 從NioChannel的緩存棧中獲取NioChannel(socket和buf的包裝類),沒有則新建一個。
  • 注冊到Poller中。

事件注冊

      public void register(final NioChannel socket) {
            //1. 設(shè)置關(guān)聯(lián)的poller
            socket.setPoller(this);
            //2. NioChannel的包裝類,增加了一些控制,讀寫超時時間之類的
            NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(socketWrapper);
            socketWrapper.setPoller(this);
            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            socketWrapper.setSecure(isSSLEnabled());
            //3. 從pollerevent緩存棧中獲取緩存,如果沒有新建
            PollerEvent r = eventCache.pop();
            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if (r == null) {
                r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
            } else {
                r.reset(socket, socketWrapper, OP_REGISTER);
            }
            //4. 添加至pollerevent緩存棧
            addEvent(r);
        }

主要流程:

  • 設(shè)置關(guān)聯(lián)的poller;
  • NioChannel進一步包裝;
  • 從緩存中獲取或者新建一個pollerevent;
  • 放入pollerevent緩存棧中。

PollerEvent流程處理

上面說到NioChannel和NioSocketWrapper會被包裝到PollerEvent,然后添加到PollerEvent隊列中去,我們在這里看一下PollerEvent會做些什么:

        public void run() {
            //如果socket第一次注冊到selector中,完成對socket讀事件的注冊
            if (interestOps == OP_REGISTER) {
                try {
                    socket.getIOChannel().register(
                            socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                } catch (Exception x) {
                    log.error(sm.getString("endpoint.nio.registerFail"), x);
                }
            } else {
                //尋找selector中的key,如果key為空,連接數(shù)減一,并且socketWrapper.close置位true;否則更新更新socket所感興趣的事件
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won't have been counted down when the socket
                        // closed.
                        socket.socketWrapper.getEndpoint().countDownConnection();
                        ((NioSocketWrapper) socket.socketWrapper).closed = true;
                    } else {
                        final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                        if (socketWrapper != null) {
                            // We are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            socketWrapper.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            socket.getPoller().cancelledKey(key);
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key);
                    } catch (Exception ignore) {
                    }
                }
            }
        }

PollerEvent主要用來更新selector所感興趣的事件。

總結(jié)

Connector初步處理請求的流程如下:


初步流程圖

接下來看Poller的流程處理。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容