Thrift

架構(gòu)圖

業(yè)務(wù)層:根據(jù)業(yè)務(wù)邏輯,實(shí)現(xiàn)thrift文件中接口
接口層:根據(jù)thrift文件,生成框架代碼
協(xié)議層:對(duì)數(shù)據(jù)流進(jìn)行序列化(二進(jìn)制、json)
傳輸層:負(fù)責(zé)網(wǎng)絡(luò)傳輸

C/S模型

Client端

在Client端,用戶(hù)首先需要依次指定transport類(lèi)型、protocol類(lèi)型和client對(duì)象。client對(duì)象負(fù)責(zé)將函數(shù)名以及參數(shù)發(fā)送給server端,并且解析server端返回的結(jié)果。

Server端
  1. 在Processor的初始化過(guò)程,綁定用戶(hù)實(shí)現(xiàn)的handler。
class MyService:
  def func(self, n1, n2):
        pass
handler = MyService()
processor = MyService.Processor(handler)
  1. Server端建立transport,并設(shè)置protocol。
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
  1. 創(chuàng)建server對(duì)象并開(kāi)始服務(wù)。
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
server.serve()

Thrift內(nèi)部流程

序列化支持
  1. ttypes.py
    thrift文件中定義的數(shù)據(jù)格式,編譯后的存放在ttypes.py中。Thrift將用戶(hù)自定義的struct、enum和exception都轉(zhuǎn)換為python class。
class SharedStruct {
  thrift_spec = (
    None, #0
    (1, TType.I32, 'key', None, None, ) #1
    (2, TType.STRING, 'value', None, None, ) # 2
  )
  
  def __init__(self, key=None, value=None):
    self.key = key
    self.value = value

  def read(self, iprot):
    .....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      elif:
        if ftype == TType.STRING:
          self.value = iprot.readString()
        else:
          iport.skip(ftype)  
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprort.readStructEnd()

    def write(self, oprot):
      oprot.writeStructBegin('SharedStruct')
      if self.key is not  None:
        oprot.writeFieldBegin('key', TType.I32, 1)
        oprot.wirteI32(self.key)
        oprot.writeFieldEnd()
      ....

可以看出,Thrift為自定義類(lèi)型生成了spec結(jié)構(gòu)、read方法和write方法。在之后的分析中,spec結(jié)構(gòu)對(duì)應(yīng)于自定義類(lèi)型的內(nèi)部數(shù)據(jù)參數(shù),
對(duì)數(shù)據(jù)結(jié)構(gòu)的解析將起到指導(dǎo)性的作用。read方法和write方法是互逆的過(guò)程,write方法按照struct name、'key'、key_type、key_value、value_tag、value_type、value_value的順序?qū)haredStruct的內(nèi)容寫(xiě)入output protocol中,而read方法按照相應(yīng)的順序,從input protocol中讀取各個(gè)字段。
注意:在TBinaryProtocol中,readStructBegin和writeStructBegin都是空操作,所以雖然傳參不同,但是實(shí)際上是互斥空操作。

Client端
client.open()

連接指定的ip地址、端口。

sum_ = client.add(1, 1)

調(diào)用client對(duì)象的send_xxx和recv_xxx方法。在send_xxx中,使用xxx_args的write方法,將參數(shù)發(fā)送給Server端;在recv_xxx中,使用xxx_result的read方法來(lái)解析Server端返回的結(jié)果。Server端會(huì)在返回結(jié)果的末尾設(shè)置TType.Stop標(biāo)識(shí)來(lái)表征消息的結(jié)束。

Server端
  • 當(dāng)設(shè)置了Server端的server對(duì)象后,會(huì)調(diào)用其serve方法,該方法會(huì)建立監(jiān)聽(tīng),等待Client端的連接。
    self.serverTransport.listen()
  • 當(dāng)檢測(cè)到Client端發(fā)起rpc調(diào)用后,會(huì)建立相應(yīng)的數(shù)據(jù)對(duì)象,并調(diào)動(dòng)Processor的process進(jìn)行處理。
    while True:
      client = self.serverTransport.accept()
      if not client:
        continue
      itrans = self.inputTransportFactory.getTransport(client)
      otrans = self.outputTransportFactory.getTransport(client)
      iprot = self.inputProtocolFactory.getProtocol(itrans)
      oprot = self.outputProtocolFactory.getProtocol(otrans)
      try:
        while True:
          self.processor.process(iprot, oprot)
      itrans.close()
      otrans.close()

