Redispy 源碼學(xué)習(xí)(一) --- 概覽

Redis是一個高性能的Nosql內(nèi)存數(shù)據(jù)庫。代碼精簡,性能和擴(kuò)展性強(qiáng),被廣泛用于互聯(lián)網(wǎng)應(yīng)用之中。許多語言也都支持redis,并實(shí)現(xiàn)了其客戶端驅(qū)動。Python的redis驅(qū)動寫得非常好(以下簡稱redis.py),通過閱讀redis.py可以學(xué)習(xí)redis的通信協(xié)議,網(wǎng)絡(luò)客戶端的編程以及連接池管理等技術(shù),我們也將通過對這三部分的逐一解析來學(xué)習(xí)redis.py。

目錄:

  1. Redispy 源碼學(xué)習(xí)(一) --- 概覽
  2. Redispy 源碼學(xué)習(xí)(二) --- RESP協(xié)議簡介
  3. Redispy 源碼學(xué)習(xí)(三) --- RESP協(xié)議實(shí)現(xiàn)--編碼
  4. Redispy 源碼學(xué)習(xí)(四) --- 創(chuàng)建連接
  5. Redispy 源碼學(xué)習(xí)(五) --- RESP協(xié)議實(shí)現(xiàn)--解碼
  6. Redispy 源碼學(xué)習(xí)(六) --- 連接池
  7. Redispy 源碼學(xué)習(xí)(七) --- 客戶端接口
  8. Redispy 源碼學(xué)習(xí)(八) --- 多線程和阻塞連接池

Redis 通信協(xié)議

redis設(shè)計了一個非常簡單高效的通信協(xié)議RESP---REdis Serialization Protocol,該協(xié)議基于TCP的應(yīng)用層協(xié)議。在編碼RESP協(xié)議的時候,我們需要學(xué)習(xí)字符的編碼與解碼。由于TCP是流(stream)模式,并沒有邊界,因此關(guān)于抽象出來的的邊界,將會是RESP協(xié)議的重點(diǎn)處理方式。同時也會發(fā)現(xiàn)RESP設(shè)計比較巧妙。

網(wǎng)絡(luò)通信

redis.py是redis的python客戶端驅(qū)動,因此我們只需要實(shí)現(xiàn)客戶端的邏輯,服務(wù)端當(dāng)然就是redis服務(wù)器本身。簡而言之,就是我們需要使用python實(shí)現(xiàn)一個redis-cli。雖然客戶端的socket編碼不比服務(wù)端的復(fù)雜,可是要是處理不當(dāng),同樣也會帶來諸多問題。構(gòu)建一個健壯的客戶端是寫好服務(wù)端的基礎(chǔ)。

學(xué)習(xí)了RESP的編碼與解碼之后,我們就需要借助socket把網(wǎng)絡(luò)數(shù)據(jù)發(fā)送給redis服務(wù)器,同時介紹服務(wù)器的應(yīng)答,完成客戶端對數(shù)據(jù)庫的操作。

連接池

redis.py是一個redis的客戶端,主要任務(wù)是發(fā)送命令到redis服務(wù)器。對于客戶端而言,與服務(wù)器的通信基于tcp的socket,客戶端的生命周期,自然而然就需要創(chuàng)建連接,發(fā)送數(shù)據(jù),關(guān)閉連接等基本操作。

連接的頻繁創(chuàng)建與銷毀也會消耗資源,引入連接池管理連接將會是一種比較好的解決方式。redis.py的連接池寫得很不錯,我們也會從中受益良多。

redis.py的軟件架構(gòu)

一個大型的系統(tǒng)需要一個良好的設(shè)計和架構(gòu)。小的軟件或者腳本也離不開好的設(shè)計結(jié)構(gòu)。redis.py作為python的客戶端,封裝了很多redis命令的接口。因此在python中使用redis將非常方便和優(yōu)雅。

分布式

