前言
距離上一次發(fā)布文章將近半年左右了,具體為什么停更,說實話一部分原因是去年10月1放假之后我玩瘋了....另外一部原因是總感覺文章寫到一定地步之后,我有點不知道寫什么了,去年主要更新的是Spring源碼系列的文章,我的主要精力也放在了Spring相關(guān)源碼的研究上,Spring源碼系列的文章,到現(xiàn)在為止,大體也告一段落了,后續(xù)是準備出一版關(guān)于Netty相關(guān)的系列文章,過年的時候著重研究了下!上個圖:

后續(xù)會慢慢更新!我們回歸正題!
一、為什么必須去了解NIO
首先你需要之后Netty的主要實現(xiàn)手段就是Nio,很多人一直學不明白Netty,根本原因是 除了日常開發(fā)中很難能夠?qū)嵺`,很大一部分原因是不熟悉NIO,事實上真正熟悉了NIO和它背后的原理之后,去查看Netty的源碼就有如神助!我們今天就從最基本的IO、以及NIO學起!
二、操作系統(tǒng)是如何定義I/O的
I/O相關(guān)的操作,詳細各位從事java的人員并不陌生,顧名思義也就是Input/Output,對應著連個動詞,Read/Write 讀寫兩個動作,但是在上層系統(tǒng)應用中無論是讀還是寫,操作系統(tǒng)都不會直接的操作物理機磁盤數(shù)據(jù),而是由系統(tǒng)內(nèi)核加載磁盤數(shù)據(jù)!我們以Read為例,當程序中發(fā)起了一個Read請求后,操作系統(tǒng)會將數(shù)據(jù)從內(nèi)核緩沖區(qū)加載到用戶緩沖區(qū),如果內(nèi)核緩沖區(qū)內(nèi)沒有數(shù)據(jù),內(nèi)核會將該次讀請求追加到請求隊列,當內(nèi)核將磁盤數(shù)據(jù)讀取到內(nèi)核緩沖區(qū)后,再次執(zhí)行讀請求,將內(nèi)核緩沖區(qū)的數(shù)據(jù)復制到用戶緩沖區(qū),繼而返回給上層應用系統(tǒng)!

write請求也是類似于上圖的情況,用戶進程寫入到用戶緩沖區(qū),復制到內(nèi)核緩沖區(qū),然后當數(shù)據(jù)到達一定量級之后由內(nèi)核寫入到網(wǎng)口或者磁盤文件!
假設(shè)我們以Socket服務端為例,我們口述一下一個完整的讀寫操作的流程:
- 客戶端發(fā)送一個數(shù)據(jù)到網(wǎng)卡,由操作系統(tǒng)內(nèi)核將數(shù)據(jù)復制到內(nèi)核緩沖區(qū)!
- 當用戶進程發(fā)起read請求后,將數(shù)據(jù)從內(nèi)核緩沖區(qū)復制到用戶緩沖區(qū)!
- 用戶緩沖區(qū)獲取到數(shù)據(jù)之后程序開始進行業(yè)務處理!處理完成后,調(diào)用Write請求,將數(shù)據(jù)從用戶緩沖區(qū)寫入到內(nèi)核緩沖區(qū)!
- 系統(tǒng)內(nèi)核將數(shù)據(jù)從內(nèi)核緩沖區(qū)寫入到網(wǎng)卡,通過底層的通訊協(xié)議發(fā)送到客戶端!
三、網(wǎng)絡(luò)編程中的IO模型
本文旨在讓初學者先大致了解一下基本原理,所以這里并不會涉及到太多代碼,具體的實現(xiàn)邏輯,可以關(guān)注后續(xù)源碼分析的時候的文章,這里只做一個鋪墊,為日后的學習做一個比較好的鋪墊!
1. 同步阻塞I/O
I. 傳統(tǒng)的阻塞IO模型

這種模型是單線程應用,服務端監(jiān)聽客戶端連接,當監(jiān)聽到客戶端的連接后立即去做業(yè)務邏輯的處理,該次請求沒有處理完成之前,服務端接收到的其他連接全部阻塞不可操作!當然開發(fā)中,我們也不會這樣寫,這種寫法只會存在于協(xié)議demo中!這種寫法的缺陷在哪呢?
我們看圖發(fā)現(xiàn),當一個新連接被接入后,其他客戶端的連接全部處于阻塞狀態(tài),那么當該客戶端處理客戶端時間過長的時候,會導致阻塞的客戶端連接越來越多導致系統(tǒng)崩潰,我們是否能夠找到一個辦法,使其能夠?qū)I(yè)務處理與Accept接收新連接分離開來!這樣業(yè)務處理不影響新連接接入就能夠解決該問題!
II. 偽異步阻塞IO模型

這種業(yè)務模型是是對上一步單線程模型的一種優(yōu)化,當一個新連接接入后,獲取到這個鏈接的Socket,交給一條新的線程去處理,主程序繼續(xù)接收下一個新連接,這樣就能夠解決同一時間只能處理一個新連接的問題,但是,明眼人都能看出來,這樣有一個很致命的問題,這種模型處理小并發(fā)短時間可能不會出現(xiàn)問題,但是假設(shè)有10w連接接入,我需要開啟10w個線程,這樣會把系統(tǒng)直接壓崩!我們需要限制線程的數(shù)量,那么肯定就會想到線程池,我們來優(yōu)化一下這個模型吧!
III. 優(yōu)化偽異步阻塞IO模型

這個模型是JDK1.4之前,沒有NIO的時候的一個經(jīng)典Socket模型,服務端接收到客戶端新連接會后,將Socket連接以及業(yè)務邏輯包裝為任務提交到線程池,由線程池開始執(zhí)行,同時服務端繼續(xù)接收新連接!這樣能夠解決上一步因為線程爆炸所引發(fā)的問題,但是我們回想下線程池的的提交步驟:當核心線程池滿了之后會將任務放置到隊列,當隊列滿了之后,會占用最大線程數(shù)的數(shù)量繼續(xù)開啟線程,當達到最大線程數(shù)的時候開始拒絕策略! 證明我最大的并發(fā)數(shù)只有1500個,其余的都在隊列里面占1024個,假設(shè)現(xiàn)在的連接數(shù)是1w個,并且使用的是丟棄策略,那么會有近6000的連接任務被丟棄掉,而且1500個線程,線程之間的切換也是一個特別大的開銷!這是一個致命的問題!
上述的三種模型除了有上述的問題之外,還有一個特別致命的問題,他是阻塞的!
在哪里阻塞的呢?
- 連接的時候,當沒有客戶端連接的時候是阻塞的!沒有客戶端連接的時候,線程只能傻傻的阻塞在哪里等待新連接接入!
- 等待數(shù)據(jù)寫入的時候是阻塞的,當一個新連接接入后但是不寫入數(shù)據(jù),那么線程會一直等待數(shù)據(jù)寫入,直到數(shù)據(jù)寫入完成后才會停止阻塞! 假設(shè)我們使用 優(yōu)化后的偽異步線程模型 ,1000個連接可能只有 100個連接會頻繁寫入數(shù)據(jù),剩余900個連接都很少寫入,那么就會有900個線程在傻傻等待客戶端寫入數(shù)據(jù),所以,這也是一個很嚴重的性能開銷!
現(xiàn)在我們總結(jié)一下上述模型的問題:
- 線程開銷浪費嚴重!
- 線程間的切換頻繁,效率低下!
- read/write執(zhí)行的時候會進行阻塞!
- accept會阻塞等待新連接
那么,我們是否有一種方案,用很少的線程去管理成千上萬的連接,read/write會阻塞進程,那么就會進入到下面的模型
2. 同步非阻塞I/O
同步非阻塞I/O模型就必須使用java NIO來實現(xiàn)了,看一段簡單的代碼:
public static void main(String[] args) throws IOException {
//新接連池
List<SocketChannel> socketChannelList = new ArrayList<>(8);
//開啟服務端Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8098));
//設(shè)置為非阻塞
serverSocketChannel.configureBlocking(false);
while (true) {
//探測新連接,由于設(shè)置了非阻塞,這里即使沒有新連接也不會阻塞,而是直接返回null
SocketChannel socketChannel = serverSocketChannel.accept();
//當返回值不為null的時候,證明存在新連接
if(socketChannel!=null){
System.out.println("新連接接入");
//將客戶端設(shè)置為非阻塞 這樣read/write不會阻塞
socketChannel.configureBlocking(false);
//將新連接加入到線程池
socketChannelList.add(socketChannel);
}
//迭代器遍歷連接池
Iterator<SocketChannel> iterator = socketChannelList.iterator();
while (iterator.hasNext()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
SocketChannel channel = iterator.next();
//讀取客戶端數(shù)據(jù) 當客戶端數(shù)據(jù)沒有寫入完成的時候也不會阻塞,長度為0
int read = channel.read(byteBuffer);
if(read > 0) {
//當存在數(shù)據(jù)的時候打印數(shù)據(jù)
System.out.println(new String(byteBuffer.array()));
}else if(read == -1) {
//客戶端退出的時候刪除該連接
iterator.remove();
System.out.println("斷開連接");
}
}
}
}
上述代碼我們可以看到一個關(guān)鍵的邏輯:serverSocketChannel.configureBlocking(false); 這里被設(shè)置為非阻塞的時候無論是 accept還是read/write都不會阻塞!具體的為什么會非阻塞,我放到文章后面說,我們看一下這種的實現(xiàn)邏輯有什么問題!

看這里,我們似乎的確使用了一條線程處理了所有的連接以及讀寫操作,但是假設(shè)我們有10w連接,活躍連接(經(jīng)常read/write)只有1000,但是我們這個線程需要每次否輪詢10w條數(shù)據(jù)處理,極大的消耗了CPU!
我們期待什么? 期待的是,每次輪詢值輪詢有數(shù)據(jù)的Channel, 沒有數(shù)據(jù)的就不管他,比如剛剛的例子,只有1000個活躍連接,那么每次就只輪詢這1000個,其他的有讀寫了有數(shù)據(jù)就輪詢,沒讀寫就不輪詢!
3. 多路復用模型
多路復用模型是JAVA NIO 推薦使用的經(jīng)典模型,內(nèi)部通過 Selector進行事件選擇,Selector事件選擇通過系統(tǒng)實現(xiàn),具體流程看一段代碼:
public static void main(String[] args) throws IOException {
//開啟服務端Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8098));
//設(shè)置為非阻塞
serverSocketChannel.configureBlocking(false);
//開啟一個選擇器
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待需要處理的事件發(fā)生
selector.select();
// 獲取selector中注冊的全部事件的 SelectionKey 實例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//獲取已經(jīng)準備完成的key
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
//當發(fā)現(xiàn)連接事件
if(next.isAcceptable()) {
//獲取客戶端連接
SocketChannel socketChannel = serverSocketChannel.accept();
//設(shè)置非阻塞
socketChannel.configureBlocking(false);
//將該客戶端連接注冊進選擇器 并關(guān)注讀事件
socketChannel.register(selector, SelectionKey.OP_READ);
//如果是讀事件
}else if(next.isReadable()){
ByteBuffer allocate = ByteBuffer.allocate(128);
//獲取與此key唯一綁定的channel
SocketChannel channel = (SocketChannel) next.channel();
//開始讀取數(shù)據(jù)
int read = channel.read(allocate);
if(read > 0){
System.out.println(new String(allocate.array()));
}else if(read == -1){
System.out.println("斷開連接");
channel.close();
}
}
//刪除這個事件
iterator.remove();
}
}
}
相比上面的同步非阻塞IO,這里多了一個selector選擇器,能夠?qū)﹃P(guān)注不同事件的Socket進行注冊,后續(xù)如果關(guān)注的事件滿足了條件的話,就將該socket放回到到里面,等待客戶端輪詢!

NIO底層在JDK1.4版本是用linux的內(nèi)核函數(shù)select()或poll()來實現(xiàn),跟上面的NioServer代碼類似,selector每次都會輪詢所有的sockchannel看下哪個channel有讀寫事件,有的話就處理,沒有就繼續(xù)遍歷,JDK1.5開始引入了epoll基于事件響應機制來優(yōu)化NIO,首先我們會將我們的SocketChannel注冊到對應的選擇器上并選擇關(guān)注的事件,后續(xù)操作系統(tǒng)會根據(jù)我們設(shè)置的感興趣的事件將完成的事件SocketChannel放回到選擇器中,等待用戶的處理!那么它能夠解決上述的問題嗎?
肯定是可以的,因為上面的一個同步非阻塞I/O痛點在于CPU總是在做很多無用的輪詢,在這個模型里被解決了!這個模型從selector中獲取到的Channel全部是就緒的,后續(xù)只需要也就是說他每次輪詢都不會做無用功!
深入 底層概念解析
select模型
如果要深入分析NIO的底層我們需要逐步的分析,首先,我們需要了解一種叫做select()函數(shù)的模型,它是什么呢?他也是NIO所使用的多路復用的模型之一,是JDK1.4的時候所使用的一種模型,他是epoll模型之前所普遍使用的一種模型,他的效率不高,但是當時被普遍使用,后來才會被人優(yōu)化為epoll!
他是如何做到多路復用的呢?如圖:
- 首先我們需要了解操作系統(tǒng)有一個叫做工作隊列的概念,由CPU輪流執(zhí)行工作隊列里面的進程,我們平時書寫的Socket服務端客戶端程序也是存在于工作隊列的進程中,只要它存在于工作隊列,它就會被CPU調(diào)用執(zhí)行!我們下文將該網(wǎng)絡(luò)程序稱之為進程A

-
他的內(nèi)部會維護一個 Socket列表,當調(diào)用系統(tǒng)函數(shù)select(socket[])的時候,操作系統(tǒng)會將進程A加入到Socket列表中的每一個Socket的等待隊列中,同時將進程A從工作隊列移除,此時,進程A處于阻塞狀態(tài)!
image-20210310223709483 -
當網(wǎng)卡接收到數(shù)據(jù)之后,觸發(fā)操作系統(tǒng)的中斷程序,根據(jù)該程序的Socket端口取對應的Socket列表中尋找該進程A,并將進程A從所有的Socket列表中的等待隊列移除,并加入到操作系統(tǒng)的工作隊列!
image-20210310223932406 -
此時進程A被喚醒,此時知道至少有一個Socket存在數(shù)據(jù),開始依次遍歷所有的Socket,尋找存在數(shù)據(jù)的Socket并進行后續(xù)的業(yè)務操作
image-20210310224109432
該種結(jié)構(gòu)的核心思想是,我先讓所有的Socket都持有這個進程A的引用,當操作系統(tǒng)觸發(fā)Socket中斷之后,基于端口尋找到對應的Socket,就能夠找到該Socket對應的進程,再基于進程,就能夠找到所有被監(jiān)控的Socket! 要注意,當進程A被喚醒,就證明一件事,操作系統(tǒng)發(fā)生了Socket中斷,就至少有一個Socket的數(shù)據(jù)準備就緒,只需要將所有的Socket遍歷,就能夠找到并處理本次客戶端傳入的數(shù)據(jù)!
但是,你會發(fā)現(xiàn),這種操作極為繁瑣,中間似乎存在了很多遍歷,先將進程A加入的所有的Socket等待隊列需要遍歷一次,發(fā)生中斷之后需要遍歷一次Socket列表,將所有對于進程A的引用移除,并將進程A的引用加入到工作隊列!因為此時進程A并不知道哪一個Socket是有數(shù)據(jù)的,所以,由需要再次遍歷一遍Socket列表,才能真正的處理數(shù)據(jù),整個操作總共遍歷了3此Socket,為了保證性能,所以1.4版本種,最多只能監(jiān)控1024個Socket,去掉標準輸出輸出和錯誤輸出只剩下1021個,因為如果Socket過多勢必造成每次遍歷消耗性能極大!
epoll模型
epoll總共分為三個比較重要的函數(shù):
- epoll_create 對應JDK NIO代碼種的Selector.open()
- epoll_ctl 對應JDK NIO代碼中的socketChannel.register(selector,xxxx);
- epoll_wait 對應JDK NIO代碼中的 selector.select();
感興趣的可以下載一個open-jdk-8u的源代碼,也可以關(guān)注公眾號回復openJdk獲取源碼壓縮包!
他是如何優(yōu)化select的呢?
-
epoll_create:這些系統(tǒng)調(diào)用將返回一個非負文件描述符,他也和Socket一樣,存在一個等待隊列,但是,他還存在一個就緒隊列!
image-20210310231234730 -
epoll_ctl :添加Socket的監(jiān)視,對應Java中將SocketChannel注冊到Selector中,他會將創(chuàng)建的文件描述符的引用添加到Socket的等待隊列!這點比較難理解,注意是將EPFD(Epoll文件描述符)放到Socket的等待隊列!
image-20210310231305931 -
當操作系統(tǒng)發(fā)生中斷程序后,基于端口號(客戶端的端口號是唯一的)尋找到對應的Socket,獲取到EPFD的引用,將該Socket的引用加入到EPFD的就序列表!
image-20210310232256771 epoll_wait:查看EPFD的就緒列表是否存在Socket的引用,如果存在就直接返回,不存在就將進程A加入到EPFD的等待隊列,并移除進程A再工作隊列的引用!


-
當網(wǎng)卡再次接收到數(shù)據(jù),發(fā)生中斷,進行上述步驟,將該Socket的因引用加入到就序列表,并喚醒進程A,移除該EPFD等待隊列的進程A,將進程A加入到工作隊列,程序繼續(xù)執(zhí)行!
image-20210310232111376
4. 異步非阻塞I/O
異步非阻塞模型是用戶應用只需要發(fā)出對應的事件,并注冊對應的回調(diào)函數(shù),由操作系統(tǒng)完成后,回調(diào)回調(diào)函數(shù),完成具體的約為操作!先看一段代碼
public static void main(String[] args) throws Exception {
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));
//監(jiān)聽連接事件,并注冊回調(diào)
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
System.out.println("2--"+Thread.currentThread().getName());
// 再此接收客戶端連接,如果不寫這行代碼后面的客戶端連接連不上服務端
serverChannel.accept(attachment, this);
System.out.println(socketChannel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
//監(jiān)聽read事件并注冊回調(diào)
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
System.out.println("3--"+Thread.currentThread().getName());
buffer.flip();
System.out.println(new String(buffer.array(), 0, result));
//向客戶端回寫一個數(shù)據(jù)
socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
}
//發(fā)生錯誤調(diào)這個
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
//發(fā)生錯誤調(diào)這個
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
System.out.println("1--"+Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
}
AIO客戶端
public static void main(String... args) throws Exception {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(512);
Integer len = socketChannel.read(buffer).get();
if (len != -1) {
System.out.println("客戶端收到信息:" + new String(buffer.array(), 0, len));
}
}

原諒我畫圖功底,整體邏輯就是,告訴系統(tǒng)我要關(guān)注一個連接的事件,如果有連接事件就調(diào)用我注冊的這個回調(diào)函數(shù),回調(diào)函數(shù)中獲取到客戶端的連接,然后再次注冊一個read請求,告訴系統(tǒng),如果有可讀的數(shù)據(jù)就調(diào)用我注冊的這個回調(diào)函數(shù)!當存在數(shù)據(jù)的時候,執(zhí)行read回調(diào),并寫出數(shù)據(jù)!
為什么Netty使用NIO而不是AIO?
在Linux系統(tǒng)上,AIO的底層實現(xiàn)仍使用Epoll,沒有很好實現(xiàn)AIO,因此在性能上沒有明顯的優(yōu)勢,而且被JDK封裝了一層不容易深度優(yōu)化,Linux上AIO還不夠成熟。Netty是異步非阻塞框架,Netty在NIO上做了很多異步的封裝。簡單來說,現(xiàn)在的AIO實現(xiàn)比較雞肋!