itrans、otrans、iprot、oprot都是通過(guò)工廠方法實(shí)例化。itrans、otrans負(fù)責(zé)Server端與Client端的數(shù)據(jù)傳輸,iprot、oprot負(fù)責(zé)解碼工作(應(yīng)該相同)。

  • process會(huì)依據(jù)Client端發(fā)送的函數(shù)標(biāo)識(shí)(add)進(jìn)行分發(fā),交由process_add方法進(jìn)行處理。process_add方法會(huì)解析參數(shù)并調(diào)用handler的add方法,并使用xxx_result的write方法來(lái)返回結(jié)果。
序列化的流程
  1. 先writeMessageBegin表示開(kāi)始傳輸消息了,寫(xiě)消息頭。Message里面定義了方法名,調(diào)用的類(lèi)型,消息seqId
  2. 寫(xiě)消息體。如果參數(shù)是一個(gè)類(lèi),就writeStructBegin
  3. 接下來(lái)寫(xiě)字段,writeFieldBegin, 這個(gè)方法會(huì)寫(xiě)接下來(lái)的字段的數(shù)據(jù)類(lèi)型和順序號(hào)。這個(gè)順序號(hào)是Thrfit對(duì)要傳輸?shù)淖侄蔚囊粋€(gè)編碼,從1開(kāi)始
  4. 如果是一個(gè)集合就writeListBegin/writeMapBegin,如果是一個(gè)基本數(shù)據(jù)類(lèi)型,比如int, 就直接writeI32
  5. 每個(gè)復(fù)雜數(shù)據(jù)類(lèi)型寫(xiě)完都調(diào)用writeXXXEnd,直到writeMessageEnd結(jié)束
  6. 讀消息時(shí)根據(jù)數(shù)據(jù)類(lèi)型讀取相應(yīng)的長(zhǎng)度
Protocol

用于信息的序列化過(guò)程,分為write和read對(duì)稱(chēng)的兩部分。

方法/字段 含義 內(nèi)容
Message 消息傳輸?shù)念^部 name(方法名)+ type + seqid
FieldB/E/Stop type + id(IDL中的索引) / None / STOP
Struct name
Map ktype + vtype + size
List etype + size
Set etype + size
Bool/Byte/I16/I32/I64/Double/String/Binary bool_val
skip ttype

STRUCT

struct_begin
while True:  
      field_begin
      if ttype == STOP:
          break
     skip(ttype)
    field_end
struct_end

MAP

map_begin
for i in range(size):
  skip(ktype)
  skip(vtype)
map_end

SET / LIST

begin
for i in range(size):
  skip(ttype)
end
HelloService.py
  • Iface
    service的接口描述類(lèi);

  • Client
    客戶(hù)端發(fā)送請(qǐng)求 + 接收回饋的方法類(lèi);

class Client(Iface):
  def __init__(self, iprot, oprot=None):
    self._iport = self._oprot = None
    if oport is not None:
      self._oprot = oprot
    self._seqid = 0
  def getStruct(self, key):
    self.send_getStruct(key)
    return self.recv_getStruct()

  def send_getStruct(self, key):
    self._oprot.writeMessageBegin('getStruct', TMessageType.CALL, self._seqid)
    args = getStruct_args()
    args.key = key
    args.write(self._oprot)
    self._oprot.writeMessageEnd()
    self._oprot.trans.flush()

  def recv_getStruct(self):
    iprot = self._iprot
    (fname, mtype, rseqid) = iprot.readMessageBegin()
    if mtype == TMessageType.EXCEPTION:
      ...
    result = getStruct_result()
    result.read(iprot)
    iprot.readMessageEnd()
    if result.success is not None:
      return result.success
    raise TApplicationException(...)

