架構(gòu)圖

業(yè)務(wù)層:根據(jù)業(yè)務(wù)邏輯,實(shí)現(xiàn)thrift文件中接口
接口層:根據(jù)thrift文件,生成框架代碼
協(xié)議層:對(duì)數(shù)據(jù)流進(jìn)行序列化(二進(jìn)制、json)
傳輸層:負(fù)責(zé)網(wǎng)絡(luò)傳輸
C/S模型
Client端
在Client端,用戶(hù)首先需要依次指定transport類(lèi)型、protocol類(lèi)型和client對(duì)象。client對(duì)象負(fù)責(zé)將函數(shù)名以及參數(shù)發(fā)送給server端,并且解析server端返回的結(jié)果。
Server端
- 在Processor的初始化過(guò)程,綁定用戶(hù)實(shí)現(xiàn)的handler。
class MyService:
def func(self, n1, n2):
pass
handler = MyService()
processor = MyService.Processor(handler)
- Server端建立transport,并設(shè)置protocol。
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- 創(chuàng)建server對(duì)象并開(kāi)始服務(wù)。
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
server.serve()
Thrift內(nèi)部流程
序列化支持
- ttypes.py
thrift文件中定義的數(shù)據(jù)格式,編譯后的存放在ttypes.py中。Thrift將用戶(hù)自定義的struct、enum和exception都轉(zhuǎn)換為python class。
class SharedStruct {
thrift_spec = (
None, #0
(1, TType.I32, 'key', None, None, ) #1
(2, TType.STRING, 'value', None, None, ) # 2
)
def __init__(self, key=None, value=None):
self.key = key
self.value = value
def read(self, iprot):
.....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.key = iprot.readI32()
else:
iprot.skip(ftype)
elif:
if ftype == TType.STRING:
self.value = iprot.readString()
else:
iport.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprort.readStructEnd()
def write(self, oprot):
oprot.writeStructBegin('SharedStruct')
if self.key is not None:
oprot.writeFieldBegin('key', TType.I32, 1)
oprot.wirteI32(self.key)
oprot.writeFieldEnd()
....
可以看出,Thrift為自定義類(lèi)型生成了spec結(jié)構(gòu)、read方法和write方法。在之后的分析中,spec結(jié)構(gòu)對(duì)應(yīng)于自定義類(lèi)型的內(nèi)部數(shù)據(jù)參數(shù),
對(duì)數(shù)據(jù)結(jié)構(gòu)的解析將起到指導(dǎo)性的作用。read方法和write方法是互逆的過(guò)程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的順序?qū)haredStruct的內(nèi)容寫(xiě)入output protocol中,而read方法按照相應(yīng)的順序,從input protocol中讀取各個(gè)字段。
注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以雖然傳參不同,但是實(shí)際上是互斥空操作。
Client端
client.open()
連接指定的ip地址、端口。
sum_ = client.add(1, 1)
調(diào)用client對(duì)象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,將參數(shù)發(fā)送給Server端;在recv_xxx中,使用xxx_result的read方法來(lái)解析Server端返回的結(jié)果。Server端會(huì)在返回結(jié)果的末尾設(shè)置TType.Stop標(biāo)識(shí)來(lái)表征消息的結(jié)束。
Server端
- 當(dāng)設(shè)置了Server端的server對(duì)象后,會(huì)調(diào)用其serve方法,該方法會(huì)建立監(jiān)聽(tīng),等待Client端的連接。
self.serverTransport.listen()
- 當(dāng)檢測(cè)到Client端發(fā)起rpc調(diào)用后,會(huì)建立相應(yīng)的數(shù)據(jù)對(duì)象,并調(diào)動(dòng)Processor的process進(jìn)行處理。
while True:
client = self.serverTransport.accept()
if not client:
continue
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
itrans.close()
otrans.close()
itrans、otrans、iprot、oprot都是通過(guò)工廠方法實(shí)例化。itrans、otrans負(fù)責(zé)Server端與Client端的數(shù)據(jù)傳輸,iprot、oprot負(fù)責(zé)解碼工作(應(yīng)該相同)。
- process會(huì)依據(jù)Client端發(fā)送的函數(shù)標(biāo)識(shí)(add)進(jìn)行分發(fā),交由process_add方法進(jìn)行處理。process_add方法會(huì)解析參數(shù)并調(diào)用handler的add方法,并使用xxx_result的write方法來(lái)返回結(jié)果。
序列化的流程
- 先writeMessageBegin表示開(kāi)始傳輸消息了,寫(xiě)消息頭。Message里面定義了方法名,調(diào)用的類(lèi)型,消息seqId
- 寫(xiě)消息體。如果參數(shù)是一個(gè)類(lèi),就writeStructBegin
- 接下來(lái)寫(xiě)字段,writeFieldBegin, 這個(gè)方法會(huì)寫(xiě)接下來(lái)的字段的數(shù)據(jù)類(lèi)型和順序號(hào)。這個(gè)順序號(hào)是Thrfit對(duì)要傳輸?shù)淖侄蔚囊粋€(gè)編碼,從1開(kāi)始
- 如果是一個(gè)集合就writeListBegin/writeMapBegin,如果是一個(gè)基本數(shù)據(jù)類(lèi)型,比如int, 就直接writeI32
- 每個(gè)復(fù)雜數(shù)據(jù)類(lèi)型寫(xiě)完都調(diào)用writeXXXEnd,直到writeMessageEnd結(jié)束
- 讀消息時(shí)根據(jù)數(shù)據(jù)類(lèi)型讀取相應(yīng)的長(zhǎng)度
Protocol
用于信息的序列化過(guò)程,分為write和read對(duì)稱(chēng)的兩部分。
| 方法/字段 | 含義 | 內(nèi)容 |
|---|---|---|
| Message | 消息傳輸?shù)念^部 | name(方法名)+ type + seqid |
| FieldB/E/Stop | type + id(IDL中的索引) / None / STOP | |
| Struct | name | |
| Map | ktype + vtype + size | |
| List | etype + size | |
| Set | etype + size | |
| Bool/Byte/I16/I32/I64/Double/String/Binary | bool_val | |
| skip | ttype |
STRUCT
struct_begin
while True:
field_begin
if ttype == STOP:
break
skip(ttype)
field_end
struct_end
MAP
map_begin
for i in range(size):
skip(ktype)
skip(vtype)
map_end
SET / LIST
begin
for i in range(size):
skip(ttype)
end
HelloService.py
Iface
service的接口描述類(lèi);Client
客戶(hù)端發(fā)送請(qǐng)求 + 接收回饋的方法類(lèi);
class Client(Iface):
def __init__(self, iprot, oprot=None):
self._iport = self._oprot = None
if oport is not None:
self._oprot = oprot
self._seqid = 0
def getStruct(self, key):
self.send_getStruct(key)
return self.recv_getStruct()
def send_getStruct(self, key):
self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
args = getStruct_args()
args.key = key
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()
def recv_getStruct(self):
iprot = self._iprot
(fname, mtype, rseqid) = iprot.readMessageBegin()
if mtype == TMessageType.EXCEPTION:
...
result = getStruct_result()
result.read(iprot)
iprot.readMessageEnd()
if result.success is not None:
return result.success
raise TApplicationException(...)
Client實(shí)現(xiàn)的用戶(hù)聲明方法的方式為send_xxx然后recv_xxx,這個(gè)模式符合rpc的調(diào)用思想,先將請(qǐng)求發(fā)出去,然后等待接收遠(yuǎn)端的回應(yīng)。
send_xxx && recv_xxx
在send_xxx中,首先會(huì)寫(xiě)入name、ttyp和seqid的信息。然后通過(guò)獲取getStruct方法的參數(shù)信息,并寫(xiě)入到輸出流中。
獲取參數(shù)信息的方式如下:
class getStruct_args:
thrift_spec = (
None, # 0
(1, TType.I32, 'key', None, None, ), # 1
)
def __init__(self, key=None,):
self.key = key
def read(self, iprot):
....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.key = iprot.readI32()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
....
oprot.writeStructBegin('getStruct_args')
if self.key is not None:
oprot.writeFieldBegin('key', TType.I32, 1)
oprot.writeI32(self.key)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
send_xxx通過(guò)調(diào)用getStruct_args的write方法來(lái)發(fā)送參數(shù)信息到遠(yuǎn)端,可以猜想到,server端會(huì)調(diào)用getStruct_args的read方法來(lái)解析參數(shù)信息。
當(dāng)server端處理完成后,會(huì)發(fā)送結(jié)果到client端。對(duì)結(jié)果的處理類(lèi)似于參數(shù)處理的逆過(guò)程。由getStruct_result負(fù)責(zé):
class getStruct_result:
thrift_spec = (
(0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
)
def __init__(self, success=None,):
self.success = success
def read(self, iprot):
....
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 0:
if ftype == TType.STRUCT:
self.success = SharedStruct()
self.success.read(iprot)
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
....
oprot.writeStructBegin('getStruct_result')
if self.success is not None:
oprot.writeFieldBegin('success', TType.STRUCT, 0)
self.success.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
server端使用getStruct_result的write來(lái)發(fā)送結(jié)果,client端會(huì)調(diào)用read來(lái)響應(yīng)的解析結(jié)果。從代碼中可以看出,結(jié)果success為T(mén)Type.STRUCT類(lèi)型,也就是SharedStruct結(jié)構(gòu)體,而對(duì)SharedStruct結(jié)構(gòu)體的解釋就位于SharedStruct類(lèi)中的thrift_spec字段。
-
Processor
服務(wù)端接收請(qǐng)求 + 調(diào)用處理函數(shù) + 返回結(jié)果的方法類(lèi);
在Processor中注冊(cè)有函數(shù)名所對(duì)應(yīng)的內(nèi)部處理方法,當(dāng)調(diào)用process時(shí),會(huì)執(zhí)行預(yù)先給定的handler并返回處理結(jié)果。
對(duì)于執(zhí)行被調(diào)用函數(shù)的情況,會(huì)返回INTERNAL_ERROR的錯(cuò)誤。- process function
name, type, seqid = readMessageBegin 校驗(yàn)name是否存在 call process_name- process_xxx
調(diào)用Get_args來(lái)讀取輸入 調(diào)用處理函數(shù)并獲取返回值 作為T(mén)MessageType.REPLY寫(xiě)回 -
Get_args
- write
writeStructBeigin('Get_args') writeField(參數(shù)名稱(chēng),TType.STRUCT, 1(索引)) 自定義類(lèi)型的write