redis提供分布式功能,我們也會針對其分布式實(shí)現(xiàn)和使用解釋其原理。

閱讀方式

在閱讀redis.py源碼的時候,嘗試自己實(shí)現(xiàn)一個驅(qū)動將會對學(xué)習(xí)理解提供莫大幫助,同時也能帶來成就感。因此我們將使用Python3的編碼環(huán)境,以單文件為基礎(chǔ),實(shí)現(xiàn)一個簡易的redis-like.py。

redis.py 的架構(gòu)概覽

軟件結(jié)構(gòu)

下面我們就先對redis.py做一個簡單地概覽。redis.py已經(jīng)2.10版本。其文件結(jié)構(gòu)如下:

?  redis  tree
.
├── __init__.py
├── _compat.py
├── client.py
├── connection.py
├── exceptions.py
├── lock.py
├── sentinel.py
└── utils.py
  • _compat.py 用于處理python2和python3不兼容的函數(shù),封裝并提供統(tǒng)一的接口。
  • client.py 該文件提供接口給python代碼使用。
  • connection.py 該文件非常重要,實(shí)現(xiàn)了對redis服務(wù)器的連接創(chuàng)建銷毀和socket收發(fā)過程。
  • exceptions.py 自定義異常
  • lock.py sentinel.py 用于分布式相關(guān)的操作
  • utils.py 工具函數(shù)庫

redis.py的作者把軟件設(shè)計很清晰。不過我們可以先忽略這些結(jié)構(gòu),基于一個文件實(shí)現(xiàn)上面的功能。把核心功能實(shí)現(xiàn)之后,再拆分和組織代碼結(jié)構(gòu)。

創(chuàng)建客戶端,初始化連接池

下面一段簡單的使用代碼:

import redis

rc = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
print(rc.ping())
print(rc.get('hello'))

StrictRedis創(chuàng)建一個客戶端 rc(redis_cli),其內(nèi)部創(chuàng)建一個連接池,當(dāng)調(diào)用ping方法的時候,rc才會創(chuàng)建連接。再次調(diào)用get方法的時候,rc會從連接池中讀取連接,執(zhí)行命令。當(dāng)連接池沒有可用連接,rc又會自動創(chuàng)建連接。總之,redis在執(zhí)行命令的時候,一旦連接壞了,就會清理釋放連接,然后重建新連接,并重新執(zhí)行命令。

與服務(wù)器通信入口

無論ping還是get方法,調(diào)用的都是StrictRedisexecute_command方法。

def execute_command(self, *args, **options):
       
        pool = self.connection_pool
        command_name = args[0]
        connection = pool.get_connection(command_name, **options)
        try:
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            pool.release(connection)

execute_command方法中,先從連接池中g(shù)et一條連接,然后調(diào)用send_command發(fā)送命令給redis服務(wù)器,接著調(diào)用parse_response方法讀取redis的返回結(jié)果。

如果執(zhí)行發(fā)送命令或者讀取結(jié)果的時候發(fā)生異常,將會主動disconnect,即釋放客戶端的連接資源(如果連接已經(jīng)斷開,就清理對象)。然后再重新發(fā)送。等到通信完畢之后,再把連接釋放回連接池。

之所以要清理連接對象,是因?yàn)樵趐ython代碼上下文中,邏輯連接還是正常,只不過實(shí)際上的tcp連接已經(jīng)close了。此時要同步邏輯連接和實(shí)際的連接。

發(fā)送命令

send_command是連接對象的方法,執(zhí)行該方法之前將會把命令參數(shù)按照RESP協(xié)議編碼。并調(diào)用send_packed_comand方法,后者會檢查連接是是否存在,如果不存在,將會創(chuàng)建連接。這一步就是rc的惰性創(chuàng)建連接入口。