Client實(shí)現(xiàn)的用戶(hù)聲明方法的方式為send_xxx然后recv_xxx,這個(gè)模式符合rpc的調(diào)用思想,先將請(qǐng)求發(fā)出去,然后等待接收遠(yuǎn)端的回應(yīng)。

send_xxx && recv_xxx

在send_xxx中,首先會(huì)寫(xiě)入name、ttyp和seqid的信息。然后通過(guò)獲取getStruct方法的參數(shù)信息,并寫(xiě)入到輸出流中。
獲取參數(shù)信息的方式如下:

class getStruct_args:
  thrift_spec = (
    None, # 0
    (1, TType.I32, 'key', None, None, ), # 1
  )

  def __init__(self, key=None,):
    self.key = key

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 1:
        if ftype == TType.I32:
          self.key = iprot.readI32()
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_args')
    if self.key is not None:
      oprot.writeFieldBegin('key', TType.I32, 1)
      oprot.writeI32(self.key)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

send_xxx通過(guò)調(diào)用getStruct_args的write方法來(lái)發(fā)送參數(shù)信息到遠(yuǎn)端,可以猜想到,server端會(huì)調(diào)用getStruct_args的read方法來(lái)解析參數(shù)信息。
當(dāng)server端處理完成后,會(huì)發(fā)送結(jié)果到client端。對(duì)結(jié)果的處理類(lèi)似于參數(shù)處理的逆過(guò)程。由getStruct_result負(fù)責(zé):

class getStruct_result:
  thrift_spec = (
    (0, TType.STRUCT, 'success', (SharedStruct, SharedStruct.thrift_spec), None, ), # 0
  )

  def __init__(self, success=None,):
    self.success = success

  def read(self, iprot):
    ....
    iprot.readStructBegin()
    while True:
      (fname, ftype, fid) = iprot.readFieldBegin()
      if ftype == TType.STOP:
        break
      if fid == 0:
        if ftype == TType.STRUCT:
          self.success = SharedStruct()
          self.success.read(iprot)
        else:
          iprot.skip(ftype)
      else:
        iprot.skip(ftype)
      iprot.readFieldEnd()
    iprot.readStructEnd()

  def write(self, oprot):
    ....
    oprot.writeStructBegin('getStruct_result')
    if self.success is not None:
      oprot.writeFieldBegin('success', TType.STRUCT, 0)
      self.success.write(oprot)
      oprot.writeFieldEnd()
    oprot.writeFieldStop()
    oprot.writeStructEnd()

server端使用getStruct_result的write來(lái)發(fā)送結(jié)果,client端會(huì)調(diào)用read來(lái)響應(yīng)的解析結(jié)果。從代碼中可以看出,結(jié)果success為T(mén)Type.STRUCT類(lèi)型,也就是SharedStruct結(jié)構(gòu)體,而對(duì)SharedStruct結(jié)構(gòu)體的解釋就位于SharedStruct類(lèi)中的thrift_spec字段。

  • Processor
    服務(wù)端接收請(qǐng)求 + 調(diào)用處理函數(shù) + 返回結(jié)果的方法類(lèi);
    在Processor中注冊(cè)有函數(shù)名所對(duì)應(yīng)的內(nèi)部處理方法,當(dāng)調(diào)用process時(shí),會(huì)執(zhí)行預(yù)先給定的handler并返回處理結(jié)果。
    對(duì)于執(zhí)行被調(diào)用函數(shù)的情況,會(huì)返回INTERNAL_ERROR的錯(cuò)誤。

    1. process function
      name, type, seqid = readMessageBegin
      校驗(yàn)name是否存在
      call process_name
    
    1. process_xxx
      調(diào)用Get_args來(lái)讀取輸入
      調(diào)用處理函數(shù)并獲取返回值
      作為T(mén)MessageType.REPLY寫(xiě)回
    
  • Get_args

    1. write
      writeStructBeigin('Get_args')
      writeField(參數(shù)名稱(chēng),TType.STRUCT, 1(索引))
      自定義類(lèi)型的write
    
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容