grpc python 源碼分析(1):server 的創(chuàng)建和啟動(dòng)

grpc python 源碼分為三部分:python——cython——c++ , 本系列文章分析的是 python 部分代碼,其它部分不涉及(其實(shí)是我看不懂??)

版本:1.24.3

  • helloworld
    先來(lái)看官方的一個(gè)例子
from concurrent import futures
import logging
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # 1?? 創(chuàng)建 server
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)  # 2?? 注冊(cè)接口方法
    server.add_insecure_port('[::]:50051')  # 3?? 綁定監(jiān)聽(tīng)端口
    server.start()  # 4?? 啟動(dòng) server
    server.wait_for_termination()  # 5?? 接受終止信號(hào)

if __name__ == '__main__':
    logging.basicConfig()
    serve()

1?? 創(chuàng)建 server
這里我們傳了一個(gè)線(xiàn)程池給 grpc 的 server ,這個(gè)線(xiàn)程池用來(lái)處理請(qǐng)求。
經(jīng)過(guò)重重調(diào)用,最后我們得到的 server 是 _Server 的實(shí)例

class _Server(grpc.Server):

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression):
        completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server(_augment_options(options, compression))
        server.register_completion_queue(completion_queue)
        self._state = _ServerState(completion_queue, server, generic_handlers,
                                   _interceptor.service_pipeline(interceptors),
                                   thread_pool, maximum_concurrent_rpcs)

cygrpc.CompletionQueuecygrpc.Server 都是調(diào)用底層的 c++ core ,我們不去管它。
再來(lái)看看這個(gè) _ServerState 的代碼

class _ServerState(object):

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        self.lock = threading.RLock()
        self.completion_queue = completion_queue
        self.server = server
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool
        self.stage = _ServerStage.STOPPED
        self.termination_event = threading.Event()
        self.shutdown_events = [self.termination_event]
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False

從這里我們可以看到,python 的 server 只是對(duì)底層的簡(jiǎn)單封裝,關(guān)于網(wǎng)絡(luò)IO的處理完全是底層的 c++ core 負(fù)責(zé),python 主要負(fù)責(zé)調(diào)用開(kāi)發(fā)者的接口處理請(qǐng)求。

2?? 注冊(cè)接口方法
這步負(fù)責(zé)將我們開(kāi)發(fā)好的接口注冊(cè)到服務(wù)器上,調(diào)用的是編譯 proto 文件生成的 _pb2_grpc 后綴文件的函數(shù)。

def add_GreeterServicer_to_server(servicer, server):
  rpc_method_handlers = {
      'SayHello': grpc.unary_unary_rpc_method_handler(
          servicer.SayHello,  # 接口方法
          request_deserializer=helloworld__pb2.HelloRequest.FromString,  # 反序列化方法
          response_serializer=helloworld__pb2.HelloReply.SerializeToString,  # 序列化方法
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'helloworld.Greeter', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

請(qǐng)求的路由分發(fā)使用的是字典,key 是我們定義的接口名,value 則是一個(gè)命名元組,里面保存的我們的接口方法、序列化方法和反序列化。

3?? 綁定監(jiān)聽(tīng)端口
這個(gè)最后是調(diào)用 c++ core 的代碼,直接忽略

4?? 服務(wù)啟動(dòng)
serverstart 方法只是調(diào)用 _start 函數(shù)

class _Server(grpc.Server):
    def start(self):
        _start(self._state)

def _start(state):
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()  # 調(diào)用的 c++ 
        state.stage = _ServerStage.STARTED
        _request_call(state)  # 調(diào)用的 c++

        thread = threading.Thread(target=_serve, args=(state,))
        thread.daemon = True
        thread.start()

這里拉起了一個(gè)線(xiàn)程調(diào)用 _serve 函數(shù),下面則是 _server 的代碼

def _serve(state):
    while True:
        timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(timeout)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        if event.completion_type != cygrpc.CompletionType.queue_timeout:
            if not _process_event_and_continue(state, event):
                return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None

服務(wù)器便是在這里接受請(qǐng)求,并調(diào)用接口方法處理請(qǐng)求的。

5?? 接受終止信號(hào)
這里是為了防止主線(xiàn)程掛掉,以前的寫(xiě)法是這樣的

        try:
          while True:
            time.sleep(86400)
        except KeyboardInterrupt:
            self.grpc_server.stop(0)

當(dāng)時(shí)還奇怪為什么不封裝成一個(gè)方法,這次最新版的則是調(diào)用一個(gè)方法。
不過(guò)這個(gè)方法跟之前的邏輯不一樣了,具體看代碼

class _Server(grpc.Server):

    def wait_for_termination(self, timeout=None):
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        return _common.wait(
            self._state.termination_event.wait,
            self._state.termination_event.is_set,
            timeout=timeout)

順著鏈接看了一下,說(shuō)是在一些版本的 python 調(diào)用 Event.waitwin10 無(wú)法用 Ctrl+C 中斷。
我用 python2.7.17python3.7.4 試了一下,是可以用 Ctrl+C 中斷的,看來(lái)問(wèn)題在新版解決了(o_ _)?
python2Event.wait 還有另外一個(gè)問(wèn)題,如果 Event.wait 沒(méi)有傳一個(gè)時(shí)間,那么信號(hào)處理函數(shù)無(wú)法被觸發(fā)。如下所示

import threading
import signal


def main():
    event = threading.Event()

    def handler(sig, stack):
        print(sig, stack)
        event.set()

    signal.signal(signal.SIGINT, handler)
    event.wait()


if __name__ == '__main__':
    main()

關(guān)于這點(diǎn),在 grpc_common.wait 代碼注釋里也有說(shuō)到

def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
    """Blocks waiting for an event without blocking the thread indefinitely.

    See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
    `threading.Event.wait` and `threading.Condition.wait` methods, if invoked
    without a timeout kwarg, may block the calling thread indefinitely. If the
    call is made from the main thread, this means that signal handlers may not
    run for an arbitrarily long period of time.

    ......
    """

令我感到奇怪的是,為什么要另起一個(gè)線(xiàn)程負(fù)責(zé)接收請(qǐng)求,而不是在主線(xiàn)程進(jìn)行。
為此,我準(zhǔn)備改寫(xiě)一下代碼,將接收請(qǐng)求的邏輯寫(xiě)在主線(xiàn)程中,看看會(huì)有什么問(wèn)題;同時(shí)還要記得將 _start 方法中起線(xiàn)程的代碼注釋掉(推薦在 virtualenv 的環(huán)境下嘗試,免得搞錯(cuò))

from concurrent import futures
import logging

import grpc
import grpc._server

import helloworld_pb2
import helloworld_pb2_grpc

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    grpc._server._serve(server._state)
    # server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()

如果像上面這樣運(yùn)行的話(huà),使用 CTRL+C 線(xiàn)程并不會(huì)馬上停止,而是要好幾次 CTRL+C ,
或者對(duì)這個(gè)服務(wù)發(fā)起調(diào)用。
這樣看來(lái),這個(gè)主線(xiàn)程主要使用來(lái)接收控制信號(hào)的。
感覺(jué)跟 _serve 函數(shù)里的 state.completion_queue.poll 有關(guān),前面我們分析過(guò)這個(gè) completion_queue 是屬于 c++ 部分的

def _serve(state):
    while True:
        timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(timeout)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        if event.completion_type != cygrpc.CompletionType.queue_timeout:
            if not _process_event_and_continue(state, event):
                return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None
  • 總結(jié)


    temp.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容