第三篇總結(jié)下TCP交互數(shù)據(jù)流與多進程編程以及python中多客戶端編程的幾種實現(xiàn)方案,測試環(huán)境為macos10.12和ubuntu16.04。
1 交互數(shù)據(jù)流
先看一段簡單的代碼,這里先把服務端更加簡化一下,只接收一次數(shù)據(jù)就關閉客戶端的連接,客戶端代碼不變,如下所示。
#onceserver.py
import socket
def start_server(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((ip, port))
sock.listen(1)
while True:
conn, cliaddr = sock.accept()
print 'server connect from: ', cliaddr
data = conn.recv(1024)
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
sock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
先開一個終端python onceserver.py,再開另一個終端運行python client.py,然后在客戶端依次輸入haha, hehe, wawa,可以發(fā)現(xiàn)結(jié)果如下:
ssj@ssj-mbp ~/Prog/network $ python client.py
connected
haha
response from server:HAHA
hehe
other side has closed
wawa
[Errno 32] Broken pipe
而對應到wireshark里面,可以看到數(shù)據(jù)包如下,出現(xiàn)這個結(jié)果也很容易解釋了:序號5的數(shù)據(jù)包是客戶端發(fā)送了4個字節(jié)的數(shù)據(jù)haha給服務端;序號6的數(shù)據(jù)包是服務端回應一個ACK包,可以看到序號6的ACK的值比序號5上一個Seq的增加了4,這是因為傳輸了4個字節(jié)的數(shù)據(jù),所以請求的下一個seq的值加了4。接著的序號7的數(shù)據(jù)包是服務端發(fā)給客戶端的4個字節(jié)的數(shù)據(jù)HAHA,ACK的值不變,PSH標志置位。序號8是客戶端對這四個字節(jié)的ACK包。序號9則是服務端關閉連接的FIN包,然后序號10是客戶端對FIN的ACK包。

前一段都是正常的,下面看看后面的輸入產(chǎn)生這個結(jié)果的原因,這個時候,服務端已經(jīng)關閉了該連接,我們在客戶端再次輸入hehe,這時對應序號11,而由于服務端已經(jīng)關閉了連接,所以回應了一個RST包,對應序號12??蛻舳藄end完數(shù)據(jù)后就不管了,收到RST包后,發(fā)現(xiàn)數(shù)據(jù)為0,所以打印出other side has closed,但是這個時候并不能立刻通知應用程序,而是保存在內(nèi)核的TCP協(xié)議層,這樣直到最后再一次準備發(fā)送wawa的時候,由于TCP協(xié)議層已經(jīng)處于RST狀態(tài)了,因此不會將數(shù)據(jù)發(fā)出,而是發(fā)一個SIGPIPE信號給應用層,SIGPIPE信號的缺省處理動作是終止程序,所以看到上面的現(xiàn)象。為了避免客戶端異常退出,上面的代碼應該在判斷對方關閉了連接后break出循環(huán),而不是繼續(xù)send。而服務端要多次接收數(shù)據(jù),則改成之前文章中那樣。
2 處理多客戶端請求-多進程方案
上一節(jié)修正后的服務端和客戶端代碼如下:
#server.py
import socket
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
print 'server connect from: ', cliaddr
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
break
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
這個時候開啟第一個終端,運行python server.py,這時候再開啟第二個終端運行python client.py,輸入數(shù)據(jù),也得到了正常的回應,可是當我們開啟另外一個終端運行第二個客戶端的時候,會發(fā)現(xiàn)發(fā)送數(shù)據(jù)后并只得到了一個ACK回應,服務端并沒有發(fā)送數(shù)據(jù)過來。原因也很簡單,服務端還卡在第二個循環(huán)里面,第一個客戶端連接不退出,服務端不會再次運行accept函數(shù)處理新的連接。
處理多客戶端有幾種方式,比如多進程,一個進程對應一個連接,還有多線程,以及進程和線程混合模式等。當然還有更好的select,epoll等方案可以一個進程處理多個客戶端,這節(jié)就用多進程的來實現(xiàn)下多客戶端處理。修改代碼如下:
import socket
import os
import sys
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
try:
pid = os.fork()
except OSError, e:
break
if pid == 0:
print 'server connect from: ', cliaddr
listensock.close()
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
os._exit(0)
else:
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
這樣每次來一個連接,就創(chuàng)建一個新的子進程來處理,處理完子進程退出,就可以達到處理多個客戶端的情況了。注意的是,這里子進程退出了而父進程也不進行回收處理的話,子進程會變成僵尸進程,如下圖所示,一個客戶端退出后,可以看到多了一個Python的僵尸進程,狀態(tài)是Z+,在linux下面會顯示狀態(tài)為<defunct>。
? data ps aux|grep Python
ssj 7908 0.0 0.0 0 0 s001 Z+ 4:14下午 0:00.00 (Python)
為什么會有僵尸進程的存在呢?我們知道一個進程在終止時會關閉所有文件描述符,釋放在用戶空間分配的內(nèi)存,但是它的進程控制塊(PCB)還保留著,內(nèi)核在其中保存了一些信息:如果是正常終止則保存著退出狀態(tài),如果是異常終止則保存著導致該進程終止的信號是哪個。如果一個進程已經(jīng)終止,但是它的父進程尚未調(diào)用wait或waitpid對它進行清理,這時的進程狀態(tài)稱為僵尸進程。也可以參考下stackoverflow上面的這個問題 why-zombie-processes-exist。
為了解決僵尸進程問題,父進程需要處理SIGCHLD信號并調(diào)用wait清理僵尸進程,當然為了簡單起見,我這里是在父進程里面直接忽略SIGCHLD信號,相當于直接告訴系統(tǒng),我不關心子進程的狀態(tài),不要產(chǎn)生僵尸進程,這樣也可以達到解決僵尸進程的目的,修改后的代碼如下:
......
import signal #導入signal模塊
def start_server(ip, port):
signal.signal(signal.SIGCHLD, signal.SIG_IGN) #忽略SIGCHLD信號
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......
另外說一句,與僵尸進程對應的還有個孤兒進程,就是父進程已經(jīng)退出,而子進程還沒有退出時所處的狀態(tài),孤兒進程的父進程退出后會被init進程接管,也就是說它的父進程會被設置為1,子進程運行結(jié)束會被init進程回收,不會產(chǎn)生僵尸進程。另外一點,如果要終止一個僵尸進程是不能通過kill命令來實現(xiàn)的,因為僵尸進程已經(jīng)終止了,沒法再kill,正確的方法是kill掉僵尸進程的父進程,讓init進程接管僵尸進程并回收。
3 處理多客戶端請求-select方案
在之前提到的TCP編程中,其中的socket是阻塞socket,因為python程序會停止運行,直到一個event發(fā)生。其中accept()調(diào)用會阻塞,直到接收到一個客戶端連接。而recv()調(diào)用也會阻塞,直到這次接收客戶端數(shù)據(jù)完成(或者沒有更多的數(shù)據(jù)要接收)。send()調(diào)用也會阻塞,直到將這次需要返回給客戶端的數(shù)據(jù)都放到Linux的發(fā)送緩沖隊列中。使用多進程或者多線程來處理多客戶端請求,容易引起性能問題,異步socket是一種不錯的解決方案。異步socket在python的API里面有select,poll,epoll三種,其中epoll性能最好,select性能較差,因為它每次都要輪詢程序鎖需要的所有socket去查找感興趣的event。注意一下,select在這里雖然稱之為異步socket,并不是說它的讀取和寫入不阻塞,只是因為select函數(shù)給你找到了已經(jīng)有的讀事件和寫事件的socket,你在accept,recv,send調(diào)用的時候可以直接讀取到數(shù)據(jù)而不需要再等待,因為數(shù)據(jù)已經(jīng)到達。
select幾乎在所有平臺都能支持,良好的跨平臺支持是它為數(shù)不多的優(yōu)點了。select的一個缺點在于單個進程能夠監(jiān)視的文件描述符的數(shù)量存在最大限制,如果要增大則需要修改參數(shù)重新編譯內(nèi)核。另外,select()所維護的socket文件描述符的數(shù)據(jù)結(jié)構(gòu),隨著文件描述符數(shù)量的增大,調(diào)用select()掃描所有的socket的開銷也會增加。poll()與select()類似,這里就不再討論。select()將就緒的讀寫事件的socket告訴進程后,如果進程沒有對其進行IO操作,那么下次調(diào)用select()的時候?qū)⒃俅畏祷剡@些socket,所以它們一般不會丟失消息(比如在下面代碼中第一次不處理wset中的socket,第二次select的時候還是會返回對應的socket的集合)。這種方式稱為水平觸發(fā)(Level Triggered),后面會看到epoll里面支持水平觸發(fā)和垂直觸發(fā)。
select服務端的實現(xiàn)如下所示:
#selectserver.py
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(511)
inputs = [listensock]
outputs = []
msg_queue = {}
while inputs:
print 'waiting for next event'
rset, wset, expset = select.select(inputs, outputs, inputs)
if not rset and not wset and not expset:
print 'timeout'
break
print 'rset %s, wset:%s' % (rset, wset)
#處理讀事件
for s in rset:
if s is listensock: #如果是監(jiān)聽socket,則accept接受連接。
conn, cliaddr = s.accept()
print 'connect from ', cliaddr
inputs.append(conn)
msg_queue[conn] = Queue.Queue() #為每個連接分配一個隊列接收數(shù)據(jù)
else:
data = s.recv(1024)
if data:
print 'server received %s from %s' % (data, s.getpeername())
msg_queue[s].put(data)
if s not in outputs:
outputs.append(s)
else:
print 'client %s closed' % s.getpeername()
if s in outputs:
outputs.remove(s) //客戶端關閉,將對應socket從outputs中移除。
inputs.remove(s)
del msg_queue[s]
s.close()
#處理寫事件
for s in wset:
try:
#用get_nowait()防止阻塞,如果隊列為空會拋出Empty異常,python隊列用get會阻塞。
next_msg = msg_queue[s].get_nowait()
print 'server sending %s to %s' % (next_msg.upper(), s.getpeername())
s.send(next_msg.upper())
except Queue.Empty:
print s.getpeername(), 'queue empty'
outputs.remove(s)
#處理異常
for s in expset:
print 'exception on %s' % s.getpeername()
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del msg_queue[s]
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
運行python selectserver.py,然后在另一個終端開啟python client.py,輸入數(shù)據(jù)hehe,可以看到服務端的輸出如下,也就是說,select會阻塞等待,等到有事件來的時候,select函數(shù)會遍歷所有的socket,找到有讀取事件和寫入事件的socket,然后讀取事件的socket設置在rset中,寫入事件的socket的設置在wset中,異常的socket在exception中,然后分別處理即可。注意讀取事件有個特例是監(jiān)聽關鍵字,要單獨處理。
ssj@ssj-mbp ~/Prog/network/data $ python selectserver.py
waiting for next event
rset [<socket._socketobject object at 0x1022e37c0>], wset:[]
connect from ('127.0.0.1', 61612)
waiting for next event
rset [<socket._socketobject object at 0x1022e39f0>], wset:[]
server received haha from ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
server sending HAHA to ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
('127.0.0.1', 61612) queue empty
waiting for next event
4 處理多客戶端請求-epoll方案
上一節(jié)的select方案是不需要多進程了,只要有I/O事件產(chǎn)生,我們的程序就會阻塞在select處。但是依然有個問題,我們從select那里僅僅知道I/O事件發(fā)生,但卻并不知道是那幾個socket的I/O事件(可能有一個,多個,甚至全部),于是只能無差別輪詢所有流,找出能讀出數(shù)據(jù),或者寫入數(shù)據(jù)的流,對他們進行操作。輪詢的時間復雜度為O(n),而且socket越多,時間越長。epoll就是對select的改進,它不再需要輪詢所有的socket了,而是把哪個socket發(fā)生了什么I/O事件直接通知給我們,如下代碼中的epoll.poll()方法就是返回有I/O事件的socket的文件描述符和事件類型,大大降低了時間復雜度,提高了性能。關于epoll的原理可以參見參考資料5,python中的API已經(jīng)簡化了不少操作。
epoll有水平觸發(fā)(LT, level triggered)和邊緣觸發(fā)(ET, edge triggered)兩種方式。其中LT是默認的工作方式,LT模式同時支持block和no-block socket,內(nèi)核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內(nèi)核還是會繼續(xù)通知,這種模式編程出錯誤可能性要小一點。而ET是一種加速模式,當一個新的事件到來時,ET模式下可以從poll調(diào)用中獲取到這個事件,可是如果這次沒有把這個事件對應的套接字緩沖區(qū)處理完,在這個套接字中沒有新的事件再次到來時,在ET模式下是無法再次從poll調(diào)用中獲取這個事件的,使用ET方式的epoll代碼可以參見參考資料4。macos沒有epoll方法,這里用的測試環(huán)境為Ubuntu16.04.
python中使用epoll代碼如下:
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listensock.bind((ip, port))
listensock.listen(511)
listensock.setblocking(0)
epoll = select.epoll()
epoll.register(listensock.fileno(), select.EPOLLIN)
try:
connections = {}
msg_queue = {}
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == listensock.fileno():
conn, cliaddr = listensock.accept()
conn.setblocking(0)
epoll.register(conn.fileno(), select.EPOLLIN)
connections[conn.fileno()] = conn
msg_queue[conn.fileno()] = Queue.Queue()
elif event & select.EPOLLIN:
data = connections[fileno].recv(1024)
if data:
print 'server recv ', data
msg_queue[fileno].put(data)
epoll.modify(fileno, select.EPOLLOUT)
else:
print 'no data recv, server close ', fileno
epoll.modify(fileno, select.EPOLLHUP)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLOUT:
try:
data = msg_queue[fileno].get_nowait()
print 'server send ', data
connections[fileno].send(data.upper())
except Queue.Empty:
epoll.modify(fileno, select.EPOLLIN)
elif event & select.EPOLLHUP:
print 'close ', fileno
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
except Exception, ex:
print 'exception occured:', ex
finally:
epoll.unregister(listensock.fileno())
epoll.close()
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
5 參考資料
- Linux C一站式編程 - 網(wǎng)絡編程相關章節(jié)
- Python網(wǎng)絡編程中的select 和 poll I/O復用的簡單使用
- epoll或者kqueue的原理是什么
- Python中如何使用Linux的epoll
- 高并發(fā)編程之epoll詳解