前言
在前面[Tomcat學(xué)習(xí)筆記之啟動分析(Connector)(七)]一文中,介紹了Connector容器的初始化與啟動,這里以NioEndpoint為例,詳細分析一下請求處理流程。
組件結(jié)構(gòu)圖
先來看一下整個Connector組件的結(jié)構(gòu)圖:

Connector組件的結(jié)構(gòu)圖
在之前的文章中,已經(jīng)介紹了Acceptor、Poller、Worker 等核心組件的初始化過程。下面就這些核心組件一個一個來看。
備注:這個圖是Tomcat7.0版本的,所以Acceptor還在NioEndpoint中,而在9.0版本,Acceptor已經(jīng)單獨提出去了。
NioEndpoint介紹
- 主要屬性
/**
* 線程安全的非阻塞selector池
*/
private NioSelectorPool selectorPool = new NioSelectorPool();
/**
* Server socket "pointer".
*/
private volatile ServerSocketChannel serverSock = null;
private volatile CountDownLatch stopLatch = null;
/**
* PollerEvent緩存
*/
private SynchronizedStack<PollerEvent> eventCache;
/**
* NioChannel緩存,每個NioChannel持有一部分Bytebuffer(一般2個,SSL有4個),
*/
private SynchronizedStack<NioChannel> nioChannels;
/**
* poller線程的默認優(yōu)先級
*/
private int pollerThreadPriority = Thread.NORM_PRIORITY;
/**
* Poller線程數(shù),最多2個
*/
private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());
/**
* selector.select()超時時間
*/
private long selectorTimeout = 1000;
/**
* The socket pollers.
*/
private Poller[] pollers = null;
其他方法介紹后面會說明。
Acceptor接受請求
Acceptor在7.0版本屬于NioEndpoint內(nèi)部類,9.0版本引入了Nio2Endpoit,所以Acceptor被提出來了。
public void run() {
int errorDelay = 0;
// 循環(huán),知道收到shutdown命令
while (endpoint.isRunning()) {
// 如果endpoint暫停,循環(huán)
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//如果達到最大連接,等待
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
//Acceptor獲取socket
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
//設(shè)置socket,注冊到pollerevent隊列中,如果失敗,關(guān)閉scoket
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//1. socket設(shè)置為非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
//2. 設(shè)置Socket參數(shù)值(從server.xml的Connector節(jié)點上獲取參數(shù)值)比如Socket發(fā)送、接收的緩存大小、心跳檢測等
socketProperties.setProperties(sock);
//3. 從NioChannel的緩存棧中取出一個NioChannel,NioChannel是SocketChannel的一個的包裝類
NioChannel channel = nioChannels.pop();
//4. 緩存隊列中沒有則新建一個NioChannel,并將SocketChannel關(guān)聯(lián)到從緩存隊列中獲取的NioChannel上來
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//5. 注冊到Poller中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
主要流程如下:
- 獲取請求;
- 將socket設(shè)置為非阻塞的;
- 從NioChannel的緩存棧中獲取NioChannel(socket和buf的包裝類),沒有則新建一個。
- 注冊到Poller中。
事件注冊
public void register(final NioChannel socket) {
//1. 設(shè)置關(guān)聯(lián)的poller
socket.setPoller(this);
//2. NioChannel的包裝類,增加了一些控制,讀寫超時時間之類的
NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(socketWrapper);
socketWrapper.setPoller(this);
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
socketWrapper.setSecure(isSSLEnabled());
//3. 從pollerevent緩存棧中獲取緩存,如果沒有新建
PollerEvent r = eventCache.pop();
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if (r == null) {
r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
} else {
r.reset(socket, socketWrapper, OP_REGISTER);
}
//4. 添加至pollerevent緩存棧
addEvent(r);
}
主要流程:
- 設(shè)置關(guān)聯(lián)的poller;
- NioChannel進一步包裝;
- 從緩存中獲取或者新建一個pollerevent;
- 放入pollerevent緩存棧中。
PollerEvent流程處理
上面說到NioChannel和NioSocketWrapper會被包裝到PollerEvent,然后添加到PollerEvent隊列中去,我們在這里看一下PollerEvent會做些什么:
public void run() {
//如果socket第一次注冊到selector中,完成對socket讀事件的注冊
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
//尋找selector中的key,如果key為空,連接數(shù)減一,并且socketWrapper.close置位true;否則更新更新socket所感興趣的事件
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
// We are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {
}
}
}
}
PollerEvent主要用來更新selector所感興趣的事件。
總結(jié)
Connector初步處理請求的流程如下:

初步流程圖
接下來看Poller的流程處理。