Redispy 源碼學(xué)習(xí)(五) --- RESP協(xié)議實(shí)現(xiàn)--解碼

編碼發(fā)送數(shù)據(jù)到redis服務(wù),客戶端完成了第一個(gè)交互過(guò)程,即請(qǐng)求的過(guò)程。接下來(lái)客戶端還要接受并解析服務(wù)端的響應(yīng)回復(fù)。這個(gè)過(guò)程我們需要將RESP協(xié)議編碼的字節(jié)串解析成python的字串。

由于響應(yīng)回復(fù)有多種,并且有多行的存在。因此解析響應(yīng)的時(shí)候要注意對(duì)CRLF的處理,即tcp包的數(shù)據(jù)分界方式。在我們尚為進(jìn)行真正的網(wǎng)絡(luò)通信的時(shí)候,我們創(chuàng)建一個(gè)變量用于表示redis服務(wù)器返回的進(jìn)入的socket緩沖區(qū)。此時(shí)的代碼邏輯與讀取真實(shí)的socket數(shù)據(jù)很像,后面我們?cè)俳榻Bredis.py的socket交互。

read_response

redispy中,調(diào)用PythonParser類的read_response方法來(lái)讀取redis的數(shù)據(jù)。該方法又會(huì)相繼調(diào)用_buffer對(duì)象的readline和read方法。后兩者分別調(diào)用SocketBuffer類的_read_from_socket方法來(lái)讀取socket。為了模擬從socket中讀取數(shù)據(jù),我們會(huì)修改_read_from_socket方法,使其讀socket的數(shù)據(jù)改成從我們假設(shè)的緩沖區(qū)變量讀取。

class Socket(object):

    def __init__(self, data):
        self.data = data

    def recv(self, length):
        data = self.data[:length]
        self.data=self.data[length:]
        return data

用我們定義的Socket類模擬網(wǎng)絡(luò)數(shù)據(jù)流,其中recv方法則從data中返回?cái)?shù)據(jù)。為了簡(jiǎn)化學(xué)習(xí),我們暫時(shí)把所有錯(cuò)誤的處理都忽略。

狀況回復(fù)

從前面的RESP協(xié)議可以得知,狀態(tài)回復(fù)以+開(kāi)頭,后面跟著狀態(tài)消息,最后以CRLF結(jié)束。

測(cè)試的代碼如下:

    data = b'+OK\r\n'
    pp = PythonParser(socket_read_size=65536)
    pp.on_connect(data)
    print(pp.read_response())

打印的結(jié)果為b'OK'。我們先看下PythonParser類的定義。

class PythonParser(object):
    encoding = None

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None

PythonParser類定義了讀取socket的數(shù)據(jù)大小,已經(jīng)socket對(duì)象和buffer對(duì)象。

再看on_connect方法,主要是初始化了我們假定的Socket對(duì)象和SocketBuffer對(duì)象。

    def on_connect(self, data):
        self._sock = Socket(data)
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)

SocketBuffer

SocketBuffer類的主要職能就是把從socket中讀取的數(shù)據(jù),以bytes的方式存儲(chǔ)到內(nèi)存中。然后從內(nèi)存中解析該數(shù)據(jù)。通過(guò)控制buffer的寫(xiě)入和寫(xiě)出的值,可以精確的設(shè)置什么時(shí)候從socket中讀數(shù)據(jù)。

class SocketBuffer(object):
    def __init__(self, socket, socket_read_size):
        self._sock = socket
        self.socket_read_size = socket_read_size
        self._buffer = BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    @property
    def length(self):
        return self.bytes_written - self.bytes_read

    def _read_from_socket(self, length=None):
        pass

    def purge(self):
        pass
        
    def read(self, length):
        pass
        
    def readline(self):
        pass

該類實(shí)例化的時(shí)候會(huì)初始化socket對(duì)象和_buffer對(duì)象,后者是BytesIO的實(shí)例,用于讀取寫(xiě)入內(nèi)存字節(jié)數(shù)據(jù)。

回到我們的測(cè)試代碼中,一旦調(diào)用了on_connect方法,下面就是調(diào)用read_response方法。在該方法中,首先會(huì)調(diào)用_buffer對(duì)象的readline方法:

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        # 處理包結(jié)束
        while not data.endswith(SYM_CRLF):
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()

        self.bytes_read += len(data)
        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

