一、內核接收數據流程

1.網卡發(fā)現 MAC 地址符合,就將包收進來;發(fā)現 IP 地址符合,根據 IP 頭中協(xié)議項,知道上一層是 TCP 協(xié)議;2.DMA把TCP數據包copy到內核緩沖區(qū);3.觸發(fā)CPU中斷,中斷程序摘除TCP頭通過socket五要素(源IP/PORT、目的IP/PORT、協(xié)議)找到對應的socket文件,并把原始二進制數據報copy到socket接收緩沖區(qū);4.中斷程序喚醒被阻塞的內核線程;5.內核線程切換到用戶線程把數據從socket接口緩沖區(qū)copy到應用內存;
二、中斷處理流程

I/O發(fā)出的信號的異常代碼,拿到異常代碼之后,CPU就會觸發(fā)異常處理的流程。計算機在內存里會保存中斷向量表,用來存放不同的異常代碼對應的異常處理程序所在的地址。
CPU在拿到了異常碼之后,會先把當前的程序執(zhí)行的現場,保存到程序棧里面,然后根據異常碼查詢,找到對應的異常處理程序,最后把后續(xù)指令執(zhí)行的指揮權,交給這個異常處理程序。
異常處理程序結束之后返回到原來指令執(zhí)行的位置繼續(xù)執(zhí)行;
三、阻塞不占用 cpu
網卡何時接收到數據是依賴發(fā)送方和傳輸路徑的,這個延遲通常都很高,是毫秒(ms)級別的。而應用程序處理數據是納秒(ns)級別的。也就是說整個過程中,內核態(tài)等待數據,處理協(xié)議棧是個相對很慢的過程。這么長的時間里,用戶態(tài)的進程是無事可做的,因此用到了“阻塞(掛起)”。
阻塞是進程調度的關鍵一環(huán),指的是進程在等待某事件發(fā)生之前的等待狀態(tài)。請看下表,在 Linux 中,進程狀態(tài)大致有 7 種(在 include/linux/sched.h 中有更多狀態(tài)):

從說明中可以發(fā)現,“可運行狀態(tài)”會占用 CPU 資源,另外創(chuàng)建和銷毀進程也需要占用 CPU 資源(內核)。重點是,當進程被"阻塞/掛起"時,是不會占用 CPU 資源的。
為了支持多任務,Linux 實現了進程調度的功能(CPU 時間片的調度)。而這個時間片的切換,只會在“可運行狀態(tài)”的進程間進行。因此“阻塞/掛起”的進程是不占用 CPU 資源的。
四、工作隊列和等待隊列

