需求描述
創(chuàng)建一個多用戶,多房間的全雙工聊天室。
多用戶,多房間的意思是可以有多個聊天室,每個聊天室里可以有多個用戶,并且用戶可以通過輸入房間號進(jìn)入聊天室。
全雙工的意思是聊天室中的用戶在接收其他用戶的信息的同時,也能發(fā)送信息給其他用戶。而不用等待一個用戶發(fā)送完信息,等其他用戶接收到之后,才能允許下個用戶再次發(fā)送信息。
Python I/O多路復(fù)用
全雙工功能的實現(xiàn),可以通過多線程,I/O多路復(fù)用等方式,我在這邊采用了I/O多路復(fù)用方案。
Python的select模塊提供三種I/O多路復(fù)用的具體實現(xiàn)——select,poll,epoll,我在這里選用select.select(下面用select代替)。
select會監(jiān)聽socket或者文件描述符的I/O狀態(tài)變化,并返回變化的socket或者文件描述符對象
select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
這是Python select方法的原型,接收4個參數(shù)
rlist:list類型,監(jiān)聽其中的socket或者文件描述符是否變?yōu)榭勺x狀態(tài),返回那些可讀的socket或者文件描述符組成的list
wlist:list類型,監(jiān)聽其中的socket或者文件描述符是否變?yōu)榭蓪憼顟B(tài),返回那些可寫的socket或者文件描述符組成的list
xlist:list類型,監(jiān)聽其中的socket或者文件描述符是否出錯,返回那些出錯的socket或者文件描述符組成的list
timeout:設(shè)置select的超時時間,設(shè)置為None代表永遠(yuǎn)不會超時,即阻塞。
注意:Python的select方法在Windows和Linux環(huán)境下的表現(xiàn)是不一樣的,Windows下它只支持socket對象,不支持文件描述符(file descriptions),而Linux兩者都支持。
Linux下,可以通過sys.stdin標(biāo)準(zhǔn)輸入流獲取用戶的輸入,而sys.stdin就是一個文件描述符。
所以可以用下面的代碼來獲取用戶輸入
rlist, wlist, xlist = select.select( [sys.stdin], [], [] )
print rlist[0].readline()
由于只監(jiān)聽了sys.stdin,當(dāng)用戶輸入之后,只會返回sys.stdin對象,可以通過readline方法來獲取用戶輸入的內(nèi)容。
聊天室服務(wù)端
服務(wù)端要完成三件事:
- 接收多個客戶端的連接
- 管理用戶的聊天室分組
- 將一個客戶端輸入的消息廣播到他所在聊天室的所有其他客戶端
第一件事,定義一個list類型變量_current_in_list來表示監(jiān)聽多個socket連接的可讀事件,利用上面說的select來處理I/O多路復(fù)用,代碼如下:
rlist, wlist, xlist = select.select(_current_in_list, [], [])
當(dāng)select返回時,說明rlist上有可讀的socket了,這里又有兩種情況:
1.如果返回的是service socket(服務(wù)器創(chuàng)建的socket,用來監(jiān)聽客戶端是否連接的),表示有新的客戶端連接了,調(diào)用socket.accept()方法獲取新的客戶端socket對象和地址(ip和port組成的元組),將新的客戶端socket加入到_current_in_list。
2.如果返回的是其他socket(客戶端socket),表示有客戶端發(fā)送數(shù)據(jù)到服務(wù)端了,調(diào)用socket.recv()方法獲取數(shù)據(jù)。
為了實現(xiàn)用戶分組,我規(guī)定每個客戶端在連接服務(wù)器之前都要先輸入聊天室的房間號,并且每次發(fā)送到服務(wù)器的數(shù)據(jù)都要帶上房間號,最后定義了一個dict類型的變量_room用來存儲用戶和房間的對應(yīng)關(guān)系,客戶端傳遞的房間號就是_room的key,而它的value則是一個客戶端socket的列表。數(shù)據(jù)格式如下:
<RID:111>Welcome to Chat Room</RID:111>
對于接收到的數(shù)據(jù),首先通過正則表達(dá)式檢查是否符合規(guī)定,然后提取房間號和用戶發(fā)送的消息。
判斷是否新加入到聊天室的用戶,如果是則發(fā)送廣播通知聊天室的其他用戶有新人加入,否則發(fā)送用戶消息給聊天室的其他用戶。關(guān)鍵代碼如下
rgx_message = CONFORM_MSG.match(raw_message)
if rgx_message:
room_id = rgx_message.group(1)
message = rgx_message.group(2)
if sock not in _room.setdefault(room_id, []):
_room[room_id].append(sock)
broadcast_message(room_id, sock, '\n[%s:%s] entered room.\n'\
% sock.getpeername())
else:
broadcast_message(room_id, sock, \
"\n<" +str(sock.getpeername()) + ">" + message)
根據(jù)房間號將消息廣播給聊天室中除發(fā)送用戶之外的所有其他用戶
def broadcast_message(room_id, sock, message):
for member in _room[room_id]:
if member is not sock:
try:
member.send(message)
except socket.error:
member.close()
_current_in_list .remove(member)
_room[room_id].remove(member)
如果發(fā)送報錯,可能socket已經(jīng)被關(guān)閉,所以將它從_current_in_list和_room中刪除,因為socket已經(jīng)被關(guān)閉了,但還保留在_current_in_list中,select會報錯。
完整的聊天室服務(wù)端代碼如下:
import socket
import select
import re
HOST = "localhost"
PORT = 9898
ADDR = (HOST, PORT)
BUFSIZE = 1024
CONFORM_MSG = re.compile(r'^<RID:(\d+)>([\s\S]*?)</RID:\1>')
_service_socket = socket.socket()
_service_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_service_socket.bind(ADDR)
_service_socket.listen(10)
_current_in_list = [_service_socket]
_room = dict()
def broadcast_message(room_id, sock, message):
for member in _room[room_id]:
if member is not sock:
try:
member.send(message)
except socket.error:
member.close()
_current_in_list .remove(member)
_room[room_id].remove(member)
def main():
while True:
rlist, wlist, xlist = select.select(_current_in_list, [], [])
for sock in rlist:
if sock is _service_socket:
client, addr = sock.accept()
_current_in_list.append(client)
print "Client (%s:%s) connected." % addr
else:
try:
raw_message = sock.recv(BUFSIZE)
if raw_message:
rgx_message = CONFORM_MSG.match(raw_message)
if rgx_message:
room_id = rgx_message.group(1)
message = rgx_message.group(2)
if sock not in _room.setdefault(room_id, []):
_room[room_id].append(sock)
broadcast_message(room_id, sock, '\n[%s:%s] entered room.\n'\
% sock.getpeername())
else:
broadcast_message(room_id, sock, \
"\n<" +str(sock.getpeername()) + ">" + message)
else:
print "Invalid format message,", raw_message
except socket.error:
print "Client (%s, %s) is offline" % sock.getpeername()
sock.close()
_current_in_list .remove(member)
for room_id, socks in _room.iteritems():
for _ in socks:
if _ is sock:
_room[room_id].remove(_)
break
else:
continue
break
if __name__ == '__main__':
main()
聊天室客戶端
客戶端也要實現(xiàn)三個功能:
- 確定房間號
- 根據(jù)規(guī)定的協(xié)議規(guī)則組合房間號和消息并發(fā)送給服務(wù)器
- 接收服務(wù)器廣播的消息
客戶端相對服務(wù)端的代碼邏輯來的簡單,房間號直接用raw_input來讓用戶輸入獲取。
用到了select I/O多路復(fù)用來實現(xiàn)全雙工,_current_in_list中加入sys.stdin和socket,一旦用戶輸入或者socket接到服務(wù)器廣播的消息,就返回rlist。
遍歷rlist,如果是socket就通過socket.recv()接收廣播消息,如果是sys.stdin則通過sys.stdin.readline()從標(biāo)準(zhǔn)輸入流中獲取用戶輸入的消息。
完整的客戶單代碼:
import socket
import select
import sys
HOST = "localhost"
PORT = 9898
ADDR = (HOST, PORT)
BUFSIZE = 1024
_current_in_list = [sys.stdin]
def prompt():
sys.stdout.write('<You> ')
sys.stdout.flush()
def gen_message(room_id, raw_message):
return '<RID:{}>{}</RID:{}>'.format(room_id, raw_message, room_id)
def main():
room_id = raw_input('<Room ID> ')
client_socket = socket.socket()
client_socket.settimeout(2)
try:
client_socket.connect(ADDR)
_current_in_list.append(client_socket)
# notify all room's user that new client is entered
client_socket.send(gen_message(room_id, ''))
except socket.error:
print "Unable to connect"
sys.exit()
print 'Connected to remote host. Start sending messages'
prompt()
while True:
rlist, wlist, xlist = select.select(_current_in_list, [], [])
for sock in rlist:
if sock is client_socket:
message = sock.recv(BUFSIZE)
if not message:
print '\nDisconnected from chat server.'
sys.exit()
else:
sys.stdout.write(message)
prompt()
else:
raw_message = sys.stdin.readline()
client_socket.send(gen_message(room_id, raw_message))
prompt()
if __name__ == '__main__':
main()