readline方法的主要功能就是從socket中讀取一行數(shù)據(jù)。首先將bytes的指針seek到起始的位置。然后判斷是否以CRLF結(jié)尾,即表示是否讀取了redis的一個(gè)編碼單位。如果尚未讀取,就會(huì)調(diào)用_read_from_socket方法從socket緩沖區(qū)讀取數(shù)據(jù)到內(nèi)存緩沖區(qū)中。最后再?gòu)膬?nèi)存中讀取一行數(shù)據(jù)到data變量中。

例如我們的例子中,redis返回的數(shù)據(jù)是b'+OK\r\n',此時(shí)會(huì)將所有數(shù)據(jù)都讀取到BytesIO中,然后從BytesIO讀取到data,最后返回+OK

下面再看read_response方法:

    def read_response(self):
        response = self._buffer.readline()

        byte, response = byte_to_chr(response[0]), response[1:]

        if byte not in ('-', '+', ':', '$', '*'):
            raise RedisError

        # server returned an error
        if byte == '-':
            response = nativestr(response)
            # 處理錯(cuò)誤
            return response
        # single value
        elif byte == '+':
            pass
        # int value
        elif byte == ':':
            response = int(response)
        # bulk response
        elif byte == '$':
            length = int(response)
            if length == -1:
                return None
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]
        if isinstance(response, bytes) and self.encoding:
            response = response.decode(self.encoding)
        return response

該方法會(huì)讀取stocketbuffer對(duì)象的返回,即上面的+OK。通過(guò)判斷第一個(gè)字節(jié)的類型來(lái)判斷回復(fù)的類型。此時(shí)比較簡(jiǎn)單,直接返回OK。錯(cuò)誤回復(fù)也類似,直接把錯(cuò)誤類型和錯(cuò)誤信息返回即可。

分段讀取

上面的例子中,socket的recv一次調(diào)用的字節(jié)是65536。可以把socket緩沖區(qū)的數(shù)據(jù)全部讀取。如果設(shè)定的大小是每次只讀取一個(gè)字節(jié)呢?

修改測(cè)試代碼再運(yùn)行,我們看見(jiàn)輸入依然正常。因?yàn)樵趓eadline代碼中,while not data.endswith(SYM_CRLF)的判斷可以幫我們斷定什么時(shí)候讀取完。無(wú)論一次讀多少個(gè)字節(jié),data的數(shù)據(jù)從BytesIO讀取都是一行,因此最后總會(huì)讀到CRLF中的\n。此時(shí)data的數(shù)據(jù)就是以\r\n結(jié)尾,結(jié)束從socket中讀數(shù)據(jù)。由此可以,tcp的讀取數(shù)據(jù)是沒(méi)有界限的,就像流水一樣,除非我們?cè)趨f(xié)議中規(guī)定以什么字符標(biāo)記作為分界。上面描述的過(guò)程大致錄制了一個(gè)小視頻,點(diǎn)擊下載

數(shù)字回復(fù)

數(shù)字回復(fù)和狀態(tài)回復(fù)類似,只不過(guò)回復(fù)的token類型以:開(kāi)頭,其他過(guò)程和狀態(tài)回復(fù)類似。不同在于客戶端的解析要轉(zhuǎn)換成數(shù)字類型。

批量回復(fù)

狀態(tài)回復(fù)很簡(jiǎn)單,redis操作中,批量回復(fù)也很常見(jiàn)。并且會(huì)比較復(fù)雜?;谏厦娴拇a運(yùn)行原理。我們首先也是讀取一行,然后接觸回復(fù)類型。因?yàn)榕炕貜?fù)的token會(huì)告訴我們返回的字串的長(zhǎng)度??梢愿鶕?jù)該信息確定我們r(jià)ead_byte位置,然后將剩余的socket全部讀取。

例如返回的數(shù)據(jù)如果是 $6\r\nfoobar\r\n, 經(jīng)過(guò)第一次readline的數(shù)據(jù),我們得到的response為$6\r\n。當(dāng)確定了返回類型是批量回復(fù),將會(huì)繼續(xù)調(diào)用read方法,將剩下的數(shù)據(jù)(foobar\r\n)讀取。read的代碼如下:

    def read(self, length):
        length = length + 2
        if length > self.length:
            self._read_from_socket(length - self.length)

        self._buffer.seek(self.bytes_read)
        data = self._buffer.read(length)
        self.bytes_read += len(data)

        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