def send_packed_command(self, command):
        if not self._sock:
            self.connect()
        try:
            if isinstance(command, str):
                command = [command]
            for item in command:
                self._sock.sendall(item)
        except socket.timeout:
            self.disconnect()
            raise TimeoutError("Timeout writing to socket")
        except socket.error:
            e = sys.exc_info()[1]
            self.disconnect()
            if len(e.args) == 1:
                errno, errmsg = 'UNKNOWN', e.args[0]
            else:
                errno = e.args[0]
                errmsg = e.args[1]
            raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg))
        except:
            self.disconnect()
            raise

該方法會調(diào)用self._sock.sendall(item)將redis命令發(fā)送到服務(wù)器。

連接與連接池

調(diào)用send_packed_command之前就從連接池中讀取連接。

def get_connection(self, command_name, *keys, **options):
        "Get a connection from the pool"
        self._checkpid()
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

可以該方法會從可用的連接池對象中pop一個連接,如果連接不存在,那么就調(diào)用make_connection創(chuàng)建連接并返回。然后才能使用send_packed_command發(fā)送數(shù)據(jù)。

讀取響應(yīng)

發(fā)送的過程并不復(fù)雜,接收的過程則比較講究。后面的我們會詳細(xì)分析,在此只需要有個大概認(rèn)識就行了。

parse_response方法會調(diào)用連接對象的read_response方法,后者會調(diào)用self._parser.read_response()。這個 _parser對象為了兼容Hiredis而做的一個適配器。主要功能就是封裝hiredis,提供統(tǒng)一的處理連接管理和數(shù)據(jù)緩沖的接口。默認(rèn)使用PythonParse類。

_parser對象有一個_buffer屬性,后者是一個SocketBuffer類,主要封裝了對socket的接收功能,即從socket的緩沖區(qū)讀取數(shù)據(jù),通過BytesIO寫入到內(nèi)存,然后從內(nèi)存中讀取數(shù)據(jù)。通過計算對比內(nèi)存中的數(shù)據(jù)和讀取的數(shù)據(jù),控制從socket中讀取的數(shù)據(jù)。這個精妙的設(shè)計我們后面會詳細(xì)介紹。

再看_parse對象read_response方法:

def read_response(self):
        response = self._buffer.readline()
        if not response:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        byte, response = byte_to_chr(response[0]), response[1:]

        if byte not in ('-', '+', ':', '$', '*'):
            raise InvalidResponse("Protocol Error: %s, %s" %
                                  (str(byte), str(response)))

        if byte == '-':
            response = nativestr(response)
            error = self.parse_error(response)
            if isinstance(error, ConnectionError):
                raise error
                return error
        # single value
        elif byte == '+':
            pass
        # int value
        elif byte == ':':
            response = long(response)
        # bulk response
        elif byte == '$':
            length = int(response)
            if length == -1:
                return None
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self._buffer.read(length)() for i in xrange(length)]
        if isinstance(response, bytes) and self.encoding:
            response = response.decode(self.encoding)
        return response

調(diào)用self._buffer.readline()方法從socket讀取數(shù)據(jù)。然后根據(jù)RESP協(xié)議處理redis的回復(fù)類型,接著逐一解析返回的數(shù)據(jù),并返回。注意,遇到RESP中的批量回復(fù)(bulk response)和多批量回復(fù)(multi-bulk response),還需要調(diào)用self._buffer.read()和遞歸調(diào)用self._buffer.read(length)解析。

根據(jù)self._buffer.readline()只能讀取返回一行的數(shù)據(jù),因?yàn)閞edis是用\r\n區(qū)分?jǐn)?shù)據(jù),因此調(diào)用readline的時候,在批量回復(fù)和多批量回復(fù)的情況下,只能讀取最前面的參數(shù),后面的socket數(shù)據(jù)還在socket的緩沖區(qū),所以需要繼續(xù)調(diào)用read方法讀取解析。后面的分析我們將會了解,多批量回復(fù)可以分解為多個批量回復(fù),因此就與了迭代response的遞歸調(diào)用response = [self.read_response() for i in xrange(length)]。這也是經(jīng)典的tcp流無邊界問題的處理方式。

