
轉(zhuǎn)自 Madman
Synopsis: Richard Stevens所著的《UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking》一書中,6.2章節(jié)"IO Models"列出了五種IO模型,本文將詳細(xì)介紹這幾種IO模型,并說明阻塞(blocking)和非阻塞(non-blocking)的區(qū)別、同步(synchronous)和異步(asynchronous)的區(qū)別
1. 預(yù)備知識
1.1 CPU Ring
對于操作系統(tǒng)而言,穩(wěn)定且可靠地運(yùn)行是最重要的。現(xiàn)行技術(shù)方案是將內(nèi)核與用戶進(jìn)程、用戶進(jìn)程與用戶進(jìn)程之間進(jìn)行分離,內(nèi)核可以管理用戶進(jìn)程,但是用戶進(jìn)程之間不能互相干擾,更不能"侵入"內(nèi)核,即使用戶的程序崩潰了,內(nèi)核也不會(huì)受影響。
為了提升計(jì)算機(jī)安全,避免惡意操作,CPU的硬件機(jī)制中一般將使用權(quán)劃分為四個(gè)特權(quán)級別:

Ring 0級別最高,可以執(zhí)行一切指令,包括像清空內(nèi)存、磁盤I/O操作等特權(quán)指令(privilege instruction)和其它非特權(quán)指令,內(nèi)核代碼運(yùn)行在這個(gè)模式下;Ring 3級別最低,只能執(zhí)行非特權(quán)指令,用戶進(jìn)程運(yùn)行在這個(gè)模式下。所以CPU模式(CPU models)可以劃分為:
-
kernel mode,也叫內(nèi)核態(tài) -
user mode,也叫用戶態(tài)
計(jì)算機(jī)開機(jī)啟動(dòng)后,首先會(huì)加載內(nèi)核,由于占了先機(jī),操作系統(tǒng)內(nèi)核將自己設(shè)置為最高級別,而之后創(chuàng)建的用戶進(jìn)程都設(shè)置為最低級別。這樣內(nèi)核就能控制CPU、內(nèi)存、磁盤等一切資源,而用戶進(jìn)程不能直接使用這些資源。例如,如果用戶進(jìn)程可以直接使用磁盤,就沒必要在內(nèi)核中實(shí)現(xiàn)一套文件系統(tǒng)的權(quán)限管理了
1.2 Kernel space vs. User space
不管是內(nèi)核代碼還是用戶程序代碼都需要加載到內(nèi)存中,如果不對內(nèi)存進(jìn)行管理,就會(huì)出現(xiàn)用戶代碼之間、用戶代碼與內(nèi)核之間出現(xiàn)被覆蓋的情況,所以內(nèi)核將內(nèi)存劃分成兩部分:
-
內(nèi)核空間(kernel space): 內(nèi)核代碼的運(yùn)行空間 -
用戶空間(user space): 用戶應(yīng)用程序代碼的運(yùn)行空間

用戶進(jìn)程只能訪問用戶空間,而內(nèi)核可以訪問所有內(nèi)存。因?yàn)閮?nèi)核已將用戶進(jìn)程設(shè)置為最低級別,它只能運(yùn)行在CPU的Ring 3上,所以如果用戶進(jìn)程要進(jìn)行磁盤I/O或網(wǎng)絡(luò)I/O時(shí),只能通過系統(tǒng)調(diào)用(system call)將請求發(fā)給內(nèi)核,由內(nèi)核代為執(zhí)行相應(yīng)的指令(CPU模式由用戶態(tài)轉(zhuǎn)成內(nèi)核態(tài)),數(shù)據(jù)會(huì)先緩存到內(nèi)核空間中,然后內(nèi)核將數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間中,之后用戶進(jìn)程才能繼續(xù)處理數(shù)據(jù)(CPU模式由內(nèi)核態(tài)轉(zhuǎn)成用戶態(tài))