工作隊列:為了方便時間片的調度,所有“可運行狀態(tài)”狀態(tài)的進程組成的隊列;fd文件列表:內核打開的文件句柄,Linux一切皆文件,用戶線程執(zhí)行創(chuàng)建Socket時內核就會創(chuàng)建一個由文件系統(tǒng)管理的sock對象;sock:socket內核中的數據結構,主要包含發(fā)送緩沖區(qū)、接收緩沖區(qū)、等待隊列;
struct sock {
__u32 daddr; /* 外部IP地址 */
__u32 rcv_saddr; /* 綁定的本地IP地址 */
__u16 dport; /* 目標端口 */
__u16 sport; /* 源端口 */
unsigned short family; /* 地址簇 */
int rcvbuf; /* 接受緩沖區(qū)長度(單位:字節(jié)) */
struct sk_buff_head receive_queue; /* 接收包隊列 */
int sndbuf; /* 發(fā)送緩沖區(qū)長度(單位:字節(jié)) */
struct sk_buff_head write_queue; /* 包發(fā)送隊列 */
wait_queue_head_t *sleep; /* 等待隊列,通常指向socket的wait域 */
......
}
等待隊列:等待當前socket的線程;
工作隊列中線程執(zhí)行到阻塞操作等待socket時,會從工作隊列中移除,移動到該socket的等待隊列中;當socket接收到數據后,操作系統(tǒng)將該socket等待隊列上的進程重新放回到工作隊列,該進程變成運行狀態(tài),繼續(xù)執(zhí)行代碼。
五、BIO
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
// 沒有連接-阻塞
Socket socket = serverSocket.accept();
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
// 沒有數據-阻塞
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
socket.close();
}
}
BIO模式存在兩個阻塞點,一個時accept阻塞等待客戶端連接,一個是阻塞等待socket請求數據;簡單跟以下源碼就會發(fā)現
new ServerSocket(9000)最終通過
-
int newfd = socket0(stream, false /*v6 Only*/);調用Linuxint socket(int domain, int type, int protocol);創(chuàng)建服務端socket; -
bind0(nativefd, address, port, exclusiveBind);調用Linuxint bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);綁定端口; -
listen0(nativefd, backlog);調用Linuxint listen(int sockfd, int backlog);設置監(jiān)聽;
接下來serverSocket.accept()最終通過
-
newfd = accept0(nativefd, isaa);調用Linuxint accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);阻塞等待客戶端連接,會創(chuàng)建一個socket用于跟該客戶端進行通信,返回socket文件描述符fd;
最后inputStream.read(bytes);調用Linux ssize_t recv(int sockfd, void *buf, size_t len, int flags);阻塞從socket讀緩沖區(qū)讀取客戶端請求數據;
可以通過 man 命令查看Linux 系統(tǒng)調用方法具體描述;
通過傳統(tǒng)BIO的操作方式可以看出一個請求必須要創(chuàng)建一個內核線程進行處理,recv只能監(jiān)視單個socket,當并發(fā)比較高時就會消耗大量系統(tǒng)資源,也就是所謂的C10K問題;那么如何解決這個問題呢?后面出現的多路復用 select / poll / epoll思路都是使用一個線程來處理若干個連接(監(jiān)視若干個socket),類似餐廳服務員的角色。
六、select
select 方案是用一個 fd_set 結構體來告訴內核同時監(jiān)控多個socket,當其中有socket的狀態(tài)發(fā)生變化或超時,則調用返回。之后應用使用 FD_ISSET 來逐個查看是哪個socket的狀態(tài)發(fā)生了變化。
6.1 select原理
以下面select偽代碼為例:
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
//接受客戶端連接
int c = accept(s, ...)
int readSet[] = 存放需要監(jiān)聽的socket文件描述符
while(1){
int n = select(..., readSet, ...)
for(int i=0; i < readSet.count; i++){
if(FD_ISSET(readSet[i], ...)){
//fds[i]的數據處理
}
}
}
select 系統(tǒng)調用函數
# maxfd:表示的是待測試的描述符基數,它的值是待測試的最大描述符加 1
# readset:讀描述符集合
# writeset:寫描述符集合
# exceptset:錯誤描述符集合
# timeout:超時時間
int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
上面示例代碼中,先準備一個數組 readSet 存放著所有需要監(jiān)視讀事件的socket。然后調用select,如果 readSet 中的所有socket都沒有數據,select會阻塞,直到有一個socket接收到數據,select返回,喚醒線程。用戶可以遍歷 readSet,通過FD_ISSET (對這個數組進行檢測,判斷出對應socket的元素 readSet[fd]是 0 還是 1)判斷具體哪個 socket 收到數據,然后做出處理。
6.2 select流程

線程 A 調用 select readSet 數組元素為sock1、sock2、sock3 ,此時3個socket 都沒有數據可讀,就把線程A 從工作隊列中移除,并分別添加到3個sock 的等待隊列中;