斷開連接

讀取響應(yīng)之后,交互就完成了使命。redis維護(hù)的是一個連接,也就是根據(jù)redis的timeout參數(shù)來決定連接的空閑時間。默認(rèn)配置是0,即如果redis不主動close這個連接,連接將會一直存在。所以客戶端不會主動disconnect連接,而是釋放其回到連接池pool.release(connection)。前面我們已經(jīng)提到,只要交互過程中發(fā)生了異常,客戶端才會主動調(diào)用disconnect方法釋放與連接相關(guān)的資源和對象。

如果設(shè)置了redis連接的最大空閑時間: CONFIG SET TIMEOUT 30。那么每個redis的連接在30s之后,服務(wù)器都會主動close。此時的客戶端還認(rèn)為連接是正常的,執(zhí)行收發(fā)數(shù)據(jù)的時候?qū)伄惓!_@時就需要同步客戶端和服務(wù)器的連接狀態(tài)。

上面的過程可以用下面的流程圖簡要的說明:


                                    +-----------------------+
      +------------+                |         pool          |
      |            |                |-----------------------|
      |  client    |    1           |                       |
      |            ++-------------> |    connection_pool    |
      +-----+------+                |                       |
            |           2           |                       |
            +---------------------> |  execute_command      |
                                    +----------+------------+
                                               |
                                               |3
                                               |
                                               |
                                               v
                                    +-----------------------+          +---------------------+              +--------------------+
                                    |       pool            |          |       pool          |              |       connection   |
                                    |-----------------------|   4      |---------------------|              |--------------------|
                                    |     get_conncetion    ++-------->|                     |      6       |                    |
                                    |                       |          |     pop connection  | <-----------+|    init a conn     |
                                    |     send_command      +-----+    |                     |     5        |                    |
                      16            |                       |     |    |     make_connection |+------------>|    conect          |
                +-------------------+     parse_response    |     |    |                     |              |                    |
                |                   |                       |     |    +---------------------+              +------------+-------+
                |                   |     release           |     |                                          ^           |
                |                   +-----------------------+     +7                                         |           |
                |                                                 |                                          |           |10
                |                                                 |                                          |           |
                |                                                 |                                          |           |
    +--------------------------+                                  v                                          |           v
    |     connection           |    +-------------------+        +---------------------------------+         |   +-------------------+        +---------------+
    |--------------------------|    |                   |        |           connection            |         |   |      connection   |        |    connection |
    |                          |    |                   |        |---------------------------------|         |   |-------------------|        |---------------|
    |     _parse.read_response |    |                   |  8     |                                 |         |   |                   |        |               |
    |                          |    |    pack command   |<------+|        pack_command             |         |   |                   |   11   |               |
    +-----------+--------------+    |                   |------->|                                 |         |   |     create socket +--------|    on_connect |
                |                   +-------------------+        +----------------+----------------+         |   |                   |        |               |
                |                                                                 |                          |   |                   |        |               |
                |17                                                               v                          |   +--------+----------+        +------+--------+
                v                                                +---------------------------------+         |            |                          +
    +--------------------------+                                 |          connection             |         |            |                          |
    |       pythonparse        |                                 |---------------------------------|         |            |                          |
    |--------------------------|                                 |                                 |   9     |            |                          |12
    |                          |      18                         |        check sock connect       |+--------+            |                          |
    |      _buffer.readline    +------------+                    |                                 |<---------------------+                          |
    |                          |            |                    +        sock sendall             |                                                 v
    |                          |            |                    |                                 |<--------------------------------+        +----------------+
    |      handle response     |            |                    +-----------------+---------------+                                 |        | PythonParse    |
    |                          |            |                                      |                                                 |        |----------------|
    |                          |            |                                      |15                                               |        |                |
    +----------+---------------+            |                                      |                                                 |        |   on_connect   |
               |                            |                                      v                                                 |        |                |
               |                            |                    +-----------------+----------------+                                |        |                |
               |                            |                    |                                  |                                |        +-------+--------+
               |                            |                    |           end                    |                                |                |
               |19                          |                    |                                  |                                |14              |
               |                            |                    +----------------------------------+                                |                |
               v                            v                                                                                        |                |13
    +---------------------------+       +----------------------------+                  +-------------------------------+            |                |
    |       pythonparse         |       |      ScoketBuffer          |                  |         SocketBuffer          |            |                v
    |---------------------------|       |----------------------------|                  |-------------------------------|            |       +------------------+
    |                           |       |                            |                  |                               |            |       |                  |
    |       _buffer.read        |       |    readline                |       21         |                               |            |       |------------------|
    |                           |  20   |                            |+---------------->|      read from socket         |            |       |                  |
    |                           |------>|    read                    |                  |                               |            |       |
    |                           |       |                            |                  |                               |            +-------+ init SocketBuffer|
    +---------------------------+       +----------------------------+                  +-------------------------------+                    |                  |
                                                                                                                                             +------------------+
  1. StrictRedis創(chuàng)建客戶端對象,并初始化連接池
  2. 執(zhí)行ping命令
  3. 調(diào)用execute_command 從使用 get_connection 從連接池讀取連接,然后發(fā)送命令,接著解析返回的響應(yīng),最后釋放連接。
  4. get_connection 調(diào)用,要不重連接池中pop一個連接,如果連接不存在,則調(diào)用make_connection 方法創(chuàng)建連接。
  5. 初始化連接對象。
  6. 返回連接對象
  7. 執(zhí)行send_command 函數(shù),打包編碼resp協(xié)議的命令。
  8. 編碼resp過程。
  9. 檢查連接是否存在,如果存在則發(fā)送socket數(shù)據(jù)。如果不存在,則調(diào)用connect方法創(chuàng)建連接對象。
  10. 創(chuàng)建socket,用于網(wǎng)絡(luò)通信。
  11. 連接創(chuàng)建之后,調(diào)用連接的on_connect方法。
  12. 調(diào)用 pythonparse的on connect方法。初始化socketbuffer對象,用于接受數(shù)據(jù)時候的socket通信。
  13. 初始化 socketbuffer對象。
  14. 逐步返回連接對象,直到可以sendall數(shù)據(jù)到服務(wù)器。
  15. 結(jié)束發(fā)送過程。
  16. 調(diào)用parse_response 方法,用于讀取服務(wù)器返回的響應(yīng)數(shù)據(jù)
  17. 逐步回溯調(diào)用pythonparse封裝的方法讀取一行數(shù)據(jù)。
  18. 通過socketbuffer讀取一行數(shù)據(jù)
  19. 遇到批量回復(fù)或多批量回復(fù),調(diào)用read讀取除token之后的數(shù)據(jù)。
  20. 與19類似,遞歸處理多批量回復(fù)。
  21. 從socket讀取數(shù)據(jù)。

總結(jié)

經(jīng)過上面的簡述,我們對redis.py的大致框架和功能有了初步的了解,接下來就是針對上面所提及的三個方面深入解析。

RESP協(xié)議的學(xué)習(xí)比較簡單,連接池的設(shè)計也不會很難,比較核心的關(guān)鍵是網(wǎng)絡(luò)通信相關(guān)的處理。收發(fā)數(shù)據(jù)是我們的核心重點(diǎn),連接管理也是舉足輕重。一個經(jīng)典的問題就是客戶端的代碼邏輯上的連接還存在,可是實(shí)際的tcp連接已經(jīng)close,此時的收發(fā)數(shù)據(jù)該如何處理和管理呢?這將成為我們接下來閱讀redispy的關(guān)鍵。

接下來將會在分別介紹redispy源碼的時,提供文中使用的客戶端測試代碼,于文末的gist提供。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容