Seata 高性能 RPC 通信的實現(xiàn)- 巧用 reactor 模式

一、Reactor 模式

reactor 模式是一種事件驅動的應用層 I/O 處理模式,基于分而治之和事件驅動的思想,致力于構建一個高性能的可伸縮的 I/O 處理模式。維基百科對 Reactor pattern 的解釋:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers

大致意思是說,reactor設計模式是一種事件處理模式,用于同時有一個或多個請求發(fā)送到事件處理器(service handler),這個事件處理器會采用多路分離(demultiplexes )的方式,同步的將這些請求分發(fā)到請求處理器(request handlers)。

不難看出,上邊介紹的 reactor 模式是一種抽象;從實現(xiàn)角度說,reactor 模式有許多變種,不同編程語言中的實現(xiàn)也有差異。就 java 而言,大師 Doug Lea 在其【Scalable IO in Java】中就講述了幾個reactor模式的演進,如單線程版本多線程版 ,閱讀此文后,筆者對大師所講reactor模式演進的理解與網(wǎng)絡中一些描述稍有差異。

reactor 單線程版中,只有一個reactor線程,線程中通過 select (I/O 多路復用接口) 監(jiān)聽所有 I/O 事件,收到 I/O 事件后通過 dispatch 進行分發(fā)給 Handlers 處理,此版本容易實現(xiàn),也容易理解,但性能不高。為了適配多處理器,充分利用多核并行處理的優(yōu)勢,實現(xiàn)高性能的網(wǎng)絡服務,可以采用分治策略,關鍵環(huán)節(jié)采用多線程模式,于是就出現(xiàn)了reactor多線程版本,而多線程的應用體現(xiàn)為worder線程和reactor線程,多線程應該被池化管理,這樣才容易被調整和控制。線程池中的線程數(shù)會比客戶端的數(shù)量少很多,實際數(shù)量可以根據(jù)程序本身是 CPU 密集型還是 I/O 密集型操作來進行合理的分配。

  • 多個 worder 線程(池化管理)

    • 屬于網(wǎng)絡 I/O 操作與業(yè)務處理的拆分,因為 reactors 監(jiān)聽到 I/O 事件后應該快速分發(fā)給 handlers 來處理程序;但如果 handler 中的非 I/O 操作慢了就會減慢 reactor 中的 I/O 事件響應速度,所以把非 I/O 操作從 reactors 的 I/O 線程轉移到其他線程中,即由worker線程來分擔非 I/O 邏輯的操作處理。
  • 多個 reactor 線程(池化管理)

    • 屬于網(wǎng)絡建連操作與網(wǎng)絡 I/O 讀寫操作的拆分,因為由一個reactor在一個線程中完成所有 I/O 操作也會遇到性能瓶頸,可采取拆分并增加reactor策略,將 I/O 負載分配給多個 reactor(每個reactor都有自己的線程、選擇器和調度循環(huán))以達到負載平衡。這看起來挺不錯,但誰來執(zhí)行分配以達到負載均衡呢?或許是因為這個問題,將reactor拆分為兩類角色,mainReactor負責接收連接,之后采用一定的負載均衡策略將新連接分配給其他subReactor來處理 I/O 讀寫,這樣的拆分自然流暢。

如此就演進出如上圖中的主從reactor多線程模型。請注意,結合【Scalable IO in Java】原文中的用詞和描述看,上圖中的mainReactorsubReactor可以有多個并做池化管理,所有也有一些文章中會看到如主ReactorGroupmainReactorGroup、從ReactorGroupsubReactorGroup等這類名詞用 Group 后綴來強調 Reactor 是池化管理。 或許是不好布局,也或許是為了凸顯主從reactor角色的協(xié)作關系,上圖中都只展示了一個,另外服務端應用通常只暴露一個服務端口時,只需用一個 mainReactor 來監(jiān)聽端口上的連接事件并處理。

二、Netty 主從 reactor 多線程模型

Nettyreactor所對應的實現(xiàn)類是NioEventLoop,其核心邏輯如下:

  • 不同類型的 channel 向 Selector 注冊所感興趣的事件
  • 掃描是否有感興趣的事件發(fā)生
  • 事件發(fā)生后做相應的處理