當3個sock中任意一個有數據可讀時,中斷程序都會把線程A 從所有sock等待隊列中移除并重新加入工作隊列,等待cpu時間片繼續(xù)執(zhí)行(即從select中返回)。但是此時線程A 并不知道哪個socket有數據,于是還要遍歷readSet使用 FD_ISSET 來逐個查看是哪個socket的有數據可讀。
6.3 select的缺點
1、每次調用select都需要將線程加入到所有監(jiān)視socket的等待隊列,每次喚醒都需要從每個隊列中移除。這里涉及了兩次遍歷,而且每次都要將整個fd_set列表傳遞給內核,有一定的開銷。正是因為遍歷操作開銷大,出于效率的考量,才會規(guī)定select的最大監(jiān)視數量,默認只能監(jiān)視1024個socket。
2、由于只有一個字段記錄關注和發(fā)生事件,每次調用之前要重新初始化 fd_set 結構體。
3、線程被喚醒后,程序并不知道哪些socket收到數據,還需要遍歷一次。
七、poll
總結以下 select 的缺點就是句柄上限+重復初始化+逐個排查所有文件句柄狀態(tài)效率不高。poll 主要解決 select 的前兩個問題:通過改變跟內核交互的數據結構突破了文件描述符的限制,同時使用不同字段分別標注關注事件和發(fā)生事件,來避免重復初始化。
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
# fd:描述符 fd
# events:描述符上待檢測的事件類型events,注意這里的 events 可以表示多個不同的事件,
# 具體的實現可以通過使用二進制掩碼位操作來完成,例如,POLLIN 和 POLLOUT 可以表示讀和寫事件
# revents:
struct pollfd {
int fd; /* file descriptor */
short events; /* events to look for */
short revents; /* events returned */
};
和 select 非常不同的地方在于:
- poll 每次檢測之后的結果不會修改原來的傳入值,而是將結果保留在 revents 字段中,這樣就不需要每次檢測完都得重置待檢測的描述字和感興趣的事件。
- 在 select 里面,文件描述符的個數已經隨著 fd_set 的實現而固定,沒有辦法對此進行配置;而在 poll 函數里,可以控制 pollfd 結構的數組大小,這意味著可以突破原來 select 函數最大描述符的限制,在這種情況下,應用程序調用者需要分配 pollfd 數組并通知 poll 函數該數組的大小。
八、epoll
epoll 使用 eventpoll 作為中間層,線程不用加入在被監(jiān)視的每一個 socket 阻塞隊列中,也不用再遍歷 socket 列表查看哪些有事件發(fā)生。
epoll提供了三個函數:
# 創(chuàng)建了一個 epoll 實例 epollevent
int epoll_create(int size);
# 往這個 epoll 實例增加或刪除監(jiān)控的事件
# epfd:epoll_create 創(chuàng)建的 epoll 實例句柄
# op:增加還是刪除一個監(jiān)控事件
# fd:注冊的事件的文件描述符
# event:注冊的事件類型,并且可以在這個結構體里設置用戶需要的數據
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
# 類似之前的 poll 和 select 函數,調用者進程被掛起,在等待內核 I/O 事件的分發(fā)
# epfd:實例描述字,也就是 epoll 句柄
# events:用戶空間需要處理的 I/O 事件,這是一個數組,數組的大小由 epoll_wait 的返回值決定,這個數組的每個元素都是一個需要待處理的 I/O 事件,其中 events 表示具體的事件類型,事件類型取值和 epoll_ctl 可設置的值一樣,這個 epoll_event 結構體里的 data 值就是在 epoll_ctl 那里設置的 data,也就是用戶空間和內核空間調用時需要的數據
# maxevents:大于 0 的整數,表示 epoll_wait 可以返回的最大事件值
# timeout:阻塞調用的超時值,如果這個值設置為 -1,表示不超時;如果設置為 0 則立即返回,即使沒有任何 I/O 事件發(fā)生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
8.1 代碼示例
public class ServerConnect {
private static final int BUF_SIZE = 1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void main(String[] args) {
selector();
}
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try {
// 打開一個Channel
ssc = ServerSocketChannel.open();
// 將Channel綁定端口
ssc.socket().bind(new InetSocketAddress(PORT));
// 設置Channel為非阻塞,如果設置為阻塞,其實和BIO差不多了。
ssc.configureBlocking(false);
// 打開一個Slectore
selector = Selector.open();
// 向selector中注冊Channel和感興趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// selector監(jiān)聽事件,select會被阻塞,直到selector監(jiān)聽的channel中有事件發(fā)生或者超時,會返回一個事件數量
//TIMEOUT就是超時時間,selector初始化的時候會添加一個用于主動喚醒的pipe,待會源碼分析會說
if (selector.select(TIMEOUT) == 0) {
System.out.println("==");
continue;
}
/**
* SelectionKey的組成是selector和Channel
* 有事件發(fā)生的channel會被包裝成selectionKey添加到selector的publicSelectedKeys屬性中
* publicSelectedKeys是SelectionKey的Set集合
*下面這一部分遍歷,就是遍歷有事件的channel
*/
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
//每次使用完,必須將該SelectionKey移除,否則會一直存儲在publicSelectedKeys中
//下一次遍歷又會重復處理
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (selector != null) {
selector.close();
}
if (ssc != null) {
ssc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = sc.read(buf);
while (bytesRead > 0) {
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if (bytesRead == -1) {
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while (buf.hasRemaining()) {
sc.write(buf);
}
buf.compact();
}
}
調用到內核的流程如下:
1、ssc = ServerSocketChannel.open() -> EPollSelectorProvider -> int socket(int domain, int type, int protocol);
2、ssc.socket().bind(newInetSocketAddress(PORT)); -> int bind(int sockfd, const struct sockaddr addr, socklen_t addrlen);
3、getImpl().listen(backlog); -> int listen(int sockfd, int backlog);
4、ssc.configureBlocking(false); -> int fcntl(int fd, int cmd, ... / arg */ );
5、selector=Selector.open(); -> EPollSelectorImpl epollCreate() -> int epoll_create(int size);
6、ssc.register(selector,SelectionKey.OP_ACCEPT); -> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
while
7、selector.select(TIMEOUT) -> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
8.2 epoll流程

如圖,eventpoll 主要包含3個結構
-
監(jiān)視隊列:epoll_ctl添加或刪除所要監(jiān)聽的socket。例如:圖中通過epoll_ctl添加sock1、sock2的監(jiān)視,內核會將eventpoll添加到這socket的等待隊列中(圖中虛線)。為了方便的添加和移除,還要便于搜索,以避免重復添加,使用紅黑樹結構進行存儲(搜索、插入和刪除時間復雜度都是O(log(N)),效率較好。 -
等待隊列:eventpoll也是一個文件句柄,因此也有等待隊列,記錄阻塞的線程,例如:圖中線程A 執(zhí)行了 epoll_wait 操作,被從工作隊列中移除,然后被eventpoll 的等待隊列引用; -
就緒隊列:當某個socket有事件發(fā)生時,中斷處理程序就會把該socket加入到就緒隊列,同時喚醒eventpoll 阻塞隊列中的線程,此時線程只需要遍歷就緒隊列就可以知道哪個socket有事件發(fā)生,例如:圖中sock2有數據到來時,中斷處理程序先把sock2 放入就緒隊列中,然后喚醒等待隊列中的線程A,這時線程A 被重新加入工作隊列中,等到CPU時間片輪詢到線程A時,遍歷就緒隊列中的socket進行處理。
九、總結
select,poll,epoll都是IO多路復用機制,即可以同時監(jiān)視多個文件描述符,一旦某個描述符就緒(讀或寫就緒),能夠通知程序進行相應讀寫操作
- select,poll需要多次遍歷文件描述符集合 fd_set,把阻塞線程放到每一個socket中,當某個描述符就緒時再把這些線程從socket阻塞隊列中移除;而epoll 通過eventpoll作為中介,只需要把eventpoll放到socket 的阻塞隊列中,當有描述符就緒時只需要遍歷就緒隊列,而不需要遍歷所有fd;
- select,poll每次調用都要把整個fd_set集合從用戶態(tài)往內核態(tài)拷貝一次,而epoll只要一次拷貝,這也能節(jié)省不少的開銷。