Problem description
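In a Python project that communicates over gRPC, using gRPC across processes can cause hangs or errors (the exact symptom depends on the grpcio version). The demo below shows cross-process use: the main process sends a request to the gRPC server on port 30001, then starts a child process with multiprocessing, and the child sends a request to the same server. The child is created with fork, which was long the default multiprocessing start method on Linux.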
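```python
import multiprocessing
import grpc
# Modules generated by grpcio-tools from the project's .proto file
import message_pb2
import message_pb2_grpc

def send():
    # Child process: creates its own channel to the same server
    channel = grpc.insecure_channel('localhost:30001')
    stub = message_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(message_pb2.HelloRequest(name='you'))
    print("Greeter client received 1: " + response.message)

def main():
    # Main process: issues an RPC, then forks while the used channel is still open
    channel = grpc.insecure_channel('localhost:30001')
    stub = message_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello2(message_pb2.HelloRequest(name='you'))
    print("Greeter client received 2: " + response.message)
    p = multiprocessing.Process(target=send)
    p.start()
    p.join()

if __name__ == '__main__':
    main()
```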
With grpcio 1.28.1 the demo fails: the main process receives the server's response normally, but the child process fails with "Socket operation on non-socket":
```
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket operation on non-socket"
debug_error_string = "{"created":"@1587481625.192071231","description":"Error received from peer ipv6:[::1]:50051","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Socket operation on non-socket","grpc_status":14}"
>
```
Investigation
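According to the code, the main process and the child process each create their own Channel, so the logic looks fine. With no obvious lead, I started by testing a few variations. First I had the main and child processes call different servers: I started two gRPC servers on ports 30001 and 30002 and changed the client so that the main process calls port 30001 and the child calls port 30002. That version ran without problems. This made things even more puzzling: the code clearly creates a separate Channel in each process, yet the behavior looks as if, whenever both target the same server, the child reuses the main process's socket connection. gRPC runs on top of HTTP/2, and HTTP/2 uses long-lived connections; could that be the cause?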
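"With the new framing mechanism, HTTP/2 no longer relies on multiple TCP connections to multiplex streams in parallel; each stream is split into many frames, which can be interleaved and prioritized individually. As a result, all HTTP/2 connections are persistent, and only one connection per origin is needed, which brings many performance benefits." (Introduction to HTTP/2)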
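Judging from how HTTP/2 works, that explanation is plausible. As it happens, an issue in the gRPC repository discusses exactly this cross-process problem, Failed to run grpc python on multiprocessing #18321, and the developers explain there why usage like the demo fails.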
gRPC Core's API for fork support
A process may fork after invoking grpc_init() and use gRPC in the child if and only if the child process first destroys all gRPC resources inherited from the parent process and invokes grpc_shutdown().
Subsequent to this, the child will be able to re-initialize and use gRPC. After fork, the parent process will be able to continue to use existing gRPC resources such as channels and calls without interference from the child process.
gRPC Python behavior at fork()
To facilitate gRPC Python applications meeting the above constraints, gRPC Python will automatically destroy and shutdown all gRPC Core resources in the child's post-fork handler, including cancelling in-flight calls. From the client's perspective, the child process is now free to create new channels and use gRPC.
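Put simply: at the gRPC Core API level, a child process may use gRPC only after it has destroyed the gRPC resources inherited from the parent through fork and then created new connections; otherwise it can deadlock.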
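The idea of a child-side post-fork handler can be illustrated with the standard library alone. The sketch below is a hypothetical analogy, not gRPC's implementation: ConnectionRegistry stands in for per-process connection state shared by all channels to the same target (an assumption that would fit the earlier observation that only same-server requests failed), and os.register_at_fork plays the role of gRPC Python's cleanup in the child. It is POSIX-only because it calls os.fork directly.

```python
import os


class ConnectionRegistry:
    """Hypothetical per-process connection pool; NOT gRPC's implementation."""

    def __init__(self):
        self.owner_pid = os.getpid()
        self.connections = {}

    def get(self, target):
        # Every caller asking for the same target gets the same pooled entry.
        if target not in self.connections:
            self.connections[target] = f"conn:{target}:pid={os.getpid()}"
        return self.connections[target]

    def reset_in_child(self):
        # Post-fork handler: forget everything inherited from the parent,
        # so the child is forced to open its own connections.
        self.connections.clear()
        self.owner_pid = os.getpid()


REGISTRY = ConnectionRegistry()
# os.register_at_fork (Python 3.7+, POSIX) runs the callback in the child right
# after os.fork(), which is also what multiprocessing's "fork" start method calls.
os.register_at_fork(after_in_child=REGISTRY.reset_in_child)


def child_gets_fresh_connection(target="localhost:30001"):
    """Fork once and report whether the child built its own connection entry."""
    parent_conn = REGISTRY.get(target)
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        try:
            os.close(read_fd)
            fresh = REGISTRY.get(target) != parent_conn
            os.write(write_fd, b"1" if fresh else b"0")
        finally:
            os._exit(0)  # never fall back into the parent's code path
    os.close(write_fd)
    answer = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return answer == b"1"
```

In this toy model, removing the register_at_fork line makes the child's get() for the same target return the parent's inherited entry, while a different target still gets a new one, which mirrors the 30001/30002 experiment above.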
gRPC also has a dedicated document on its fork support: https://github.com/grpc/grpc/blob/master/doc/fork_support.md
The background Python thread was removed entirely. This allows forking after creating a channel. However, the channel must not have issued any RPCs prior to the fork. Attempting to fork with an active channel that has been used can result in deadlocks/corrupted wire data.
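Taken together, the document and the issue say that a process must not fork while it holds an active gRPC channel, meaning one that has already issued RPCs; doing so can cause deadlocks or errors (possibly related to HTTP/2's persistent connections). To fork safely, first close the active channel, then create new gRPC connections in the forked child, so that the parent and the child each hold their own HTTP/2 connection.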
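Why a shared connection turns into "corrupted wire data" can be seen without gRPC at all. In this stdlib-only sketch (POSIX, it calls os.fork directly), a socketpair stands in for the parent's live HTTP/2 connection; after fork, the child's copy of the socket object refers to the same open connection, so whatever the child writes lands in the parent's byte stream. This demonstrates general fork semantics, not gRPC internals.

```python
import os
import socket


def child_writes_on_parent_socket():
    """Show that a forked child shares the parent's already-open socket."""
    # A connected pair stands in for the parent's live connection to a server.
    parent_side, peer_side = socket.socketpair()
    pid = os.fork()
    if pid == 0:  # child: parent_side wraps a copy of the parent's descriptor
        try:
            parent_side.sendall(b"bytes from the child")
        finally:
            os._exit(0)
    os.waitpid(pid, 0)
    # The "server" end receives the child's bytes on the parent's connection.
    data = peer_side.recv(1024)
    parent_side.close()
    peer_side.close()
    return data
```

Calling child_writes_on_parent_socket() returns the child's bytes as read on the parent's peer end. With a real HTTP/2 connection, such writes would interleave with the parent's frames and break the protocol state on both sides.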
Solutions
Combining the documentation with what the developers suggest in the issue, there are three ways to make the demo run:
- Set the environment variable GRPC_ENABLE_FORK_SUPPORT=1 (see https://github.com/grpc/grpc/blob/master/doc/fork_support.md#111).
- Call channel.close() to close the active gRPC connection before forking the child process (see https://grpc.github.io/grpc/python/grpc.html#grpc.Channel.close).
```python
def main():
    channel = grpc.insecure_channel('localhost:30001')
    stub = message_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello2(message_pb2.HelloRequest(name='you'))
    print("Greeter client received 2: " + response.message)
    channel.close()  # close the channel before forking
    p = multiprocessing.Process(target=send)
    p.start()
    p.join()
```
- Use a with statement: the active gRPC connection is closed automatically when the block exits (see https://github.com/grpc/grpc/blob/master/examples/python/helloworld/greeter_client.py#L29).
```python
def main():
    # The channel is closed automatically when the with block exits
    with grpc.insecure_channel('localhost:30001') as channel:
        stub = message_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello2(message_pb2.HelloRequest(name='you'))
        print("Greeter client received 2: " + response.message)
    # Fork only after the channel has been closed
    p = multiprocessing.Process(target=send)
    p.start()
    p.join()
```
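The environment-variable option needs no change to main(), but the variable has to be set before gRPC initializes. A minimal sketch, assuming (not verified for every grpcio version) that grpcio reads GRPC_ENABLE_FORK_SUPPORT when grpc is first imported: either put os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "1" at the very top of the entry script, before any import grpc, or launch the client through a small wrapper like the one below. The helper names and the script path are hypothetical.

```python
import os
import subprocess
import sys


def fork_support_env(base_env=None):
    """Return a copy of the environment with gRPC fork support switched on."""
    env = dict(os.environ if base_env is None else base_env)
    env["GRPC_ENABLE_FORK_SUPPORT"] = "1"
    return env


def run_client_with_fork_support(script_path):
    """Run a client script (hypothetical path) in a fresh Python interpreter.

    The variable is already present before the script's first "import grpc",
    so the script itself does not need to change.
    """
    return subprocess.run(
        [sys.executable, script_path],
        env=fork_support_env(),
        check=True,  # raise CalledProcessError if the client exits non-zero
    )
```

Usage: run_client_with_fork_support("client.py"), where client.py is the original demo. Passing env= to subprocess.run scopes the setting to that one child instead of changing the launcher's own environment.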
References
https://grpc.github.io/grpc/python/grpc.html#channel-object
https://developers.google.com/web/fundamentals/performance/http2?hl=zh-cn
https://github.com/grpc/grpc/issues/18321
https://github.com/grpc/grpc/pull/16264
https://github.com/grpc/grpc/blob/master/doc/fork_support.md#111
https://grpc.github.io/grpc/python/grpc.html#grpc.Channel.close