客戶端和服務端分別會有不同類型的channel,客戶端創(chuàng)建SocketChannel向服務端發(fā)起連接請求,服務端創(chuàng)建ServerSocketChannel監(jiān)聽客戶端連接,建連后創(chuàng)建SocketChannel與客戶端的SocketChannel互相收發(fā)數(shù)據(jù),這些channel分工不同,向 Selector 注冊所感興趣的事件情況也不同:

客戶端/服務端 channel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
客戶端 SocketChannel YES
服務端 ServerSocketChannel YES
服務端 SocketChannel YES YES

Netty中 Nio 方式實現(xiàn)幾種 reactor 模型如下:

mainReactor 對應 Netty 中配置的 bossGroup 線程組(下圖中的主ReactorGroup),主要負責接受客戶端連接的建立。每 bind 一個端口就用掉一個bossGroup中的線程。

subReactor 對應 Netty 中配置的 workerGroup 線程組(下圖中的 reactorGroup),bossGroup 線程組接受完客戶端的連接后,將 channel 轉交給 workerGroup 線程組,在 workerGroup 線程組內選擇一個線程,執(zhí)行 I/O 讀寫的處理,workerGroup 線程組默認是 2 * CPU 核數(shù)個線程。

主從 reactor 模式的核心流程:

  1. 如果只監(jiān)聽一個端口,那么只需一個主reactor干活兒,所以通??吹?code>boosGroup只配置一個線程。主reactor運行在獨立的線程中 ,該線程中只負責與客戶端的連接請求

  2. reactor在服務器端可以不止一個, 通常運行多個從 reactor , 每個從 reactor 也運行在一個獨立的線程中 ,負責與客戶端的讀寫操作

  3. reactor 檢測到客戶端的鏈接后,創(chuàng)建 NioSocketChannel,按照一定的算法循環(huán)選?。ㄘ撦d均衡)一個從reactor,并把剛創(chuàng)建的NioSocketChannel 注冊到這個從 reactor 中,這樣建連和讀寫事件互不影響。

  4. 一個 reactor 中可被注冊多個NioSocketChannel,這個 reactor 監(jiān)聽所有的被分配的 NioSocketChannel 的讀寫事件 , 如果監(jiān)聽到客戶端的數(shù)據(jù)發(fā)送事件 , 將對應的業(yè)務邏輯轉發(fā)給 NioSocketChannel 中的pipeline 里的 handler 鏈進行處理

  5. handler 最好只負責響應 I/O 事件,不處理具體的與客戶端交互的業(yè)務邏輯 , 這樣不會長時間阻塞 , 其 read 方法讀取客戶端數(shù)據(jù)后 , 將消息數(shù)據(jù)交給業(yè)務線程池去處理相關業(yè)務邏輯

  6. 業(yè)務線程池完成相關業(yè)務邏輯的處理后,將結果返回,通過NioSocketChannel的的pipeline 里的 handler 鏈將結果消息寫回給客戶端

  7. buffer不滿足將結果消息寫回給客戶端時的條件時,注冊寫事件,等待可寫時再寫

三、Seata Server 端 的 reactor 模式應用

Seata Server 采用了 主從 reactor 多線程模型,對應這個模型的話是有四個線程池,其中自定義業(yè)務線程池是兩個。

功能 線程池對象 備注
接收客戶端連接 NettyServerBootstrap#eventLoopGroupBoss
處理 IO 事件 NettyServerBootstrap#eventLoopGroupWorker 部分 RPC 消息在這里處理
處理客戶端的 request 消息 AbstractNettyRemoting#messageExecutor 客戶端主動發(fā)給的消息
處理客戶端的 response 消息 NettyRemotingServer#branchResultMessageExecutor 服務端主動發(fā)給客戶端消息,客戶端處理后給服務端響應

3.1、NettyServerBootstrap#eventLoopGroupBoss

筆者的環(huán)境未啟用 epoll,關鍵信息如下:

  • 線程數(shù):1,只監(jiān)聽一個端口
  • 線程名前綴:“NettyBoss”

this.eventLoopGroupBoss = new NioEventLoopGroup(
        //CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
        nettyServerConfig.getBossThreadSize(),
    new NamedThreadFactory(
            // CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
            nettyServerConfig.getBossThreadPrefix(),
            //CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
            nettyServerConfig.getBossThreadSize())
);

3.2、NettyServerBootstrap#eventLoopGroupWorker

