編碼發(fā)送數(shù)據(jù)到redis服務(wù),客戶端完成了第一個(gè)交互過(guò)程,即請(qǐng)求的過(guò)程。接下來(lái)客戶端還要接受并解析服務(wù)端的響應(yīng)回復(fù)。這個(gè)過(guò)程我們需要將RESP協(xié)議編碼的字節(jié)串解析成python的字串。
由于響應(yīng)回復(fù)有多種,并且有多行的存在。因此解析響應(yīng)的時(shí)候要注意對(duì)CRLF的處理,即tcp包的數(shù)據(jù)分界方式。在我們尚為進(jìn)行真正的網(wǎng)絡(luò)通信的時(shí)候,我們創(chuàng)建一個(gè)變量用于表示redis服務(wù)器返回的進(jìn)入的socket緩沖區(qū)。此時(shí)的代碼邏輯與讀取真實(shí)的socket數(shù)據(jù)很像,后面我們?cè)俳榻Bredis.py的socket交互。
read_response
redispy中,調(diào)用PythonParser類的read_response方法來(lái)讀取redis的數(shù)據(jù)。該方法又會(huì)相繼調(diào)用_buffer對(duì)象的readline和read方法。后兩者分別調(diào)用SocketBuffer類的_read_from_socket方法來(lái)讀取socket。為了模擬從socket中讀取數(shù)據(jù),我們會(huì)修改_read_from_socket方法,使其讀socket的數(shù)據(jù)改成從我們假設(shè)的緩沖區(qū)變量讀取。
class Socket(object):
def __init__(self, data):
self.data = data
def recv(self, length):
data = self.data[:length]
self.data=self.data[length:]
return data
用我們定義的Socket類模擬網(wǎng)絡(luò)數(shù)據(jù)流,其中recv方法則從data中返回?cái)?shù)據(jù)。為了簡(jiǎn)化學(xué)習(xí),我們暫時(shí)把所有錯(cuò)誤的處理都忽略。
狀況回復(fù)
從前面的RESP協(xié)議可以得知,狀態(tài)回復(fù)以+開(kāi)頭,后面跟著狀態(tài)消息,最后以CRLF結(jié)束。
測(cè)試的代碼如下:
data = b'+OK\r\n'
pp = PythonParser(socket_read_size=65536)
pp.on_connect(data)
print(pp.read_response())
打印的結(jié)果為b'OK'。我們先看下PythonParser類的定義。
class PythonParser(object):
encoding = None
def __init__(self, socket_read_size):
self.socket_read_size = socket_read_size
self._sock = None
self._buffer = None
PythonParser類定義了讀取socket的數(shù)據(jù)大小,已經(jīng)socket對(duì)象和buffer對(duì)象。
再看on_connect方法,主要是初始化了我們假定的Socket對(duì)象和SocketBuffer對(duì)象。
def on_connect(self, data):
self._sock = Socket(data)
self._buffer = SocketBuffer(self._sock, self.socket_read_size)
SocketBuffer
SocketBuffer類的主要職能就是把從socket中讀取的數(shù)據(jù),以bytes的方式存儲(chǔ)到內(nèi)存中。然后從內(nèi)存中解析該數(shù)據(jù)。通過(guò)控制buffer的寫(xiě)入和寫(xiě)出的值,可以精確的設(shè)置什么時(shí)候從socket中讀數(shù)據(jù)。
class SocketBuffer(object):
def __init__(self, socket, socket_read_size):
self._sock = socket
self.socket_read_size = socket_read_size
self._buffer = BytesIO()
self.bytes_written = 0
self.bytes_read = 0
@property
def length(self):
return self.bytes_written - self.bytes_read
def _read_from_socket(self, length=None):
pass
def purge(self):
pass
def read(self, length):
pass
def readline(self):
pass
該類實(shí)例化的時(shí)候會(huì)初始化socket對(duì)象和_buffer對(duì)象,后者是BytesIO的實(shí)例,用于讀取寫(xiě)入內(nèi)存字節(jié)數(shù)據(jù)。
回到我們的測(cè)試代碼中,一旦調(diào)用了on_connect方法,下面就是調(diào)用read_response方法。在該方法中,首先會(huì)調(diào)用_buffer對(duì)象的readline方法:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
# 處理包結(jié)束
while not data.endswith(SYM_CRLF):
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
readline方法的主要功能就是從socket中讀取一行數(shù)據(jù)。首先將bytes的指針seek到起始的位置。然后判斷是否以CRLF結(jié)尾,即表示是否讀取了redis的一個(gè)編碼單位。如果尚未讀取,就會(huì)調(diào)用_read_from_socket方法從socket緩沖區(qū)讀取數(shù)據(jù)到內(nèi)存緩沖區(qū)中。最后再?gòu)膬?nèi)存中讀取一行數(shù)據(jù)到data變量中。
例如我們的例子中,redis返回的數(shù)據(jù)是b'+OK\r\n',此時(shí)會(huì)將所有數(shù)據(jù)都讀取到BytesIO中,然后從BytesIO讀取到data,最后返回+OK。
下面再看read_response方法:
def read_response(self):
response = self._buffer.readline()
byte, response = byte_to_chr(response[0]), response[1:]
if byte not in ('-', '+', ':', '$', '*'):
raise RedisError
# server returned an error
if byte == '-':
response = nativestr(response)
# 處理錯(cuò)誤
return response
# single value
elif byte == '+':
pass
# int value
elif byte == ':':
response = int(response)
# bulk response
elif byte == '$':
length = int(response)
if length == -1:
return None
response = self._buffer.read(length)
# multi-bulk response
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
if isinstance(response, bytes) and self.encoding:
response = response.decode(self.encoding)
return response
該方法會(huì)讀取stocketbuffer對(duì)象的返回,即上面的+OK。通過(guò)判斷第一個(gè)字節(jié)的類型來(lái)判斷回復(fù)的類型。此時(shí)比較簡(jiǎn)單,直接返回OK。錯(cuò)誤回復(fù)也類似,直接把錯(cuò)誤類型和錯(cuò)誤信息返回即可。
分段讀取
上面的例子中,socket的recv一次調(diào)用的字節(jié)是65536。可以把socket緩沖區(qū)的數(shù)據(jù)全部讀取。如果設(shè)定的大小是每次只讀取一個(gè)字節(jié)呢?
修改測(cè)試代碼再運(yùn)行,我們看見(jiàn)輸入依然正常。因?yàn)樵趓eadline代碼中,while not data.endswith(SYM_CRLF)的判斷可以幫我們斷定什么時(shí)候讀取完。無(wú)論一次讀多少個(gè)字節(jié),data的數(shù)據(jù)從BytesIO讀取都是一行,因此最后總會(huì)讀到CRLF中的\n。此時(shí)data的數(shù)據(jù)就是以\r\n結(jié)尾,結(jié)束從socket中讀數(shù)據(jù)。由此可以,tcp的讀取數(shù)據(jù)是沒(méi)有界限的,就像流水一樣,除非我們?cè)趨f(xié)議中規(guī)定以什么字符標(biāo)記作為分界。上面描述的過(guò)程大致錄制了一個(gè)小視頻,點(diǎn)擊下載。
數(shù)字回復(fù)
數(shù)字回復(fù)和狀態(tài)回復(fù)類似,只不過(guò)回復(fù)的token類型以:開(kāi)頭,其他過(guò)程和狀態(tài)回復(fù)類似。不同在于客戶端的解析要轉(zhuǎn)換成數(shù)字類型。
批量回復(fù)
狀態(tài)回復(fù)很簡(jiǎn)單,redis操作中,批量回復(fù)也很常見(jiàn)。并且會(huì)比較復(fù)雜?;谏厦娴拇a運(yùn)行原理。我們首先也是讀取一行,然后接觸回復(fù)類型。因?yàn)榕炕貜?fù)的token會(huì)告訴我們返回的字串的長(zhǎng)度??梢愿鶕?jù)該信息確定我們r(jià)ead_byte位置,然后將剩余的socket全部讀取。
例如返回的數(shù)據(jù)如果是 $6\r\nfoobar\r\n, 經(jīng)過(guò)第一次readline的數(shù)據(jù),我們得到的response為$6\r\n。當(dāng)確定了返回類型是批量回復(fù),將會(huì)繼續(xù)調(diào)用read方法,將剩下的數(shù)據(jù)(foobar\r\n)讀取。read的代碼如下:
def read(self, length):
length = length + 2
if length > self.length:
self._read_from_socket(length - self.length)
self._buffer.seek(self.bytes_read)
data = self._buffer.read(length)
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
read方法比readline簡(jiǎn)單。它只需要判斷BytesIO中的數(shù)據(jù)是否是所有redis的數(shù)據(jù)。對(duì)于$6\r\nfoobar\r\n而言,如果一次讀5個(gè)字節(jié),那么readline調(diào)用之后,BytesIO中還有一個(gè)f字符,即長(zhǎng)度為1。因?yàn)榉祷亓俗址?+2個(gè)字節(jié)(最后的CRLF),因此8>1,說(shuō)明還要從socket中讀取7個(gè)字節(jié)。即再次調(diào)用_read_from_socket方法,與readline類似,讀取到CRLF結(jié)束并返回。當(dāng)讀取完畢之后,需要調(diào)用self.purge情況buffer對(duì)象。為了更好的展示這個(gè)過(guò)程,也錄制了一個(gè)小視頻。
由于每次讀取5個(gè)socket字節(jié),因此在從socket中讀取了兩次。如果多讀了呢。多讀了也沒(méi)有關(guān)系,即使BytesIO多讀了socket的數(shù)據(jù)。在buffer對(duì)象讀取的時(shí)候還有一個(gè)length參數(shù),這個(gè)參數(shù)會(huì)保證以CRLF結(jié)尾。這也是redis設(shè)計(jì)協(xié)議的時(shí)候,為什么字符串返回要在$后加上字節(jié)的長(zhǎng)度。
多批量回復(fù)
多批量回復(fù)以*開(kāi)頭,這個(gè)編碼格式和請(qǐng)求的命令一樣。多個(gè)字節(jié)串分別編碼,然后再和*參數(shù)數(shù)結(jié)合。例如下面一個(gè)回復(fù)樣式:
*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n
再看read_response中解析多批量回復(fù)的代碼:
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
一旦是多批量回復(fù),因?yàn)?code>*后跟著返回的參數(shù)個(gè)數(shù),而這些參數(shù)個(gè)數(shù)的編碼和批量回復(fù)的一模一樣。既然如此,那么遞歸調(diào)用read_response,再解析出來(lái)的批量回復(fù)組合起來(lái)即可。
特殊類型回復(fù)
RESP的回復(fù)我們都介紹了,所謂的特殊。是數(shù)據(jù)情況特別的時(shí)候,比如返回空字符串的時(shí)候,token會(huì)是0,返回Nil值的時(shí)候,token可能是-1。具體這些情況,可以參考官方文檔的案例。
總結(jié)
經(jīng)過(guò)上面的分析,我們了解了redispy是如何解析redis服務(wù)器返回的RESP編碼的數(shù)據(jù)。解碼的關(guān)鍵在于對(duì)socket數(shù)據(jù)的讀取。盡管我們是模擬了socket對(duì)象。上面的代碼和實(shí)際socket交互是完全一樣的。因?yàn)檎鎸?shí)的socket.recv調(diào)用也只是應(yīng)用層的程序代碼從socket的緩沖區(qū)讀取數(shù)據(jù)。緩存區(qū)直接的IO則是內(nèi)核在tcp層處理的內(nèi)容。
我們把真實(shí)的socket.recv讀取數(shù)據(jù)從內(nèi)核轉(zhuǎn)移到一個(gè)Socket類,這樣的模擬也是合理的,并且易于調(diào)試。不然還得先模擬發(fā)送命令給redis,然后打斷點(diǎn)等待回復(fù)。
盡管我們的模擬抽象很好,可是真實(shí)的編碼還是需要處理socket的數(shù)據(jù)流,尤其是對(duì)于通信錯(cuò)誤的處理。完整的代碼可以閱讀redis.py項(xiàng)目。
簽名我們介紹了編碼,創(chuàng)建連接和現(xiàn)在接受數(shù)據(jù)并解碼。接下來(lái)將會(huì)實(shí)現(xiàn)redis.py中的另外一個(gè)特性,連接池的實(shí)現(xiàn)。
文中相關(guān)代碼