六、RocketMQ-Producer-Send深入一丟丟

一、概述

本篇主要跟蹤下producer的發(fā)送流程,以SYNC同步模式為例,假定producer已經(jīng)start了

二、主線流程圖

主線流程圖

三、流程深入一丟丟

再次說明下,這里是以SYNC為例子
只對邏輯相對較多的幾個方法做講解

方法2:

增加了一個timeout,發(fā)送超時時間,默認時間 3 秒

SendResult send(Message msg,long timeout)

方法3:sendDefaultImpl()

方法流程圖

方法內(nèi),會根據(jù)策略獲取待發(fā)送的隊列,然后調(diào)用sendKernelImpl發(fā)送消息,如果發(fā)送失敗,會嘗試 1 + 重試次數(shù)(默認為2) = 3次

方法4 sendKernelImpl()

sendKernelImpl
  • 首先為消息添加主鍵,格式如下:
    UNIQ_KEY : 0BCDF1716BEC18B4AAC27F26B89A0000
  • 壓縮消息
  • 執(zhí)行hookbefore方法(如果有的話)
  • 組織requestHeader作為下個方法的參數(shù)

方法6 invokeSync

這個方法在調(diào)用 invokeSyncImpl 的前后,分別調(diào)用了doBeforeRpcHooksdoAfterRpcHooks的hooks方法,切入RPC調(diào)用

方法7 invokeSyncImpl

這個是最終和broker通訊的代碼,通過netty的channel.writeAndFlush(request)方法將消息發(fā)送給broker,并通過ChannelFutureListener回調(diào)函數(shù)獲取broker的反饋
通過下面的代碼讓阻塞線程,其實內(nèi)部就是一個length=1的CountDownLatch

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

然后在ChannelFutureListener回調(diào)函數(shù)的putResponse方法中釋放,latch - 1,保證獲取到回饋再返回
具體的源代碼如下:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // 在這里阻塞 等待響應
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容