筆者的環(huán)境未啟用 epoll,關鍵信息如下:

  • 線程數(shù):默認值是 cpu 核數(shù) * 2
  • 線程名前綴:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup(
        // System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默認值cpu核數(shù)*2
        nettyServerConfig.getServerWorkerThreads(),
    new NamedThreadFactory(
            // CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
            //            enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
            // 默認值 NettyServerNIOWorker ,沒有啟用 epoll
            nettyServerConfig.getWorkerThreadPrefix(),
            //System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默認值 cpu核數(shù)*2
            nettyServerConfig.getServerWorkerThreads())
);

3.3、AbstractNettyRemoting#messageExecutor

此線程池處理客戶端的 request 消息,關鍵參數(shù)信息如下:

  • 線程數(shù):50 ~ 500
  • keepAlive:500 秒
  • 線程名字前綴: "ServerHandlerThread"
  • 隊列長度: 500
  • 拒絕策略:CallerRunsPolicy(),飽和的情況下,調用者來執(zhí)行該任務,即 Netty 的 I/O 線程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
        //Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
        NettyServerConfig.getMinServerPoolSize(),
        //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
        NettyServerConfig.getMaxServerPoolSize(),
        //Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
        NettyServerConfig.getKeepAliveTime(),
        TimeUnit.SECONDS,
        //Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
        new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory(
                "ServerHandlerThread",
                //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
                NettyServerConfig.getMaxServerPoolSize()),
                //飽和的情況下,調用者來執(zhí)行該任務,即Netty的IO線程
                new ThreadPoolExecutor.CallerRunsPolicy()
);

3.4、NettyRemotingServer#branchResultMessageExecutor

此線程池處理客戶端的 response 消息,關鍵參數(shù)信息如下:

  • 線程數(shù):cpu 核數(shù)2 ~ cpu 核數(shù)2
  • keepAlive:500 秒
  • 線程名字前綴: "BranchResultHandlerThread"
  • 隊列長度: 20000
  • 拒絕策略:CallerRunsPolicy(),飽和的情況下,調用者來執(zhí)行該任務,即 Netty 的 IO 線程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
        //System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默認值 cpu核數(shù)*2
        NettyServerConfig.getMinBranchResultPoolSize(),
        //System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默認值 cpu核數(shù)*2
        NettyServerConfig.getMaxBranchResultPoolSize(),
        // System.getProperty("transport.keepAliveTime", "500"),默認值500
        NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(
                //System.getProperty("transport.maxTaskQueueSize", "20000"),默認值 20000
                NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory(
                // 分支響應消息的處理線程的名字前綴  BranchResultHandlerThread
                "BranchResultHandlerThread",
                // System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默認值 cpu核數(shù)*2
                NettyServerConfig.getMaxBranchResultPoolSize()
        ),
        //飽和的情況下,調用者來執(zhí)行該任務,即Netty的IO線程
        new ThreadPoolExecutor.CallerRunsPolicy()
);

3.5、業(yè)務線程池如何處理消息

3.5.1、登記消息處理器

Seata 消息處理的核心邏輯是:定義好什么類型的消息,使用哪個消息處理器,這個消息處理器的消息處理邏輯在哪個線程池中執(zhí)行。這個映射關系通過AbstractNettyRemoting#processorTable來存儲。

