Context
在寫一個Socket I/O模塊,功能要求如下:
- 作為服務(wù)端,需要永遠(yuǎn)循環(huán)等待連接
- 建立TCP連接后可以收發(fā)數(shù)據(jù)
- 收發(fā)數(shù)據(jù)相互獨立,不能阻塞
Trouble
代碼如下
def run_server(send_queue, receive_queue):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print(f"[server] Connecting with {addr}")
with conn:
while True:
try:
m = send_queue.get(block=False)
except queue.Empty as e:
m = None
if m:
print(isinstance(m, AbstractMessage))
if isinstance(m, AbstractMessage):
send_bytes = message2bytes(m)
conn.sendall(send_bytes)
print(f"Send message is {type(m)} : {send_bytes}")
try:
data = conn.recv(4096)
except BlockingIOError as e:
data = None
if data:
print(f"data is {data}")
receive_message = bytes2message(data)
print(f"Receive message is {receive_message}")
receive_queue.put(receive_message)
BUS.push(receive_message)
調(diào)試時發(fā)現(xiàn)當(dāng)Client沒有發(fā)送數(shù)據(jù)時,Server會阻塞地等待接收數(shù)據(jù),也就是data = conn.recv(4096)這一行代碼,導(dǎo)致無法發(fā)送數(shù)據(jù)。
Solution
查閱queue — A synchronized queue class
后,得知recv()方法需要傳入兩個參數(shù),bufsize和flags:
Receive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by bufsize. See the Unix manual page recv(2) for the meaning of the optional argument flags; it defaults to zero.
文檔內(nèi)只描述了bufsize的用法,關(guān)于flags只是一筆帶過。
在StackOverflow的When does socket.recv(recv_size) return?問題中@Ray的回答:
You can also call
recv()as nonblocking if you give the right flag for it:socket.recv(10240, 0x40) # 0x40 = MSG_DONTWAIT a.k.a. O_NONBLOCKPlease note that you have to catch the [Errno 11] Resource temporarily unavailable exceptions when there is no input data.
得知通過flags參數(shù)可以將recv()方法設(shè)置為MSG_DONTWAIT,通過try-except寫法可以實現(xiàn)非阻塞。
代碼如下:
try:
data = conn.recv(4096, 0x40)
except BlockingIOError as e:
data = None
tips: 在查閱了recv(2) - Linux man page文檔后依然沒能找到0x40和MSG_DONTWAIT的對照表。
Sunmmary
Python的socket.recv()方法可以通過傳入flags=0x40參數(shù)配合try-except方法實現(xiàn)非阻塞。