grpc python 源碼分為三部分:python——cython——c++ , 本系列文章分析的是 python 部分代碼,其它部分不涉及(其實(shí)是我看不懂??)
版本:1.24.3
- helloworld
先來(lái)看官方的一個(gè)例子
from concurrent import futures
import logging
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 1?? 創(chuàng)建 server
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) # 2?? 注冊(cè)接口方法
server.add_insecure_port('[::]:50051') # 3?? 綁定監(jiān)聽(tīng)端口
server.start() # 4?? 啟動(dòng) server
server.wait_for_termination() # 5?? 接受終止信號(hào)
if __name__ == '__main__':
logging.basicConfig()
serve()
1?? 創(chuàng)建 server
這里我們傳了一個(gè)線(xiàn)程池給 grpc 的 server ,這個(gè)線(xiàn)程池用來(lái)處理請(qǐng)求。
經(jīng)過(guò)重重調(diào)用,最后我們得到的 server 是 _Server 的實(shí)例
class _Server(grpc.Server):
# pylint: disable=too-many-arguments
def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs, compression):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_augment_options(options, compression))
server.register_completion_queue(completion_queue)
self._state = _ServerState(completion_queue, server, generic_handlers,
_interceptor.service_pipeline(interceptors),
thread_pool, maximum_concurrent_rpcs)
cygrpc.CompletionQueue 和 cygrpc.Server 都是調(diào)用底層的 c++ core ,我們不去管它。
再來(lái)看看這個(gè) _ServerState 的代碼
class _ServerState(object):
# pylint: disable=too-many-arguments
def __init__(self, completion_queue, server, generic_handlers,
interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
self.lock = threading.RLock()
self.completion_queue = completion_queue
self.server = server
self.generic_handlers = list(generic_handlers)
self.interceptor_pipeline = interceptor_pipeline
self.thread_pool = thread_pool
self.stage = _ServerStage.STOPPED
self.termination_event = threading.Event()
self.shutdown_events = [self.termination_event]
self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
self.active_rpc_count = 0
# TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
self.rpc_states = set()
self.due = set()
# A "volatile" flag to interrupt the daemon serving thread
self.server_deallocated = False
從這里我們可以看到,python 的 server 只是對(duì)底層的簡(jiǎn)單封裝,關(guān)于網(wǎng)絡(luò)IO的處理完全是底層的 c++ core 負(fù)責(zé),python 主要負(fù)責(zé)調(diào)用開(kāi)發(fā)者的接口處理請(qǐng)求。
2?? 注冊(cè)接口方法
這步負(fù)責(zé)將我們開(kāi)發(fā)好的接口注冊(cè)到服務(wù)器上,調(diào)用的是編譯 proto 文件生成的 _pb2_grpc 后綴文件的函數(shù)。
def add_GreeterServicer_to_server(servicer, server):
rpc_method_handlers = {
'SayHello': grpc.unary_unary_rpc_method_handler(
servicer.SayHello, # 接口方法
request_deserializer=helloworld__pb2.HelloRequest.FromString, # 反序列化方法
response_serializer=helloworld__pb2.HelloReply.SerializeToString, # 序列化方法
),
}
generic_handler = grpc.method_handlers_generic_handler(
'helloworld.Greeter', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
請(qǐng)求的路由分發(fā)使用的是字典,key 是我們定義的接口名,value 則是一個(gè)命名元組,里面保存的我們的接口方法、序列化方法和反序列化。
3?? 綁定監(jiān)聽(tīng)端口
這個(gè)最后是調(diào)用 c++ core 的代碼,直接忽略
4?? 服務(wù)啟動(dòng)
server 的 start 方法只是調(diào)用 _start 函數(shù)
class _Server(grpc.Server):
def start(self):
_start(self._state)
def _start(state):
with state.lock:
if state.stage is not _ServerStage.STOPPED:
raise ValueError('Cannot start already-started server!')
state.server.start() # 調(diào)用的 c++
state.stage = _ServerStage.STARTED
_request_call(state) # 調(diào)用的 c++
thread = threading.Thread(target=_serve, args=(state,))
thread.daemon = True
thread.start()
這里拉起了一個(gè)線(xiàn)程調(diào)用 _serve 函數(shù),下面則是 _server 的代碼
def _serve(state):
while True:
timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
event = state.completion_queue.poll(timeout)
if state.server_deallocated:
_begin_shutdown_once(state)
if event.completion_type != cygrpc.CompletionType.queue_timeout:
if not _process_event_and_continue(state, event):
return
# We want to force the deletion of the previous event
# ~before~ we poll again; if the event has a reference
# to a shutdown Call object, this can induce spinlock.
event = None
服務(wù)器便是在這里接受請(qǐng)求,并調(diào)用接口方法處理請(qǐng)求的。
5?? 接受終止信號(hào)
這里是為了防止主線(xiàn)程掛掉,以前的寫(xiě)法是這樣的
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
self.grpc_server.stop(0)
當(dāng)時(shí)還奇怪為什么不封裝成一個(gè)方法,這次最新版的則是調(diào)用一個(gè)方法。
不過(guò)這個(gè)方法跟之前的邏輯不一樣了,具體看代碼
class _Server(grpc.Server):
def wait_for_termination(self, timeout=None):
# NOTE(https://bugs.python.org/issue35935)
# Remove this workaround once threading.Event.wait() is working with
# CTRL+C across platforms.
return _common.wait(
self._state.termination_event.wait,
self._state.termination_event.is_set,
timeout=timeout)
順著鏈接看了一下,說(shuō)是在一些版本的 python 調(diào)用 Event.wait 在 win10 無(wú)法用 Ctrl+C 中斷。
我用 python2.7.17 和 python3.7.4 試了一下,是可以用 Ctrl+C 中斷的,看來(lái)問(wèn)題在新版解決了(o_ _)?
python2 的 Event.wait 還有另外一個(gè)問(wèn)題,如果 Event.wait 沒(méi)有傳一個(gè)時(shí)間,那么信號(hào)處理函數(shù)無(wú)法被觸發(fā)。如下所示
import threading
import signal
def main():
event = threading.Event()
def handler(sig, stack):
print(sig, stack)
event.set()
signal.signal(signal.SIGINT, handler)
event.wait()
if __name__ == '__main__':
main()
關(guān)于這點(diǎn),在 grpc 的 _common.wait 代碼注釋里也有說(shuō)到
def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
"""Blocks waiting for an event without blocking the thread indefinitely.
See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
`threading.Event.wait` and `threading.Condition.wait` methods, if invoked
without a timeout kwarg, may block the calling thread indefinitely. If the
call is made from the main thread, this means that signal handlers may not
run for an arbitrarily long period of time.
......
"""
令我感到奇怪的是,為什么要另起一個(gè)線(xiàn)程負(fù)責(zé)接收請(qǐng)求,而不是在主線(xiàn)程進(jìn)行。
為此,我準(zhǔn)備改寫(xiě)一下代碼,將接收請(qǐng)求的邏輯寫(xiě)在主線(xiàn)程中,看看會(huì)有什么問(wèn)題;同時(shí)還要記得將 _start 方法中起線(xiàn)程的代碼注釋掉(推薦在 virtualenv 的環(huán)境下嘗試,免得搞錯(cuò))
from concurrent import futures
import logging
import grpc
import grpc._server
import helloworld_pb2
import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:50051')
server.start()
grpc._server._serve(server._state)
# server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
如果像上面這樣運(yùn)行的話(huà),使用 CTRL+C 線(xiàn)程并不會(huì)馬上停止,而是要好幾次 CTRL+C ,
或者對(duì)這個(gè)服務(wù)發(fā)起調(diào)用。
這樣看來(lái),這個(gè)主線(xiàn)程主要使用來(lái)接收控制信號(hào)的。
感覺(jué)跟 _serve 函數(shù)里的 state.completion_queue.poll 有關(guān),前面我們分析過(guò)這個(gè) completion_queue 是屬于 c++ 部分的
def _serve(state):
while True:
timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
event = state.completion_queue.poll(timeout)
if state.server_deallocated:
_begin_shutdown_once(state)
if event.completion_type != cygrpc.CompletionType.queue_timeout:
if not _process_event_and_continue(state, event):
return
# We want to force the deletion of the previous event
# ~before~ we poll again; if the event has a reference
# to a shutdown Call object, this can induce spinlock.
event = None
-
總結(jié)
temp.png