/**
 * 可以接收什么類型的消息,以及使用哪個消息處理器和線程池來處理消息
 * HashMap<消息類型, Pair<消息處理器, 線程池>>
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

各模塊 Netty 組件啟動前,通過AbstractNettyRemotingServer#registerProcessor方法登記到這個結構中。

public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(messageType, pair);
}

拿 Seata Server 來說,如在ServerBootStrap啟動前,通過NettyRemotingServer#registerProcessor注冊好消息處理器。不同消息對應的處理器的線程池也不同,也有一些消息沒有指定業(yè)務線程池(沒必要),情況如下:

private void registerProcessor() {
    // 1\. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2\. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
    // 3\. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4\. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5\. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

3.5.2、處理消息

當 Seata Server 收到客戶端發(fā)送的 RPC 消息后,會進入AbstractNettyRemotingServer.ServerHandler#channelRead中,在這里對消息類型簡單判斷后,委托給processMessage處理。

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    // 收到消息后,委托給 processMessage 處理
    processMessage(ctx, (RpcMessage) msg);
}

processMessage中通過消息類型找到消息處理器進行業(yè)務層處理:

  1. 如果消息處理器有指定的業(yè)務線程池,在指定的業(yè)務線程池中處理消息
  2. 若消息處理器沒有指定的業(yè)務線程池,則在 I/O 線程中直接處理。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    ...
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 通過消息類型找到消息處理器
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息處理器有指定的業(yè)務線程池
            if (pair.getSecond() != null) {
                try {
                    // 在指定的業(yè)務線程池中處理消息
                    pair.getSecond().execute(() -> {
                        ...
                        pair.getFirst().process(ctx, rpcMessage);
                        ...
                    });
                } catch (RejectedExecutionException e) {
                    ...
                }
            } else {
                try {
                    //若消息處理器沒有指定的業(yè)務線程池,則在I/O現(xiàn)成中直接處理。
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                   ...
                }
            }
        } else {
          ...
        }
    } else {
        ...
    }
}

四、Seata client 端的 reactor 模式應用

Seata client 端也采用了 reactor 多線程模型,在初始化的時候有RmNettyRemotingClientTmNettyRemotingClient兩個對象,分別會創(chuàng)建各自的 Bootstrap,RM 和 TM 各有自己的 I/O 線程池和業(yè)務線程池。

功能 線程池對象 備注
處理 IO 事件 NettyClientBootstrap#eventLoopGroupWorker
處理業(yè)務消息 AbstractNettyRemoting#messageExecutor

源碼里還有個NettyClientBootstrap#defaultEventExecutorGroup,沒看出來哪里有用。TmNettyRemotingClient#getInstance()中構建了 TM 的業(yè)務線程池,賦值給NettyClientBootstrap#messageExecutor,同樣RmNettyRemotingClient#getInstance()中構建了 RM 的業(yè)務線程池

4.1、NettyClientBootstrap#eventLoopGroupWorker

客戶端此線程池關鍵信息如下:

  • 線程數(shù):1
  • 線程名字前綴:
    • TM:"NettyClientSelector_TMROLE"
    • RM:"NettyClientSelector_RMROLE"
// 單I/O線程
this.eventLoopGroupWorker = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize,
new NamedThreadFactory(
    // CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
    // 再拼上角色后默認值為:"NettyClientSelector_TMROLE"
    getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
    //CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
    selectorThreadSizeThreadSize)
);

4.2、AbstractNettyRemoting#messageExecutor

TmNettyRemotingClient#getInstance()RmNettyRemotingClient#getInstance()創(chuàng)建各自的線程池,配置并不相同。

1)TmNettyRemotingClient#getInstance()中所創(chuàng)建線程池的關鍵信息如下:

  • 線程數(shù):默認值是 cpu 核數(shù) _ 2 ~ cpu 核數(shù) _ 2
  • keepAlive:Integer.MAX_VALUE 秒
  • 線程名字前綴:rpcDispatch_TMROLE
  • 隊列長度: 2000
  • 拒絕策略:runsOldestTaskPolicy(),飽和的情況下,添加新任務并由投遞任務的線程運行最早的任務。
public static TmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                // 自定義TM業(yè)務線程池
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), // 默認是cpu核數(shù) * 2
                        nettyClientConfig.getClientWorkerThreads(), // 默認是cpu核數(shù) * 2
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的線程名是:rpcDispatch_TMROLE
                                nettyClientConfig.getClientWorkerThreads()),// 默認是cpu核數(shù) * 2
                        RejectedPolicies.runsOldestTaskPolicy());//添加新任務并由主線程運行最早的任務。
                instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}

2)RmNettyRemotingClient#getInstance() 中所創(chuàng)建線程池的關鍵信息如下:

  • 線程數(shù):默認是 cpu 核數(shù) _ 2 ~ cpu 核數(shù) _ 2
  • keepAlive:Integer.MAX_VALUE 秒
  • 線程名字前綴:rpcDispatch_RMROLE
  • 隊列長度: 20000
  • 拒絕策略:CallerRunsPolicy(),飽和的情況下,調用者來執(zhí)行該任務,即 Netty 的 IO 線程。
public static RmNettyRemotingClient getInstance() {
    if (instance == null) {
        synchronized (RmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig nettyClientConfig = new NettyClientConfig();
                // 自定義RM業(yè)務線程池
                final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), // 默認是cpu核數(shù) * 2
                        nettyClientConfig.getClientWorkerThreads(), // 默認是cpu核數(shù) * 2
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
                        new NamedThreadFactory(
                            nettyClientConfig.getRmDispatchThreadPrefix(),// RM的線程名是:rpcDispatch_RMROLE,
                            nettyClientConfig.getClientWorkerThreads()),// 默認是cpu核數(shù) * 2
                    new ThreadPoolExecutor.CallerRunsPolicy());////飽和的情況下,調用者來執(zhí)行該任務,即Netty的IO線程
                instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
            }
        }
    }
    return instance;
}

4.3、消息處理

TmNettyRemotingClientRmNettyRemotingClientinit()方法中會調用registerProcessor()方法注冊各自的 RPC 消息處理器。收到 RPC 消息后就由這些處理器+對應的線程池做后續(xù)處理,消息的相關業(yè)務屬性在后續(xù)的事務流程中介紹。

五、支撐特殊能力的業(yè)務線程池

1)AbstractNettyRemotingClient#mergeSendExecutorService

用于批量發(fā)送請求,多個消息合并,減少通信次數(shù)。實現(xiàn)邏輯比較清晰,當允許發(fā)送批量消息時,消息首先分桶保存到 basketMap,在一個周期性的無線循環(huán)中,把 basketMap 中的消息隊列取出來,把每個隊列的消息都放到 mergeMessage 中,最后把 mergeMessage 發(fā)送出去。

  • 線程數(shù):1
  • 線程名前綴:”rpcMergeMessageSend“
  • AbstractNettyRemotingClient中功能相關的屬性介紹:
    • Object mergeLock:發(fā)送請求的鎖對象。
    • Map<Integer, MergeMessage> mergeMsgMap:當發(fā)送消息的類型是 MergeMessage,那么就將消息保存到 mergeMsgMap。
    • ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap:當允許發(fā)送批量消息時,消息首先分桶保存到 basketMap,然后通過定時任務將保存 basketMap 的消息發(fā)送出去。basketMap 的是服務器的地址,value 是保存的發(fā)送個服務器的消息。按照地址分桶是將要發(fā)給同一個服務器的多個消息合并到一個MergedWarpMessage后發(fā)送。
  • 有配置開關,默認值如下:
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false

對應的關鍵代碼邏輯如下:

  1. AbstractNettyRemotingClient#sendSyncRequest中,同步發(fā)送時將消息緩存起來,默認配置看只有 RM 開啟了消息合并發(fā)送,另外同步發(fā)送超時設定,默認 TM 30 秒,RM 15 秒。按照 IP 地址分桶,同一個目標實例的消息才可以合并發(fā)送
public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    // 同步發(fā)送超時設定,默認 TM 30秒,RM 15秒
    long timeoutMillis = this.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    // 默認只有RM開啟了消息合并發(fā)送,TM 并未開啟批發(fā)送
    if (this.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        // 按照目標地址分桶,同一個TC實例的消息才可以合并發(fā)送
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
            key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                    serverAddress, rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        // 通知合并發(fā)送線程 有消息要發(fā)送,醒來干活兒
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            // 阻塞等待消息的響應。
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}",
                exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }

    } else {
        // 不合并發(fā)送的話,就獲取指定IP的channel,并立即發(fā)送。
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }

}
  1. AbstractNettyRemotingClient#init中構建線程池mergeSendExecutorService,在這個線程池中執(zhí)行消息的批處理(消息合并、消息發(fā)送)。
public void init() {
    ...
    // 通過線程池有1個線程,執(zhí)行消息合并發(fā)送
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,//1
            MAX_MERGE_SEND_THREAD,//1
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(
                    //TM : rpcMergeMessageSend_TMROLE
                    //RM : rpcMergeMessageSend_RMROLE
                    //SERVER : rpcMergeMessageSend_SERVERROLE
                    getThreadPrefix(),
                    MAX_MERGE_SEND_THREAD)//1
        );
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
  1. 批處理任務MergedSendRunnable中,實現(xiàn)了消息合并和消息發(fā)送
private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        while (true) {
            //mergeLock 用于生產(chǎn)-消費的協(xié)作
            synchronized (mergeLock) {
                try {
                    // MAX_MERGE_SEND_MILLS = 1,還有線程休眠的效果
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            // 發(fā)送消息,消息是按照IP地址分組
            basketMap.forEach((address, basket) -> {
                if (basket.isEmpty()) {
                    return;
                }

                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                //如果basket隊列不為空,將其中的消息全取出來,添加到mergeMessage中
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                // debug 打印本次發(fā)送的消息個數(shù)和每個消息的Id,以及此時在futures中做超時管控的所有消息的Id,
                // 兩個消息Id比對,可知道消息積壓情況9666
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // 獲取指定地址的channel對象,異步發(fā)送消息
                    // 發(fā)送批量消息是同步的請求,但是這里不需要得到返回的值,在消息保存到basketMap之前,已經(jīng)創(chuàng)建了messageFuture了,
                    // 返回值將會從ClientOnResponseProcessor中得到
                    sendChannel = clientChannelManager.acquireChannel(address);
                    // 因為原始消息的發(fā)送已經(jīng)加入過超時管控,所以批量發(fā)送環(huán)節(jié)不再需要加入額外的超時控制
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    // 發(fā)生異常,快速將保存在mergeMessage的消息清理掉
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(
                                new RuntimeException(String.format("%s is unreachable", address), e));
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            });
            isSending = false;
        }
    }

2)AbstractNettyRemoting#timerExecutor

Netty 的 I/O 操作異步的,RPC 消息的發(fā)送操作會對應一個 Future 對象,在 Seata 中這個 Futrue 對象被封裝為 MessageFuture,需同步發(fā)送的消息,其對應的 MessageFuture 被放入 map 緩存起來,當收到消息的 response 后,將消息從 map 中移除。AbstractNettyRemoting#timerExecutor里的這個線程定時巡檢 map 中的消息,若超時未收到 response 則認定為發(fā)送超時。

  • 線程數(shù):1
  • 線程名前綴:”timeoutChecker“
  • scheduleAtFixedRate :延遲 3 秒,頻率 3 秒
  • AbstractNettyRemoting中的功能相關的屬性介紹:
    • ScheduledExecutorService timerExecutor:執(zhí)行定時任務,消息發(fā)送以后,到了過期時間還沒有返回,則會對消息進行清理。
    • ConcurrentHashMap<Integer, MessageFuture> futures:保存著不同消息,timerExecutor 會清理 futures 中過期的消息。

對應的關鍵代碼邏輯如下:

  1. 構建定時任務的線程池AbstractNettyRemoting#timerExecutor,只用 1 個線程
/**
 * 定時器,用于巡檢消息的發(fā)送是否超時
 */
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("timeoutChecker", 1, true));
復制代碼
  1. 通過AbstractNettyRemoting#sendSync同步發(fā)送消息,構建MessageFuture并放入futures這個 map 中,發(fā)送過程配置監(jiān)聽器 用于處理 channel 異常,指定失敗原因并從futures中移除,還要銷毀 channel。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    ...
    // 構建 MessageFuture
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    // 放入 futures 這個map中
    futures.put(rpcMessage.getId(), messageFuture);
    //檢查通道是否可以寫
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    //在請求發(fā)送之前執(zhí)行鉤子
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    // 發(fā)送請求,并配置監(jiān)聽器 用于處理channel異常
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        // 這里響應不成功,基本是channel不正常了
        if (!future.isSuccess()) {
            //移除消息
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            //響應不成功,則銷毀channel
            destroyChannel(future.channel());
        }
    });
    ...
    //獲取響應結果
    Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    //響應之后執(zhí)行鉤子
    doAfterRpcHooks(remoteAddr, rpcMessage, result);
    ...
}
  1. 正常收到 response 后,給MessageFuture對象賦值,從futures中移除,如ClientOnResponseProcessor#process中的實現(xiàn)
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  ...
  // 從futures中移除
  MessageFuture messageFuture = futures.remove(rpcMessage.getId());
  if (messageFuture != null) {
      // 賦值結果
      messageFuture.setResultMessage(rpcMessage.getBody());
  }
}
  1. AbstractNettyRemoting#init中開啟定時任務,巡檢出futures 這個 map 中的超時對象后從 futures 中移除,不再檢查,并指定結果為 TimeoutException
public void init() {
    // 檢測消息同步發(fā)送(sendSync(xxx))是否超時,
    // 定時任務默認是延遲3秒,間隔3秒
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                MessageFuture future = entry.getValue();
                if (future.isTimeout()) {
                    // 如果過期了則將發(fā)送結果設置為TimeoutException
                    // 從futures中移除,不再檢查
                    futures.remove(entry.getKey());
                    RpcMessage rpcMessage = future.getRequestMessage();
                    future.setResultMessage(new TimeoutException(String
                        .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }

            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}

還有線程池跟服務注冊發(fā)現(xiàn)和建連相關,會后邊篇章再介紹。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容