概述
Tomcat涉及的內(nèi)容較多,本文主要分析其關(guān)鍵部分的高并發(fā)設(shè)計。
主要內(nèi)容
- IO模型概述
- Tomcat的IO模型實(shí)現(xiàn)
- Tomcat的并發(fā)控制
IO模型概述
從操作系統(tǒng)層面看服務(wù)端的數(shù)據(jù)交互流程:

如圖所示,當(dāng)服務(wù)端收到請求后,用戶空間的用戶線程會發(fā)起read調(diào)用,這個read調(diào)用依賴兩個過程:從網(wǎng)卡拷貝數(shù)據(jù)到內(nèi)核空間、從內(nèi)核空間拷貝數(shù)據(jù)到用戶空間,write調(diào)用是反向的兩個步驟。
那在處理這兩個耗時的Copy操作時,用戶線程是占著CPU還是出讓CPU?怎樣可以更高效的處理數(shù)據(jù)?不同IO模型的作用就在這里體現(xiàn)出來了。
同步阻塞IO
用戶線程發(fā)起read調(diào)用后就進(jìn)入阻塞(出讓CPU),等待read執(zhí)行完(數(shù)據(jù)拷貝到用戶空間)后再喚醒。

同步非阻塞IO
用戶線程發(fā)起read調(diào)用后不進(jìn)入阻塞,而是持續(xù)不斷的發(fā)起read輪詢,在數(shù)據(jù)拷貝到內(nèi)核空間之前read調(diào)用都返回false,數(shù)據(jù)拷貝到內(nèi)核空間后,read開始阻塞,等待數(shù)據(jù)從內(nèi)核空間拷貝的用戶空間后再喚醒用戶線程。

IO多路復(fù)用
用戶線程讀取數(shù)據(jù)分成兩步,先輪詢調(diào)用select詢問數(shù)據(jù)是否已到內(nèi)核,如果到了則調(diào)用read讀取數(shù)據(jù),此時read調(diào)用會等待數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間,這個過程是阻塞的。而這里多路復(fù)用的意思是:<font color="#cc0000">一次select可以得到多個channel的結(jié)果。</font>

異步IO
用戶線程在調(diào)用read時注冊一個回調(diào)函數(shù),當(dāng)數(shù)據(jù)到用戶空間后會調(diào)用此回調(diào)通知用戶線程,這個過程用戶線程不阻塞。

本文介紹的Tomcat9默認(rèn)是<font color="#cc0000">基于IO多路復(fù)用模型</font>做的的高并發(fā)設(shè)計。
Tomcat的IO模型實(shí)現(xiàn)
假設(shè)我們把Tomcat當(dāng)做黑盒子,按照Spring的方式來處理請求,簡化的流程是這樣的:

Tomcat負(fù)責(zé)讀取內(nèi)核的數(shù)據(jù),轉(zhuǎn)換成Servlet對象,然后由Spring框架處理業(yè)務(wù)后通過Response對象寫入返回數(shù)據(jù),Tomcat再將返回數(shù)據(jù)通過內(nèi)核寫入網(wǎng)卡,最后返回到客戶端。
接下來我們將Tomcat這部分放大,看看黑盒子里是怎么處理的。

如圖所示,請求的處理分如下幾個步驟:
- Tomcat在啟動時會初始化一個ServerSocket用于監(jiān)聽指定端口的IO請求(比如8080)
- 接著啟動Acceptor線程,循環(huán)調(diào)用accept方法接收IO請求(TCP連接建立)
- 將ServerChannel包裝成PollerEvent,注冊到Poller的event隊列中
- Poller線程循環(huán)遍歷event隊列,將Poller關(guān)注的ServerChannel的READ操作注冊到Selector中
- 在同一個Poller循環(huán)中,用Selector查詢ServerChannel的狀態(tài),這里一次可以查詢到多個ServerChannel的狀態(tài),即<font color="#cc0000">多路復(fù)用</font>
- 將查詢到的SelectionKey對應(yīng)的ServerChannel挨個創(chuàng)建SocketProcessor,SocketProcessor是Runnable的
- 將SocketProcessor扔到工作線程進(jìn)行處理
- 接下來的協(xié)議解析、Request和Response適配、Servlet處理、業(yè)務(wù)處理等都在SocketProcessor的流程中
涉及的主要代碼:
- Acceptor線程:Acceptor.run->NioEndpoint.setSocketOptions->NioEndpoint.register->Poller.addEvent
- Poller線程:Poller.run->Poller.events->PollerEvent.run->Poller.processKey->AbstractEndpoint.processSocket
下一節(jié)我們再對部分源碼進(jìn)行分析。
Tomcat的并發(fā)控制
上面分析了Tomcat的IO多路復(fù)用模型實(shí)現(xiàn),在這個實(shí)現(xiàn)中每一個環(huán)節(jié)都有影響并發(fā)的關(guān)鍵控制,先看圖:

內(nèi)核-Accept List
TCP三次握手建立連接的過程中,內(nèi)核會為每一個LISTEN狀態(tài)的Socket維護(hù)兩個隊列:
- SYN隊列:這些連接已收到客戶端的SYN
- ACCEPT隊列:這些連接已經(jīng)收到客戶端的ACK,完成了三次握手,等待被系統(tǒng)accept調(diào)用取走
Tomcat的Acceptor負(fù)責(zé)從ACCEPT隊列中取走連接,當(dāng)Acceptor處理不過來時,連接就堆積在ACCEPT隊列中,這個隊列長度由<font color="#cc0000">acceptCount(默認(rèn)100)</font>控制,當(dāng)我們嘗試把a(bǔ)cceptCount設(shè)置的很小,發(fā)起并發(fā)請求時,會收到一個Socket Error。
// 初始化服務(wù)端端口監(jiān)聽-NioEndpoint.initServerSocket
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
// 綁定acceptCount,可以通過配置server.tomcat.accept-count調(diào)整大小
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
Acceptor-LimitLatch
并發(fā)處理請求的最大連接控制器,內(nèi)部通過AQS實(shí)現(xiàn)等待。即如果服務(wù)端請求處理不過來,并發(fā)的請求數(shù)量超過了<font color="#cc0000">maxConnections(默認(rèn)8192)</font>,則會進(jìn)入等待。當(dāng)Acceptor線程接收出現(xiàn)異常,Socket被異常關(guān)閉或者SocketProessor處理完后會回收連接數(shù)。
// Acceptor線程執(zhí)行邏輯-Acceptor.run
public void run() {
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
// LimitLatch控制最大并發(fā)連接數(shù),如果達(dá)到最大連接數(shù)則進(jìn)入等待,直到有空余連接數(shù)
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// Accept the next incoming connection from the server
// socket
// 調(diào)用accept方法從內(nèi)核提取IO請求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
// 發(fā)生錯誤則回收連接數(shù)
endpoint.countDownConnection();
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
}
}
state = AcceptorState.ENDED;
}
那怎么知道當(dāng)前有多少連接呢?可以用<font color="#cc0000">lsof</font>查看占用8080端口的文件
lsof -i :8080
也可以用<font color="#cc0000">netstat</font>,需要注意的是,查看當(dāng)前正在通信的連接需要篩選ESTABLISHED
netstat -anp | grep 8080 | grep ESTABLISHED
Poller-PollerEvent Queue
是一個Tomcat自己實(shí)現(xiàn)的輕量級的同步隊列<font color="#cc0000">SynchronizedQueue</font>,默認(rèn)大小是128,超過后會自動擴(kuò)容到當(dāng)前的兩倍。主要特點(diǎn)是很輕量,內(nèi)部用了System.copy提升性能,對GC很友好。
// 注冊PollerEvent到SynchronizedQueue-NioEndpoint.register
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
// 包裝PollerEvent
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
// 往Poller.events(SynchronizedQueue)里加入PollerEvent
addEvent(r);
}
//往Selector注冊感興趣的事件-PollerEvent.run
public void run() {
if (interestOps == OP_REGISTER) {
try {
// 往Selector注冊感興趣的數(shù)據(jù)可讀事件
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
}
}
// Poller線程執(zhí)行邏輯-Poller.run
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// events()方法內(nèi)部會遍歷PollerEvent隊列,并往Selector注冊事件
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow();
} else {
// 當(dāng)數(shù)據(jù)可讀時,selector會從readylist中返回可讀的個數(shù),一次可以查詢到多個Channel的數(shù)據(jù)狀態(tài)
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
} catch (Throwable x) {
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
// 輪詢數(shù)據(jù)準(zhǔn)備好的Channel
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
// 執(zhí)行數(shù)據(jù)解析和后續(xù)的業(yè)務(wù)處理
processKey(sk, socketWrapper);
}
}
}
}
Execotor-ThreadPoolExecutor、TaskQueue
為了性能最大化,Tomcat擴(kuò)展了默認(rèn)線程池策略和線程池隊列。
<font color="#cc0000">當(dāng)線程池達(dá)到最大數(shù)量時,不會立即執(zhí)行拒絕,而是再次嘗試向任務(wù)隊列添加任務(wù),如果還是添加不進(jìn)去才執(zhí)行拒絕</font>:
// 線程池擴(kuò)展-org.apache.tomcat.util.threads.ThreadPoolExecutor.execute
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// 當(dāng)達(dá)到最大線程數(shù)時,會嘗試把任務(wù)放到隊列,如果還是放不進(jìn)去才會拒絕
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
Tomcat用的線程池隊列是<font color="#cc0000">LinkedBlockingQueue</font>,默認(rèn)是用的無界模式,于是造成一個問題:當(dāng)線程數(shù)達(dá)到核心線程數(shù)后,任務(wù)可以無限往隊列添加,就不會再創(chuàng)建新線程了。Tomcat在往隊列添加任務(wù)的邏輯中增加了<font color="#cc0000">maximumPoolSize</font>的干預(yù),<font color="#cc0000">使得在線程數(shù)未達(dá)到maximumPoolSize時任務(wù)添加不進(jìn)去</font>,進(jìn)而可以新建線程。
// 線程池隊列擴(kuò)展-TaskQueue.offer
public boolean offer(Runnable o) {
// 此方法被調(diào)用說明:當(dāng)前線程數(shù)已經(jīng)大于核心線程數(shù)
if (parent==null) return super.offer(o);
// 線程數(shù)等于最大線程數(shù)時,不能再創(chuàng)建新線程,將任務(wù)放入隊列
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
// 當(dāng)前線程數(shù)大于核心線程數(shù),并且小于最大線程數(shù)
// 如果任務(wù)數(shù)小于線程數(shù),表名有空閑線程,不需要新建,放入隊列
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
// 如果任務(wù)數(shù)大于線程數(shù),且線程數(shù)小于最大線程數(shù),此時應(yīng)該創(chuàng)建新線程
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
// 其他情況放入隊列
return super.offer(o);
}
總結(jié)
- IO模型介紹
- Tomcat實(shí)現(xiàn)的IO多路復(fù)用
- Acceptor、Poller、Executor
- Accept List、LimitLatch、SynchronizedQueue、線程池擴(kuò)展