read方法比readline簡(jiǎn)單。它只需要判斷BytesIO中的數(shù)據(jù)是否是所有redis的數(shù)據(jù)。對(duì)于$6\r\nfoobar\r\n而言,如果一次讀5個(gè)字節(jié),那么readline調(diào)用之后,BytesIO中還有一個(gè)f字符,即長(zhǎng)度為1。因?yàn)榉祷亓俗址?+2個(gè)字節(jié)(最后的CRLF),因此8>1,說(shuō)明還要從socket中讀取7個(gè)字節(jié)。即再次調(diào)用_read_from_socket方法,與readline類似,讀取到CRLF結(jié)束并返回。當(dāng)讀取完畢之后,需要調(diào)用self.purge情況buffer對(duì)象。為了更好的展示這個(gè)過(guò)程,也錄制了一個(gè)小視頻。

由于每次讀取5個(gè)socket字節(jié),因此在從socket中讀取了兩次。如果多讀了呢。多讀了也沒(méi)有關(guān)系,即使BytesIO多讀了socket的數(shù)據(jù)。在buffer對(duì)象讀取的時(shí)候還有一個(gè)length參數(shù),這個(gè)參數(shù)會(huì)保證以CRLF結(jié)尾。這也是redis設(shè)計(jì)協(xié)議的時(shí)候,為什么字符串返回要在$后加上字節(jié)的長(zhǎng)度。

多批量回復(fù)

多批量回復(fù)以*開(kāi)頭,這個(gè)編碼格式和請(qǐng)求的命令一樣。多個(gè)字節(jié)串分別編碼,然后再和*參數(shù)數(shù)結(jié)合。例如下面一個(gè)回復(fù)樣式:

*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n

再看read_response中解析多批量回復(fù)的代碼:

elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]

一旦是多批量回復(fù),因?yàn)?code>*后跟著返回的參數(shù)個(gè)數(shù),而這些參數(shù)個(gè)數(shù)的編碼和批量回復(fù)的一模一樣。既然如此,那么遞歸調(diào)用read_response,再解析出來(lái)的批量回復(fù)組合起來(lái)即可。

特殊類型回復(fù)

RESP的回復(fù)我們都介紹了,所謂的特殊。是數(shù)據(jù)情況特別的時(shí)候,比如返回空字符串的時(shí)候,token會(huì)是0,返回Nil值的時(shí)候,token可能是-1。具體這些情況,可以參考官方文檔的案例。

總結(jié)

經(jīng)過(guò)上面的分析,我們了解了redispy是如何解析redis服務(wù)器返回的RESP編碼的數(shù)據(jù)。解碼的關(guān)鍵在于對(duì)socket數(shù)據(jù)的讀取。盡管我們是模擬了socket對(duì)象。上面的代碼和實(shí)際socket交互是完全一樣的。因?yàn)檎鎸?shí)的socket.recv調(diào)用也只是應(yīng)用層的程序代碼從socket的緩沖區(qū)讀取數(shù)據(jù)。緩存區(qū)直接的IO則是內(nèi)核在tcp層處理的內(nèi)容。

我們把真實(shí)的socket.recv讀取數(shù)據(jù)從內(nèi)核轉(zhuǎn)移到一個(gè)Socket類,這樣的模擬也是合理的,并且易于調(diào)試。不然還得先模擬發(fā)送命令給redis,然后打斷點(diǎn)等待回復(fù)。

盡管我們的模擬抽象很好,可是真實(shí)的編碼還是需要處理socket的數(shù)據(jù)流,尤其是對(duì)于通信錯(cuò)誤的處理。完整的代碼可以閱讀redis.py項(xiàng)目。

簽名我們介紹了編碼,創(chuàng)建連接和現(xiàn)在接受數(shù)據(jù)并解碼。接下來(lái)將會(huì)實(shí)現(xiàn)redis.py中的另外一個(gè)特性,連接池的實(shí)現(xiàn)。

文中相關(guān)代碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容