Redis是一個高性能的Nosql內(nèi)存數(shù)據(jù)庫。代碼精簡,性能和擴(kuò)展性強(qiáng),被廣泛用于互聯(lián)網(wǎng)應(yīng)用之中。許多語言也都支持redis,并實(shí)現(xiàn)了其客戶端驅(qū)動。Python的redis驅(qū)動寫得非常好(以下簡稱redis.py),通過閱讀redis.py可以學(xué)習(xí)redis的通信協(xié)議,網(wǎng)絡(luò)客戶端的編程以及連接池管理等技術(shù),我們也將通過對這三部分的逐一解析來學(xué)習(xí)redis.py。
目錄:
- Redispy 源碼學(xué)習(xí)(一) --- 概覽
- Redispy 源碼學(xué)習(xí)(二) --- RESP協(xié)議簡介
- Redispy 源碼學(xué)習(xí)(三) --- RESP協(xié)議實(shí)現(xiàn)--編碼
- Redispy 源碼學(xué)習(xí)(四) --- 創(chuàng)建連接
- Redispy 源碼學(xué)習(xí)(五) --- RESP協(xié)議實(shí)現(xiàn)--解碼
- Redispy 源碼學(xué)習(xí)(六) --- 連接池
- Redispy 源碼學(xué)習(xí)(七) --- 客戶端接口
- Redispy 源碼學(xué)習(xí)(八) --- 多線程和阻塞連接池
Redis 通信協(xié)議
redis設(shè)計了一個非常簡單高效的通信協(xié)議RESP---REdis Serialization Protocol,該協(xié)議基于TCP的應(yīng)用層協(xié)議。在編碼RESP協(xié)議的時候,我們需要學(xué)習(xí)字符的編碼與解碼。由于TCP是流(stream)模式,并沒有邊界,因此關(guān)于抽象出來的包的邊界,將會是RESP協(xié)議的重點(diǎn)處理方式。同時也會發(fā)現(xiàn)RESP設(shè)計比較巧妙。
網(wǎng)絡(luò)通信
redis.py是redis的python客戶端驅(qū)動,因此我們只需要實(shí)現(xiàn)客戶端的邏輯,服務(wù)端當(dāng)然就是redis服務(wù)器本身。簡而言之,就是我們需要使用python實(shí)現(xiàn)一個redis-cli。雖然客戶端的socket編碼不比服務(wù)端的復(fù)雜,可是要是處理不當(dāng),同樣也會帶來諸多問題。構(gòu)建一個健壯的客戶端是寫好服務(wù)端的基礎(chǔ)。
學(xué)習(xí)了RESP的編碼與解碼之后,我們就需要借助socket把網(wǎng)絡(luò)數(shù)據(jù)發(fā)送給redis服務(wù)器,同時介紹服務(wù)器的應(yīng)答,完成客戶端對數(shù)據(jù)庫的操作。
連接池
redis.py是一個redis的客戶端,主要任務(wù)是發(fā)送命令到redis服務(wù)器。對于客戶端而言,與服務(wù)器的通信基于tcp的socket,客戶端的生命周期,自然而然就需要創(chuàng)建連接,發(fā)送數(shù)據(jù),關(guān)閉連接等基本操作。
連接的頻繁創(chuàng)建與銷毀也會消耗資源,引入連接池管理連接將會是一種比較好的解決方式。redis.py的連接池寫得很不錯,我們也會從中受益良多。
redis.py的軟件架構(gòu)
一個大型的系統(tǒng)需要一個良好的設(shè)計和架構(gòu)。小的軟件或者腳本也離不開好的設(shè)計結(jié)構(gòu)。redis.py作為python的客戶端,封裝了很多redis命令的接口。因此在python中使用redis將非常方便和優(yōu)雅。
分布式
redis提供分布式功能,我們也會針對其分布式實(shí)現(xiàn)和使用解釋其原理。
閱讀方式
在閱讀redis.py源碼的時候,嘗試自己實(shí)現(xiàn)一個驅(qū)動將會對學(xué)習(xí)理解提供莫大幫助,同時也能帶來成就感。因此我們將使用Python3的編碼環(huán)境,以單文件為基礎(chǔ),實(shí)現(xiàn)一個簡易的redis-like.py。
redis.py 的架構(gòu)概覽
軟件結(jié)構(gòu)
下面我們就先對redis.py做一個簡單地概覽。redis.py已經(jīng)2.10版本。其文件結(jié)構(gòu)如下:
? redis tree
.
├── __init__.py
├── _compat.py
├── client.py
├── connection.py
├── exceptions.py
├── lock.py
├── sentinel.py
└── utils.py
- _compat.py 用于處理python2和python3不兼容的函數(shù),封裝并提供統(tǒng)一的接口。
- client.py 該文件提供接口給python代碼使用。
- connection.py 該文件非常重要,實(shí)現(xiàn)了對redis服務(wù)器的連接創(chuàng)建銷毀和socket收發(fā)過程。
- exceptions.py 自定義異常
- lock.py sentinel.py 用于分布式相關(guān)的操作
- utils.py 工具函數(shù)庫
redis.py的作者把軟件設(shè)計很清晰。不過我們可以先忽略這些結(jié)構(gòu),基于一個文件實(shí)現(xiàn)上面的功能。把核心功能實(shí)現(xiàn)之后,再拆分和組織代碼結(jié)構(gòu)。
創(chuàng)建客戶端,初始化連接池
下面一段簡單的使用代碼:
import redis
rc = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
print(rc.ping())
print(rc.get('hello'))
StrictRedis創(chuàng)建一個客戶端 rc(redis_cli),其內(nèi)部創(chuàng)建一個連接池,當(dāng)調(diào)用ping方法的時候,rc才會創(chuàng)建連接。再次調(diào)用get方法的時候,rc會從連接池中讀取連接,執(zhí)行命令。當(dāng)連接池沒有可用連接,rc又會自動創(chuàng)建連接。總之,redis在執(zhí)行命令的時候,一旦連接壞了,就會清理釋放連接,然后重建新連接,并重新執(zhí)行命令。
與服務(wù)器通信入口
無論ping還是get方法,調(diào)用的都是StrictRedis的execute_command方法。
def execute_command(self, *args, **options):
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
pool.release(connection)
execute_command方法中,先從連接池中g(shù)et一條連接,然后調(diào)用send_command發(fā)送命令給redis服務(wù)器,接著調(diào)用parse_response方法讀取redis的返回結(jié)果。
如果執(zhí)行發(fā)送命令或者讀取結(jié)果的時候發(fā)生異常,將會主動disconnect,即釋放客戶端的連接資源(如果連接已經(jīng)斷開,就清理對象)。然后再重新發(fā)送。等到通信完畢之后,再把連接釋放回連接池。
之所以要清理連接對象,是因?yàn)樵趐ython代碼上下文中,邏輯連接還是正常,只不過實(shí)際上的tcp連接已經(jīng)close了。此時要同步邏輯連接和實(shí)際的連接。
發(fā)送命令
send_command是連接對象的方法,執(zhí)行該方法之前將會把命令參數(shù)按照RESP協(xié)議編碼。并調(diào)用send_packed_comand方法,后者會檢查連接是是否存在,如果不存在,將會創(chuàng)建連接。這一步就是rc的惰性創(chuàng)建連接入口。
def send_packed_command(self, command):
if not self._sock:
self.connect()
try:
if isinstance(command, str):
command = [command]
for item in command:
self._sock.sendall(item)
except socket.timeout:
self.disconnect()
raise TimeoutError("Timeout writing to socket")
except socket.error:
e = sys.exc_info()[1]
self.disconnect()
if len(e.args) == 1:
errno, errmsg = 'UNKNOWN', e.args[0]
else:
errno = e.args[0]
errmsg = e.args[1]
raise ConnectionError("Error %s while writing to socket. %s." % (errno, errmsg))
except:
self.disconnect()
raise
該方法會調(diào)用self._sock.sendall(item)將redis命令發(fā)送到服務(wù)器。
連接與連接池
調(diào)用send_packed_command之前就從連接池中讀取連接。
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection
可以該方法會從可用的連接池對象中pop一個連接,如果連接不存在,那么就調(diào)用make_connection創(chuàng)建連接并返回。然后才能使用send_packed_command發(fā)送數(shù)據(jù)。
讀取響應(yīng)
發(fā)送的過程并不復(fù)雜,接收的過程則比較講究。后面的我們會詳細(xì)分析,在此只需要有個大概認(rèn)識就行了。
parse_response方法會調(diào)用連接對象的read_response方法,后者會調(diào)用self._parser.read_response()。這個 _parser對象為了兼容Hiredis而做的一個適配器。主要功能就是封裝hiredis,提供統(tǒng)一的處理連接管理和數(shù)據(jù)緩沖的接口。默認(rèn)使用PythonParse類。
_parser對象有一個_buffer屬性,后者是一個SocketBuffer類,主要封裝了對socket的接收功能,即從socket的緩沖區(qū)讀取數(shù)據(jù),通過BytesIO寫入到內(nèi)存,然后從內(nèi)存中讀取數(shù)據(jù)。通過計算對比內(nèi)存中的數(shù)據(jù)和讀取的數(shù)據(jù),控制從socket中讀取的數(shù)據(jù)。這個精妙的設(shè)計我們后面會詳細(xì)介紹。
再看_parse對象read_response方法:
def read_response(self):
response = self._buffer.readline()
if not response:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
byte, response = byte_to_chr(response[0]), response[1:]
if byte not in ('-', '+', ':', '$', '*'):
raise InvalidResponse("Protocol Error: %s, %s" %
(str(byte), str(response)))
if byte == '-':
response = nativestr(response)
error = self.parse_error(response)
if isinstance(error, ConnectionError):
raise error
return error
# single value
elif byte == '+':
pass
# int value
elif byte == ':':
response = long(response)
# bulk response
elif byte == '$':
length = int(response)
if length == -1:
return None
response = self._buffer.read(length)
# multi-bulk response
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self._buffer.read(length)() for i in xrange(length)]
if isinstance(response, bytes) and self.encoding:
response = response.decode(self.encoding)
return response
調(diào)用self._buffer.readline()方法從socket讀取數(shù)據(jù)。然后根據(jù)RESP協(xié)議處理redis的回復(fù)類型,接著逐一解析返回的數(shù)據(jù),并返回。注意,遇到RESP中的批量回復(fù)(bulk response)和多批量回復(fù)(multi-bulk response),還需要調(diào)用self._buffer.read()和遞歸調(diào)用self._buffer.read(length)解析。
根據(jù)self._buffer.readline()只能讀取返回一行的數(shù)據(jù),因?yàn)閞edis是用\r\n區(qū)分?jǐn)?shù)據(jù),因此調(diào)用readline的時候,在批量回復(fù)和多批量回復(fù)的情況下,只能讀取最前面的參數(shù),后面的socket數(shù)據(jù)還在socket的緩沖區(qū),所以需要繼續(xù)調(diào)用read方法讀取解析。后面的分析我們將會了解,多批量回復(fù)可以分解為多個批量回復(fù),因此就與了迭代response的遞歸調(diào)用response = [self.read_response() for i in xrange(length)]。這也是經(jīng)典的tcp流無邊界問題的處理方式。
斷開連接
讀取響應(yīng)之后,交互就完成了使命。redis維護(hù)的是一個長連接,也就是根據(jù)redis的timeout參數(shù)來決定連接的空閑時間。默認(rèn)配置是0,即如果redis不主動close這個連接,連接將會一直存在。所以客戶端不會主動disconnect連接,而是釋放其回到連接池pool.release(connection)。前面我們已經(jīng)提到,只要交互過程中發(fā)生了異常,客戶端才會主動調(diào)用disconnect方法釋放與連接相關(guān)的資源和對象。
如果設(shè)置了redis連接的最大空閑時間: CONFIG SET TIMEOUT 30。那么每個redis的連接在30s之后,服務(wù)器都會主動close。此時的客戶端還認(rèn)為連接是正常的,執(zhí)行收發(fā)數(shù)據(jù)的時候?qū)伄惓!_@時就需要同步客戶端和服務(wù)器的連接狀態(tài)。
上面的過程可以用下面的流程圖簡要的說明:
+-----------------------+
+------------+ | pool |
| | |-----------------------|
| client | 1 | |
| ++-------------> | connection_pool |
+-----+------+ | |
| 2 | |
+---------------------> | execute_command |
+----------+------------+
|
|3
|
|
v
+-----------------------+ +---------------------+ +--------------------+
| pool | | pool | | connection |
|-----------------------| 4 |---------------------| |--------------------|
| get_conncetion ++-------->| | 6 | |
| | | pop connection | <-----------+| init a conn |
| send_command +-----+ | | 5 | |
16 | | | | make_connection |+------------>| conect |
+-------------------+ parse_response | | | | | |
| | | | +---------------------+ +------------+-------+
| | release | | ^ |
| +-----------------------+ +7 | |
| | | |10
| | | |
| | | |
+--------------------------+ v | v
| connection | +-------------------+ +---------------------------------+ | +-------------------+ +---------------+
|--------------------------| | | | connection | | | connection | | connection |
| | | | |---------------------------------| | |-------------------| |---------------|
| _parse.read_response | | | 8 | | | | | | |
| | | pack command |<------+| pack_command | | | | 11 | |
+-----------+--------------+ | |------->| | | | create socket +--------| on_connect |
| +-------------------+ +----------------+----------------+ | | | | |
| | | | | | |
|17 v | +--------+----------+ +------+--------+
v +---------------------------------+ | | +
+--------------------------+ | connection | | | |
| pythonparse | |---------------------------------| | | |
|--------------------------| | | 9 | | |12
| | 18 | check sock connect |+--------+ | |
| _buffer.readline +------------+ | |<---------------------+ |
| | | + sock sendall | v
| | | | |<--------------------------------+ +----------------+
| handle response | | +-----------------+---------------+ | | PythonParse |
| | | | | |----------------|
| | | |15 | | |
+----------+---------------+ | | | | on_connect |
| | v | | |
| | +-----------------+----------------+ | | |
| | | | | +-------+--------+
| | | end | | |
|19 | | | |14 |
| | +----------------------------------+ | |
v v | |13
+---------------------------+ +----------------------------+ +-------------------------------+ | |
| pythonparse | | ScoketBuffer | | SocketBuffer | | v
|---------------------------| |----------------------------| |-------------------------------| | +------------------+
| | | | | | | | |
| _buffer.read | | readline | 21 | | | |------------------|
| | 20 | |+---------------->| read from socket | | | |
| |------>| read | | | | |
| | | | | | +-------+ init SocketBuffer|
+---------------------------+ +----------------------------+ +-------------------------------+ | |
+------------------+
- StrictRedis創(chuàng)建客戶端對象,并初始化連接池
- 執(zhí)行ping命令
- 調(diào)用execute_command 從使用 get_connection 從連接池讀取連接,然后發(fā)送命令,接著解析返回的響應(yīng),最后釋放連接。
- get_connection 調(diào)用,要不重連接池中pop一個連接,如果連接不存在,則調(diào)用make_connection 方法創(chuàng)建連接。
- 初始化連接對象。
- 返回連接對象
- 執(zhí)行send_command 函數(shù),打包編碼resp協(xié)議的命令。
- 編碼resp過程。
- 檢查連接是否存在,如果存在則發(fā)送socket數(shù)據(jù)。如果不存在,則調(diào)用connect方法創(chuàng)建連接對象。
- 創(chuàng)建socket,用于網(wǎng)絡(luò)通信。
- 連接創(chuàng)建之后,調(diào)用連接的on_connect方法。
- 調(diào)用 pythonparse的on connect方法。初始化socketbuffer對象,用于接受數(shù)據(jù)時候的socket通信。
- 初始化 socketbuffer對象。
- 逐步返回連接對象,直到可以sendall數(shù)據(jù)到服務(wù)器。
- 結(jié)束發(fā)送過程。
- 調(diào)用parse_response 方法,用于讀取服務(wù)器返回的響應(yīng)數(shù)據(jù)
- 逐步回溯調(diào)用pythonparse封裝的方法讀取一行數(shù)據(jù)。
- 通過socketbuffer讀取一行數(shù)據(jù)
- 遇到批量回復(fù)或多批量回復(fù),調(diào)用read讀取除token之后的數(shù)據(jù)。
- 與19類似,遞歸處理多批量回復(fù)。
- 從socket讀取數(shù)據(jù)。
總結(jié)
經(jīng)過上面的簡述,我們對redis.py的大致框架和功能有了初步的了解,接下來就是針對上面所提及的三個方面深入解析。
RESP協(xié)議的學(xué)習(xí)比較簡單,連接池的設(shè)計也不會很難,比較核心的關(guān)鍵是網(wǎng)絡(luò)通信相關(guān)的處理。收發(fā)數(shù)據(jù)是我們的核心重點(diǎn),連接管理也是舉足輕重。一個經(jīng)典的問題就是客戶端的代碼邏輯上的連接還存在,可是實(shí)際的tcp連接已經(jīng)close,此時的收發(fā)數(shù)據(jù)該如何處理和管理呢?這將成為我們接下來閱讀redispy的關(guān)鍵。
接下來將會在分別介紹redispy源碼的時,提供文中使用的客戶端測試代碼,于文末的gist提供。