1.3 blocking vs. non-blocking
-
阻塞(blocking): 用戶進(jìn)程在等待某個(gè)操作完成期間,自身無法繼續(xù)干別的事情,則稱進(jìn)程在該操作上是阻塞的。Blocking I/O means that the calling system does not return control to the caller until the operation is finished -
非阻塞(non-blocking): 用戶進(jìn)程在等待某個(gè)操作完成期間,自身可以繼續(xù)干別的事情,則稱進(jìn)程在該操作上是非阻塞的。A non-blocking synchronous call returns control to the caller immediately. The caller is not made to wait, and the invoked system immediately returns one of two responses: If the call was executed and the results are ready, then the caller is told of that. Alternatively, the invoked system can tell the caller that the system has no resources (no data in the socket) to perform the requested action. In that case, it is the responsibility of the caller may repeat the call until it succeeds. For example, aread()operation on a socket in non-blocking mode may return the number of read bytes or a special return code -1 with errno set toEWOULBLOCK/EAGAIN, meaning "not ready; try again later."
1.4 Synchronous I/O vs. Asynchronous I/O
- A
synchronous I/Ooperation causes the requesting process to be blocked until that I/O operation completes. - An
asynchronous I/Ooperation does not cause the requesting process to be blocked.
如果用戶進(jìn)程因?yàn)镮/O操作而阻塞的話,那就是同步I/O,否則是異步I/O。后續(xù)要介紹的blocking I/O、nonblocking I/O、I/O multiplexing、signal driven I/O這四種I/O模型都是同步I/O,因?yàn)榈诙A段中的真正I/O操作(比如,recvfrom)會(huì)阻塞用戶進(jìn)程,只有asynchronous I/O模型才是異步I/O
2. Unix中五種I/O模型
blocking I/Ononblocking I/O-
I/O multiplexing(select, poll, epoll, kqueque) -
signal driven I/O(SIGIO) -
asynchronous I/O(the POSIX aio_read functions)
例如,用戶進(jìn)程要讀取網(wǎng)絡(luò)數(shù)據(jù)或磁盤數(shù)據(jù)時(shí)(Input),數(shù)據(jù)會(huì)經(jīng)過兩個(gè)階段:
-
內(nèi)核空間等待數(shù)據(jù)準(zhǔn)備完成。Waiting for the data to be ready - 將數(shù)據(jù)從
內(nèi)核空間拷貝到用戶空間中。Copying the data from the kernel to the process
網(wǎng)絡(luò)套接字的輸入操作第一步是等待網(wǎng)絡(luò)數(shù)據(jù)包(對CPU來說耗時(shí)特別久),當(dāng)數(shù)據(jù)包到達(dá)時(shí),它先被復(fù)制到內(nèi)核的緩沖區(qū)中,第二步是將這些數(shù)據(jù)從內(nèi)核的緩沖區(qū)復(fù)制到我們的應(yīng)用程序緩沖區(qū)中(速度快)。上述五種I/O模型的區(qū)別就在于I/O經(jīng)歷的兩個(gè)階段的不同上
2.1 blocking I/O
默認(rèn)情況下,所有套接字都是阻塞的。下圖中我們使用UDP的數(shù)據(jù)報(bào)套接字來說明網(wǎng)絡(luò)I/O的兩個(gè)階段:

首先是我們的用戶進(jìn)程運(yùn)行(左邊),當(dāng)需要獲取網(wǎng)絡(luò)數(shù)據(jù)報(bào)(datagram)時(shí),用戶進(jìn)程只能通過recvfrom系統(tǒng)調(diào)用將請求發(fā)給內(nèi)核,然后在內(nèi)核中運(yùn)行(右邊)
用戶進(jìn)程在兩個(gè)階段都是阻塞的,這期間不能做任何其它事情,直到數(shù)據(jù)被拷貝到用戶空間(或發(fā)生錯(cuò)誤,如系統(tǒng)調(diào)用被信號中斷)后,我們的應(yīng)用程序才能夠繼續(xù)處理數(shù)據(jù)報(bào)。即用戶進(jìn)程從調(diào)用recvfrom到有數(shù)據(jù)返回的整個(gè)時(shí)間內(nèi),進(jìn)程都是被阻塞的,所以它是同步I/O
舉例來說,如果要下載1000張圖片,用阻塞I/O(blocking I/O)模型的話,必須依序下載,在等待第1張圖片數(shù)據(jù)時(shí),整個(gè)用戶進(jìn)程被阻塞,只有第1張圖片數(shù)據(jù)到達(dá)內(nèi)核,并復(fù)制到用戶空間后,才能保存到本地磁盤,然后依次類推,下載其它圖片
(1) 單進(jìn)程TCP Server
如果Web服務(wù)器采用這種模式的話,那么一次只能為一個(gè)客戶服務(wù)(注意:當(dāng)服務(wù)器為這個(gè)客戶服務(wù)的時(shí)候,只要服務(wù)器的listen隊(duì)列還有空閑,那么當(dāng)其它新的客戶端發(fā)起連接后,服務(wù)器就會(huì)為新客戶端建立連接,并且新客戶端也可以發(fā)送數(shù)據(jù),但服務(wù)器還不會(huì)處理。只有當(dāng)?shù)?個(gè)客戶關(guān)閉連接后,服務(wù)器才會(huì)一次性將第2個(gè)客戶發(fā)送的所有數(shù)據(jù)接收完,并繼續(xù)只為第2個(gè)客戶服務(wù),依次類推):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,單進(jìn)程,阻塞 blocking I/O
import socket
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Process, waiting for client connection...')
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Client {} is connected'.format(client_addr))
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù),阻塞,直到有數(shù)據(jù)到來
# 事實(shí)上,除非當(dāng)前客戶端關(guān)閉后,才會(huì)跳轉(zhuǎn)到外層的while循環(huán),即一次只能服務(wù)一個(gè)客戶
# 如果客戶端關(guān)閉了連接,data是空字符串
data = client_sock.recv(4096)
if data:
print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù),將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Sent {} to {}'.format(data, client_addr))
else:
print('Client {} is closed'.format(client_addr))
break
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
TCP客戶端測試代碼如下,可以觀察如果指定50個(gè)客戶端時(shí),服務(wù)端輸出結(jié)果和客戶端輸出結(jié)果:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from datetime import datetime
import socket
server_ip = input('Please enter the TCP server ip: ')
server_port = int(input('Enter the TCP server port: '))
client_num = int(input('Enter the TCP clients count: '))
# 保存所有已成功連接的客戶端TCP socket
client_socks = []
for i in range(client_num):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server_ip, server_port))
client_socks.append(sock)
print('Client {}[ID: {}] has connected to {}'.format(sock, i, (server_ip, server_port)))
while True:
for s in client_socks:
data = str(datetime.now()).encode('utf-8')
s.send(data)
print('Client {} has sent {} to {}'.format(s, data, (server_ip, server_port)))
# 睡眠3秒后,繼續(xù)讓每個(gè)客戶端連接向TCP Server發(fā)送數(shù)據(jù)
time.sleep(3)
Windows平臺也可以下載 Hercules SETUP utility,先打開一個(gè)Hercules使用TCP Client去連接服務(wù)器,如果再打開一個(gè)Hercules,可以發(fā)現(xiàn),也能夠連接且可以發(fā)送數(shù)據(jù),但服務(wù)器不會(huì)處理數(shù)據(jù)也就不會(huì)返回(此時(shí),在Linux服務(wù)器上
watch -n 1 'netstat|grep tcp查看TCP連接的狀態(tài)有很多SYN_RECV)
(2) 多進(jìn)程TCP Server
由于上面單進(jìn)程版本中,client_sock.recv(4096)會(huì)一直阻塞,所以實(shí)際上并不能跳轉(zhuǎn)到外層while循環(huán)中去為其它新的客戶端創(chuàng)建socket,只能一次為一個(gè)客戶服務(wù)。這根本滿足不了實(shí)際應(yīng)用需要,為了實(shí)現(xiàn)并發(fā)處理多個(gè)客戶端請求,可以使用多進(jìn)程,應(yīng)用程序的主進(jìn)程只負(fù)責(zé)為每一個(gè)新的客戶端連接創(chuàng)建socket,然后為每個(gè)客戶創(chuàng)建一個(gè)子進(jìn)程,用來分別處理每個(gè)客戶的數(shù)據(jù):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,多進(jìn)程,阻塞 blocking I/O
import os
import socket
from multiprocessing import Process
def client_handler(client_sock, client_addr):
'''接收各個(gè)客戶端發(fā)來的數(shù)據(jù),并原樣返回'''
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù),阻塞,直到有數(shù)據(jù)到來
# 如果客戶端關(guān)閉了連接,data是空字符串
data = client_sock.recv(4096)
if data:
print('Child Process [PID: {}], received {}({} bytes) from {}'.format(os.getpid(), data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù),將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Child Process [PID: {}], sent {} to {}'.format(os.getpid(), data, client_addr))
else:
print('Child Process [PID: {}], client {} is closed'.format(os.getpid(), client_addr))
break
except:
# 如果客戶端強(qiáng)制關(guān)閉連接,會(huì)報(bào)異常: ConnectionResetError: [Errno 104] Connection reset by peer
pass
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Process [PID: {}], waiting for client connection...'.format(os.getpid()))
# 主進(jìn)程只用來負(fù)責(zé)監(jiān)聽新的客戶連接
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Main Process [PID: {}], client {} is connected'.format(os.getpid(), client_addr))
# 為每個(gè)新的客戶連接創(chuàng)建一個(gè)子進(jìn)程,用來處理客戶數(shù)據(jù)
client = Process(target=client_handler, args=(client_sock, client_addr))
client.start()
# 子進(jìn)程已經(jīng)復(fù)制了一份client_sock,所以主進(jìn)程中可以關(guān)閉此client_sock
client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
(3) 多線程TCP Server
上面多進(jìn)程版本的問題在于,為每個(gè)客戶端連接都分別創(chuàng)建一個(gè)進(jìn)程,如果同時(shí)有10000個(gè)客戶連接,操作系統(tǒng)不可能創(chuàng)建10000個(gè)進(jìn)程,那樣系統(tǒng)開銷會(huì)非常大,內(nèi)存會(huì)被耗盡,導(dǎo)致系統(tǒng)崩潰。就算沒有崩潰,使用了虛擬內(nèi)存,那么性能將急劇下降。同時(shí),這么多個(gè)進(jìn)程,CPU進(jìn)行進(jìn)程間切換(上下文切換)的代價(jià)也無比巨大,最終的結(jié)果就是大部分時(shí)間都花在進(jìn)程切換上了,而為客戶提供服務(wù)的時(shí)間幾乎沒有
雖然可以使用進(jìn)程池concurrent.futures.ProcessPoolExecutor創(chuàng)建固定數(shù)量的進(jìn)程,一旦有客戶端關(guān)閉了連接后,對應(yīng)的進(jìn)程就可以重新為下一個(gè)新的客戶連接服務(wù),但是多進(jìn)程間的上下文切換的代價(jià)還是太大
多線程版本比多進(jìn)程版本的系統(tǒng)開銷小幾個(gè)數(shù)量級,操作系統(tǒng)可以同時(shí)開啟更多的線程,而線程間的調(diào)度切換比多進(jìn)程也小很多:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,多線程,阻塞 blocking I/O
import socket
import threading
def client_handler(client_sock, client_addr):
'''接收各個(gè)客戶端發(fā)來的數(shù)據(jù),并原樣返回'''
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù),阻塞,直到有數(shù)據(jù)到來
# 如果客戶端關(guān)閉了連接,data是空字符串
data = client_sock.recv(4096)
if data:
print('Child Thread [{}], received {}({} bytes) from {}'.format(threading.current_thread().name, data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù),將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Child Thread [{}], sent {} to {}'.format(threading.current_thread().name, data, client_addr))
else:
print('Child Thread [{}], client {} is closed'.format(threading.current_thread().name, client_addr))
break
except:
# 如果客戶端強(qiáng)制關(guān)閉連接,會(huì)報(bào)異常: ConnectionResetError: [Errno 104] Connection reset by peer
pass
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Thread [{}], waiting for client connection...'.format(threading.current_thread().name))
# 主進(jìn)程只用來負(fù)責(zé)監(jiān)聽新的客戶連接
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Main Thread [{}], client {} is connected'.format(threading.current_thread().name, client_addr))
# 為每個(gè)新的客戶連接創(chuàng)建一個(gè)線程,用來處理客戶數(shù)據(jù)
client = threading.Thread(target=client_handler, args=(client_sock, client_addr))
client.start()
# 因?yàn)橹骶€程與子線程共享client_sock,所以在主線程中不能關(guān)閉client_sock
# client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
也可以使用線程池concurrent.futures.ThreadPoolExecutor實(shí)現(xiàn)
2.2 nonblocking I/O
當(dāng)我們將socket設(shè)置為nonblocking時(shí),相當(dāng)于告訴內(nèi)核:when an I/O operation that I request cannot be completed without putting the process to sleep, do not put the process to sleep, but return an error instead.
下圖中,前三次系統(tǒng)調(diào)用時(shí),數(shù)據(jù)報(bào)還沒有到達(dá),所以內(nèi)核立即返回一個(gè)叫EWOULDBLOCK的錯(cuò)誤。第四次系統(tǒng)調(diào)用時(shí),一個(gè)數(shù)據(jù)報(bào)已經(jīng)到達(dá),所以內(nèi)核不會(huì)立即返回,當(dāng)數(shù)據(jù)報(bào)從內(nèi)核空間復(fù)制到用戶空間后,recvfrom系統(tǒng)調(diào)用返回成功信息給用戶進(jìn)程,然后用戶進(jìn)程就能處理數(shù)據(jù)了:

像這種在 nonblocking socket 上重復(fù)調(diào)用recvfrom被稱為輪詢(polling),用戶進(jìn)程不斷輪詢內(nèi)核以查看某些操作是否已準(zhǔn)備就緒(忙等待,busy-waiting),這通常很浪費(fèi)CPU時(shí)間(一般網(wǎng)絡(luò)I/O的第一階段數(shù)據(jù)到達(dá)內(nèi)核前會(huì)非常慢,如果用戶進(jìn)程頻繁且不必要的向內(nèi)核發(fā)起系統(tǒng)調(diào)用,CPU就會(huì)不斷地在用戶態(tài)和內(nèi)核態(tài)之間切換,即用戶進(jìn)程不斷地睡眠、被喚醒,這將極大浪費(fèi)CPU資源)。如果有1000個(gè)socket,就算平均每個(gè)socket你第4次系統(tǒng)調(diào)用時(shí)告知數(shù)據(jù)到達(dá),你也要進(jìn)行4000次系統(tǒng)調(diào)用,太恐怖了,所以一般不會(huì)使用這種I/O模型
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server,單進(jìn)程,非阻塞 nonblocking I/O
import socket
# 用來保存所有已成功連接的客戶端,每個(gè)列表元素是client_sock和client_addr組成的元組
clients = []
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接,這樣就可以接收客戶端連接了
server_sock.listen(5)
# 將監(jiān)聽用的server_sock設(shè)置為非阻塞
server_sock.setblocking(False)
print('Main Process, waiting for client connection...')
try:
while True:
try:
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
except:
# server_sock設(shè)置為非堵塞后,如果accept時(shí),恰巧沒有客戶端connect,那么accept會(huì)產(chǎn)生一個(gè)異常
pass
else:
print('Client {} is connected'.format(client_addr))
# 將新的客戶端連接socket也設(shè)置為非阻塞
client_sock.setblocking(False)
# 添加到client_socks列表中
clients.append((client_sock, client_addr))
# 循環(huán)處理每個(gè)客戶端連接
for client_sock, client_addr in clients:
try:
data = client_sock.recv(4096)
if data:
print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù),將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Sent {} to {}'.format(data, client_addr))
else:
print('Client {} is closed'.format(client_addr))
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 從列表中刪除
clients.remove((client_sock, client_addr))
except:
# client_sock設(shè)置為非堵塞后,如果recv時(shí),恰巧客戶端沒有發(fā)送數(shù)據(jù)過來,將會(huì)產(chǎn)生一個(gè)異常
pass
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
上面的代碼中,while循環(huán)先在監(jiān)聽socket上accept(),如果有數(shù)據(jù)表示此時(shí)有新的客戶端連接,不管怎樣都要進(jìn)行一次系統(tǒng)調(diào)用。然后,for循環(huán)依次遍歷各個(gè)客戶連接socket,查看是否有哪個(gè)客戶發(fā)來數(shù)據(jù),有多少個(gè)客戶就發(fā)起多少次系統(tǒng)調(diào)用。所以,這種I/O模型非常浪費(fèi)CPU時(shí)間
第一階段不會(huì)阻塞,但數(shù)據(jù)到達(dá)內(nèi)核空間后,第二階段從內(nèi)核空間復(fù)制到用戶空間時(shí)會(huì)阻塞,所以它是同步I/O
2.3 I/O multiplexing
blocking I/O使用多線程技術(shù),當(dāng)并發(fā)客戶端達(dá)到千萬級時(shí),操作系統(tǒng)不可能同時(shí)創(chuàng)建這么多線程。nonblocking I/O中用戶進(jìn)程不斷polling(輪詢)內(nèi)核,浪費(fèi)CPU資源。
每個(gè)網(wǎng)絡(luò)socket在類Unix操作系統(tǒng)中都是一個(gè)文件描述符(FD, file descriptor),如果操作系統(tǒng)能提供一種機(jī)制,只需要單個(gè)線程就可以同時(shí)監(jiān)視多個(gè)文件描述符,當(dāng)一個(gè)或多個(gè)描述符就緒(一般是讀就緒或?qū)懢途w)時(shí),就通知應(yīng)用程序進(jìn)行相應(yīng)的讀寫操作的話,那么效率將大幅提升
于是,現(xiàn)代操作系統(tǒng)底層陸續(xù)實(shí)現(xiàn)了各自的I/O多路復(fù)用(I/O multiplexing)機(jī)制,multiplexing是指在單個(gè)線程中調(diào)用一次select/poll/epoll等函數(shù),可以同時(shí)監(jiān)視多個(gè)文件描述符上的可讀取事件或可寫入事件,一旦這些事件發(fā)生,內(nèi)核通知用戶進(jìn)程,然后用戶進(jìn)程調(diào)用recvfrom等函數(shù)進(jìn)行讀寫(真正的I/O操作),這樣就能實(shí)現(xiàn)在單個(gè)線程中對多個(gè)文件描述符進(jìn)行并發(fā)讀寫(I/O multiplexing也叫event driven I/O - 事件驅(qū)動(dòng)I/O),Multiplexing Wiki: https://en.wikipedia.org/wiki/Multiplexing,可以參考電氣工程中的 "時(shí)分復(fù)用" 圖進(jìn)行理解:

下圖中,第一階段阻塞在select系統(tǒng)調(diào)用上,第二階段阻塞在recvfrom系統(tǒng)調(diào)用上,所以它是同步I/O:

(1) select
I/O多路復(fù)用概念被提出以后,1983年左右,在BSD里面最早實(shí)現(xiàn)了select,目前幾乎所有的操作系統(tǒng)都提供了select函數(shù),具體實(shí)現(xiàn)可以參考:http://man7.org/linux/man-pages/man2/select.2.html 和 《UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking》的6.8章節(jié)示例
select的缺點(diǎn):
- 能夠監(jiān)視的文件描述符最大數(shù)量有限制,在Linux中默認(rèn)是1024個(gè),定義在
FD_SETSIZE中 - 當(dāng)一個(gè)或多個(gè)文件描述就緒后,
select函數(shù)返回已就緒的文件描述符的數(shù)目(超時(shí)返回0,出錯(cuò)返回-1),用戶并不知道哪些描述符可讀、哪些描述符可寫,還要通過FD_ISSET()宏去輪詢所有的文件描述符(假設(shè)描述符3可讀,內(nèi)核會(huì)修改fd_set *readset中描述符3對應(yīng)的位bit為1,那么FD_ISSET(3, &readset)會(huì)返回1,表示描述符3可讀),才能知道哪個(gè)描述符可讀寫。如果是最后一個(gè)文件描述符就緒,也會(huì)從頭開始線性遍歷至結(jié)尾,時(shí)間復(fù)雜度為O(n),這樣會(huì)很浪費(fèi)CPU資源 - 因?yàn)閮?nèi)核會(huì)修改文件描述符集合的位bit,所以用戶進(jìn)程每次調(diào)用
select函數(shù),都要重新復(fù)制初始化過的文件描述符集合給內(nèi)核,當(dāng)描述符數(shù)據(jù)量大時(shí),效率低
(2) poll
http://man7.org/linux/man-pages/man2/poll.2.html
因?yàn)樗褂?strong>鏈表數(shù)據(jù)結(jié)構(gòu)pollfd表示待監(jiān)視的文件描述符,所以沒有1024個(gè)最大文件描述符的限制。但是,poll返回后,還是要通過輪詢所有文件描述符才能獲取哪些描述符可讀、哪些描述符可寫,時(shí)間復(fù)雜度為O(n)。隨著文件描述符的增加,性能會(huì)線性下降
(3) epoll
http://man7.org/linux/man-pages/man7/epoll.7.html
select和poll都有一個(gè)缺點(diǎn),需要將要監(jiān)視的文件描述符集合在用戶空間和內(nèi)核空間之間來回復(fù)制,當(dāng)有描述符就緒后,又需要遍歷整個(gè)描述符集合才能得知哪些可讀寫,當(dāng)描述符很多時(shí),性能就越差。epoll于Linux 2.6內(nèi)核開始引入,是為了處理大批量文件描述符而作了改進(jìn)的poll(只支持Linux)
epoll的相關(guān)函數(shù):
-
int epoll_create(int size): 創(chuàng)建一個(gè)epoll實(shí)例,返回代表這個(gè)實(shí)例的文件描述符。自從Linux 2.6.8之后,size參數(shù)被忽略 -
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):epoll的事件注冊函數(shù),它可以動(dòng)態(tài)修改要監(jiān)視的文件描述符及對應(yīng)的事件。第一個(gè)參數(shù)是epoll_create()的返回值;第二個(gè)參數(shù)表示注冊處理動(dòng)作,用三個(gè)宏來表示:EPOLL_CTL_ADD(注冊新的fd到epfd中)、EPOLL_CTL_MOD(修改已經(jīng)注冊的fd的監(jiān)聽事件)、EPOLL_CTL_DEL(從epfd中刪除一個(gè)fd);第三個(gè)參數(shù)是需要監(jiān)聽的fd;第四個(gè)參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事,比如EPOLLIN(可讀)、EPOLLOUT(可寫) -
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout): 用戶進(jìn)程調(diào)用epoll_wait()函數(shù)后會(huì)被阻塞,直到要監(jiān)聽的I/O事件發(fā)生,返回就緒的文件描述符數(shù)目(超時(shí)返回0)。這個(gè)步驟相當(dāng)于調(diào)用select或poll,但是不需要傳遞描述符集合給內(nèi)核,因?yàn)閮?nèi)核已經(jīng)在步驟2中通過epoll_ctl()拿到了要監(jiān)視的描述符列表。epoll采用基于事件的就緒通知方式,一旦某個(gè)文件描述符就緒時(shí),內(nèi)核會(huì)采用類似callback的回調(diào)機(jī)制,將就緒的文件描述符放到就緒鏈表里面,當(dāng)用戶進(jìn)程調(diào)用epoll_wait()時(shí)便得到通知。另外,epoll使用了Linux的mmap(內(nèi)存映射技術(shù)),內(nèi)核不需要復(fù)制就緒鏈表給用戶空間,用戶進(jìn)程可以直接讀取就緒鏈表中的文件描述符
epoll的優(yōu)點(diǎn):
- 沒有最大文件描述符的限制,1GB內(nèi)存大約支持10萬左右的連接數(shù),可運(yùn)行
cat /proc/sys/fs/file-max進(jìn)行查看 - I/O效率不隨fd數(shù)目增加而線性下降,比如有10萬個(gè)連接,由于長連接和網(wǎng)絡(luò)傳輸延時(shí)等原因,同一時(shí)刻可能只有少部分是"活躍"的(有數(shù)據(jù)可讀寫),如果采用
select或poll,每次都從頭至尾線性掃描全部描述符集合。而epoll只關(guān)心"活躍"的連接,而跟連接總數(shù)無關(guān)(因?yàn)閮?nèi)核只會(huì)為就緒的描述符調(diào)用callback),時(shí)間復(fù)雜度為O(1) - 使用
mmap技術(shù),加速內(nèi)核與用戶空間的消息傳遞,調(diào)用epoll_create后,內(nèi)核就已經(jīng)在內(nèi)核態(tài)開始準(zhǔn)備幫你存儲(chǔ)要監(jiān)控的文件描述符了,每次調(diào)用epoll_ctl只是在往內(nèi)核的數(shù)據(jù)結(jié)構(gòu)里塞入新的描述符
(4) select vs. poll vs. epoll
| select | poll | epoll | |
|---|---|---|---|
| 最大連接數(shù) | 1024(x86)或 2048(x64) | 無上限 | 無上限 |
| 存儲(chǔ)fds | 數(shù)組 | 鏈表 | 紅黑樹 |
| 傳遞fds | 每次調(diào)用select,都需要將fds在用戶空間和內(nèi)核空間復(fù)制2次 |
每次調(diào)用poll,都需要將fds在用戶空間和內(nèi)核空間復(fù)制2次 |
調(diào)用epoll_ctl時(shí)注冊fd到內(nèi)核中紅黑樹上,之后每次epoll_wait無需復(fù)制fds |
| 獲取就緒的描述符 | 遍歷 | 遍歷 | 回調(diào)函數(shù) |
| I/O效率 | 每次調(diào)用select都進(jìn)行線性遍歷,時(shí)間復(fù)雜度為O(n) |
每次調(diào)用poll都進(jìn)行線性遍歷,時(shí)間復(fù)雜度為O(n) |
基于事件通知方式,每當(dāng)fd就緒,內(nèi)核注冊的回調(diào)函數(shù)就會(huì)被調(diào)用,將就緒fd放到就緒鏈表里,時(shí)間復(fù)雜度為O(1) |
- select / poll / epoll: practical difference for system architects
- LINUX – IO MULTIPLEXING – SELECT VS POLL VS EPOLL
(5) Python 3.4+ selectors模塊
Python select模塊提供了select/poll/epoll/kqueue等函數(shù),它是底層模塊,用戶可以更精細(xì)地自主選擇使用哪個(gè)I/O multiplexing接口。而Python 3.4版本加入了selectors高級模塊,它基于select,但提供了統(tǒng)一的接口,會(huì)自動(dòng)選擇當(dāng)前操作系統(tǒng)中最優(yōu)化的I/O多路復(fù)用接口,比如在Linux系統(tǒng)中,它會(huì)優(yōu)先使用epoll,在BSD中使用kqueque
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import selectors
import socket
# 自動(dòng)選擇當(dāng)前OS中最優(yōu)的I/O multiplexing接口,Linux中會(huì)使用selectors.EpollSelector
sel = selectors.DefaultSelector()
def accept(sock, mask):
'''監(jiān)聽套接字創(chuàng)建新的客戶端連接'''
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 將新的客戶端socket注冊到epoll實(shí)例上,并監(jiān)聽讀事件
def read(conn, mask):
'''接收客戶端數(shù)據(jù),并原樣返回'''
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('', 9090))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
2.4 signal driven I/O
我們也可以使用信號,告訴內(nèi)核在描述符就緒時(shí)用SIGIO信號通知我們,我們稱之為信號驅(qū)動(dòng)的I/O,如下圖所示:

應(yīng)用程序首先創(chuàng)建socket,并提供信號處理器(signal handler),然后發(fā)起sigaction系統(tǒng)調(diào)用,該調(diào)用會(huì)立即返回,用戶進(jìn)程不會(huì)被阻塞,可以繼續(xù)往下執(zhí)行。當(dāng)socket準(zhǔn)備就緒可以讀取數(shù)據(jù)時(shí),內(nèi)核產(chǎn)生SIGIO信號通知signal handler去調(diào)用recvfrom,當(dāng)數(shù)據(jù)被復(fù)制到用戶空間后,告訴用戶進(jìn)程主循環(huán),你要的數(shù)據(jù)已經(jīng)準(zhǔn)備好你可以直接處理了?;蛘?,signal handler只是告訴用戶進(jìn)程主循環(huán)數(shù)據(jù)到達(dá)內(nèi)核空間了,你可以自己調(diào)用recvfrom隨時(shí)去復(fù)制
總之,此模型的優(yōu)勢在于第一階段不會(huì)被阻塞(等待數(shù)據(jù)到達(dá)內(nèi)核空間),The main loop can continue executing and just wait to be notified by the signal handler that either the data is ready to process or the datagram is ready to be read.
第一階段不會(huì)阻塞,第二階段調(diào)用recvfrom會(huì)阻塞,所以是同步I/O
2.5 asynchronous I/O
- http://man7.org/linux/man-pages/man7/aio.7.html
- http://lse.sourceforge.net/io/aio.html
- https://oxnz.github.io/2016/10/13/linux-aio/
asynchronous I/O是指用戶進(jìn)程發(fā)起I/O讀、寫操作后,不會(huì)被阻塞,當(dāng)I/O操作真正完成后(數(shù)據(jù)已被復(fù)制到用戶空間,可以直接處理),內(nèi)核使用信號或回調(diào)函數(shù)進(jìn)行異步通知用戶進(jìn)程。第一階段和第二階段都是是非阻塞的,是真正的異步I/O

Linux AIO API
- POSIX AIO API (
glibc):aio_read/aio_write/aio_fsync/lio_listio/aio_cancel/aio_suspend/aio_return/aio_error - Native Linux AIO API (
libaio):io_setup/io_destroy/io_submit/io_getevents/io_cancel
Linux AIO不夠成熟,Windows中的IOCP是成熟的異步I/O
2.6 五種I/O模型的比較
下圖是五種不同I/O模型的比較。它表明前四個(gè)模型之間的主要區(qū)別是第一個(gè)階段,因?yàn)榍八膫€(gè)模型中的第二個(gè)階段是相同的:當(dāng)數(shù)據(jù)從內(nèi)核復(fù)制到調(diào)用者的緩沖區(qū)時(shí),在調(diào)用recvfrom時(shí)阻塞進(jìn)程。根據(jù)1.4的定義,前四個(gè)I/O模型都是同步的,因?yàn)檎嬲腎/O操作recvfrom會(huì)阻塞進(jìn)程。但是,asynchronous I/O處理的兩個(gè)階段,與前四個(gè)模型不同,兩個(gè)階段都不會(huì)阻塞進(jìn)程,所以只有它是異步的
