[Azure Event Hubs] Cannot connect to Event Hub: Did not observe any item or terminal signal within 60000ms in 'flat...

Problem Description

When connecting to Azure Event Hub with the Java SDK, the application keeps throwing a java.util.concurrent.TimeoutException with the message: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured).

The exception message itself carries no further useful information.

ERROR .e.r.OpportunityResourceEventhubReceiver []: com.cbs.message.facade.eventhub.receive.OpportunityResourceEventhubReceiver.onError.partition:NONE. Exception:{}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

If you are using the official Azure Event Hub SDK, set the log level to INFO to get more detailed log output. For example:

2022-11-03 10:57:49.410  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:49.602  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh01"}
2022-11-03 10:57:49.603  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh01"}
2022-11-03 10:57:49.612  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:57:49.629  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:49.630  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh02"}
2022-11-03 10:57:49.698  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh02"}
2022-11-03 10:57:49.699  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"c8d655c2-d12d-4d14-a85e-e333273293d9"}
2022-11-03 10:57:49.712  INFO  ---  [           main] c.a.m.eventhubs.EventHubClientBuilder    []: {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:49.713  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Setting next AMQP channel.","entityPath":"eh02"}
2022-11-03 10:57:49.713  INFO  ---  [           main] c.a.m.e.i.EventHubConnectionProcessor    []: {"az.sdk.message":"Next AMQP channel received, updating 0 current subscribers","entityPath":"eh02"}
2022-11-03 10:57:49.714  INFO  ---  [           main] c.a.m.eventhubs.EventProcessorClient     []: {"az.sdk.message":"Starting a new event processor instance.","eventProcessorId":"2fd3a905-c39c-47ff-bc8d-e4b21301eeb3"}
2022-11-03 10:57:50.127  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:57:50.136  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh01"}
2022-11-03 10:57:51.016  INFO  ---  [pool-6-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.052  INFO  ---  [pool-6-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:51.060  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.061  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:57:51.061  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_22f21s_6940767444216","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.220  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_22f21s_6940767444216","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.257  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:57:51.360  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"2fd3a905-c39c-47ff-bc8d-e4b21301eeb3"}
2022-11-03 10:57:51.360  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:57:51.361  INFO  ---  [pool-8-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.363  INFO  ---  [pool-8-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:51.364  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.366  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_0346b3_1667444269712"}
2022-11-03 10:57:51.366  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_0346b3_1667444269712","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.368  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_0346b3_1667444269712","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.369  INFO  ---  [ctor-executor-2] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:57:51.470  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"c8d655c2-d12d-4d14-a85e-e333273293d9"}
2022-11-03 10:57:51.470  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:57:51.471  INFO  ---  [pool-7-thread-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","port":5671}
2022-11-03 10:57:51.472  INFO  ---  [pool-7-thread-1] c.a.c.a.implementation.ReactorExecutor   []: {"az.sdk.message":"Starting reactor.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionInit","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","namespace":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ReactorHandler         []: {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:57:51.473  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_93d32o_12dcdedeadcfe33","errorCondition":null,"errorDescription":null,"hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:57:51.475  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionBound","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","peerDetails":"test-eventhub.servicebus.chinacloudapi.cn:5671"}
2022-11-03 10:57:51.475  INFO  ---  [ctor-executor-3] c.a.c.a.i.handler.StrictTlsContextSpi    []: SSLv2Hello was an enabled protocol. Filtering out.
2022-11-03 10:58:21.099  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:21.367  INFO  ---  [pool-8-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:21.474  INFO  ---  [pool-7-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancer already running
2022-11-03 10:58:51.014  WARN  ---  [     parallel-2] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancing for event processor failed. Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
2022-11-03 10:58:51.017 ERROR  ---  [     parallel-2] c.c.m.f.e.r.OpportunityEventhubReceiver  []: com.cbs.message.facade.eventhub.receive.OpportunityEventhubReceiver.onError.partition:NONE. Exception:{}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'flatMapMany' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:294)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:279)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:418)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-11-03 10:58:51.058  WARN  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onTransportError","connectionId":"MF_93d32o_12dcdedeadcfe33","errorCondition":"amqp:connection:framing-error","errorDescription":"org.apache.qpid.proton.engine.TransportException: connection aborted","hostName":"test-eventhub.servicebus.chinacloudapi.cn"}
2022-11-03 10:58:51.062  INFO  ---  [ctor-executor-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_93d32o_12dcdedeadcfe33","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"org.apache.qpid.proton.engine.TransportException: connection aborted, errorContext[NAMESPACE: test-eventhub.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A]"}
2022-11-03 10:58:51.075  INFO  ---  [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      []: {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_93d32o_12dcdedeadcfe33","hostName":"test-eventhub.servicebus.chinacloudapi.cn","state":"CLOSED","remoteState":"UNINITIALIZED"}
2022-11-03 10:58:51.077  INFO  ---  [ctor-executor-1] c.a.c.a.i.ReactorConnection              []: {"az.sdk.message":"Closing executor.","connectionId":"MF_93d32o_12dcdedeadcfe33"}
2022-11-03 10:58:51.099  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Starting load balancer.","ownerId":"02690c22-21be-4b39-b976-efcf3ce3819a"}
2022-11-03 10:58:51.100  INFO  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: {"az.sdk.message":"Getting partitions from Event Hubs service.","entityPath":"eh02"}
2022-11-03 10:58:51.101 ERROR  ---  [pool-6-thread-1] c.a.m.e.i.EventHubReactorAmqpConnection  []: {"az.sdk.message":"connectionId[MF_93d32o_12dcdedeadcfe33]: Connection is disposed. Cannot get management instance","exception":"connectionId[MF_22f21s_6940767444216]: Connection is disposed. Cannot get management instance","connectionId":"MF_22f21s_6940767444216"}
2022-11-03 10:58:51.104  WARN  ---  [pool-6-thread-1] c.a.m.e.PartitionBasedLoadBalancer       []: Load balancing for event processor failed.connectionId[MF_93d32o_12dcdedeadcfe33]: Connection is disposed. Cannot get management instance
2022-11-03 10:58:51.105 ERROR  ---  [pool-6-thread-1] c.c.m.f.e.r.OpportunityEventhubReceiver  []: com.cbs.message.facade.eventhub.receive.OpportunityEventhubReceiver.onError.partition:NONE. Exception:{}
java.lang.IllegalStateException: connectionId[MF_22f21s_6940767444216]: Connection is disposed. Cannot get management instance
    at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.getManagementNode(EventHubReactorAmqpConnection.java:90)
    at com.azure.messaging.eventhubs.EventHubAsyncClient.lambda$getProperties$0(EventHubAsyncClient.java:73)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:267)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
    at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:56)
    at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.resubscribe(FluxRepeatPredicate.java:119)
    at reactor.core.publisher.MonoRepeatPredicate.subscribeOrReturn(MonoRepeatPredicate.java:47)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8221)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8408)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8205)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8129)
    at com.azure.messaging.eventhubs.PartitionBasedLoadBalancer.loadBalance(PartitionBasedLoadBalancer.java:154)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

From the logs above, two things stand out:

1) Before the exception is thrown, a WARN entry reports: Load balancing for event processor failed.

2) The connection details show that the application connects on port 5671.
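When the log is long, the endpoint the client is dialing can also be pulled out programmatically. The following is a minimal sketch, assuming the JSON-style "Creating and starting connection." entries shown above; the class name `EndpointFromLog` and the regular expression are illustrative and not part of the Azure SDK.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EndpointFromLog {
    // Matches "hostName":"<host>" followed later on the same line by "port":<digits>.
    // Assumption: the log line uses the field names seen in the sample output above.
    private static final Pattern ENDPOINT =
            Pattern.compile("\"hostName\":\"([^\"]+)\".*?\"port\":(\\d+)");

    /** Returns "host:port" if the log line contains both fields, otherwise empty. */
    static Optional<String> extract(String logLine) {
        Matcher m = ENDPOINT.matcher(logLine);
        return m.find() ? Optional.of(m.group(1) + ":" + m.group(2)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Sample taken from the ReactorConnection entry in the log above.
        String line = "{\"az.sdk.message\":\"Creating and starting connection.\","
                + "\"connectionId\":\"MF_22f21s_6940767444216\","
                + "\"hostName\":\"test-eventhub.servicebus.chinacloudapi.cn\",\"port\":5671}";
        System.out.println(extract(line).orElse("no endpoint found"));
    }
}
```

Lines without both fields, such as "Load balancer already running", simply yield an empty result.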

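Because the connection never succeeds, follow the client connectivity troubleshooting steps in the official documentation. The first step is to check whether port 5671 is reachable over TCP (a TCP ping, not an ICMP ping) from the host that runs the Java code. See: https://docs.azure.cn/zh-cn/event-hubs/troubleshooting-guide#run-the-command-to-check-dropped-packets

Run the following command to check for dropped packets or an unreachable port: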

.\psping.exe -n 25 -i 1 -q <yournamespacename>.servicebus.chinacloudapi.cn:5671 -nobanner    

In this case, the check showed that the client environment could not reach ports 5671 and 5672. Once both ports were opened in the firewall, the connection succeeded.
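On hosts where psping is not available, roughly the same TCP reachability check can be run from Java itself. The sketch below is an illustration rather than an official tool: the class name `PortCheck`, the 5-second timeout, and the choice to probe both 5671 and 5672 are assumptions based on the ports mentioned above. It only tests whether a TCP connection can be opened, not whether the AMQP or TLS handshake succeeds.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("usage: java PortCheck.java <yournamespacename>.servicebus.chinacloudapi.cn");
            return;
        }
        // Probe the two ports discussed above from the machine running the client.
        for (int port : new int[] {5671, 5672}) {
            System.out.println(args[0] + ":" + port + " reachable=" + isReachable(args[0], port, 5000));
        }
    }
}
```

If this prints `reachable=false` for a port, a firewall or proxy between the host and the namespace is a likely cause, which matches the finding in this case.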

References

Troubleshoot connectivity issues - Azure Event Hubs: https://docs.azure.cn/zh-cn/event-hubs/troubleshooting-guide#run-the-command-to-check-dropped-packets

Load balancing for event processor failed #12525: https://github.com/Azure/azure-sdk-for-java/issues/12525


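When facing problems in a complex environment, the way to investigate is this: muddy water, left still, gradually clears; what is at rest, set in motion, gradually comes alive. In the cloud, it is just the same!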

Category: [Azure Event Hubs]

Tags: Did not observe any item or terminal signal, within 60000ms in 'flatMapMany', java.util.concurrent.TimeoutException
