Dubbo Thread Model and Dispatch Strategies

Dubbo Thread Model

Thread Model

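According to the official documentation, Dubbo's thread model supports separating business threads from I/O threads and provides five different dispatch strategies: all, direct, message, execution and connection.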
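To make the five strategies concrete, here is a simplified Java model of where each one runs a given channel event, following the descriptions in the official documentation. The enum names are hypothetical and are not Dubbo types; heartbeat and "sent" events are left out for brevity.

```java
// Simplified model (assumption): where each dispatch strategy runs a channel event.
enum ChannelEvent { CONNECTED, DISCONNECTED, CAUGHT, REQUEST, RESPONSE }

enum Target { IO_THREAD, BUSINESS_POOL, ORDERED_CONNECTION_EXECUTOR }

enum DispatchStrategy {
    ALL, DIRECT, MESSAGE, EXECUTION, CONNECTION;

    Target route(ChannelEvent event) {
        switch (this) {
            case ALL:        // every event goes to the business pool
                return Target.BUSINESS_POOL;
            case DIRECT:     // nothing is dispatched; everything runs on the I/O thread
                return Target.IO_THREAD;
            case MESSAGE:    // only requests and responses are dispatched
                return (event == ChannelEvent.REQUEST || event == ChannelEvent.RESPONSE)
                        ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case EXECUTION:  // only requests are dispatched
                return event == ChannelEvent.REQUEST ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case CONNECTION: // connect/disconnect run one at a time, in order; the rest go to the pool
                return (event == ChannelEvent.CONNECTED || event == ChannelEvent.DISCONNECTED)
                        ? Target.ORDERED_CONNECTION_EXECUTOR : Target.BUSINESS_POOL;
            default:
                throw new AssertionError(this);
        }
    }
}
```

In this model, message keeps connection events on the I/O thread, so a saturated business pool cannot reject them.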

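Dispatch Strategies

Take the Netty transport (Netty 4.x) as an example. In the NettyServer constructor, ChannelHandlers#wrap wraps the handler in MultiMessageHandler and HeartbeatHandler and selects the dispatch strategy through an SPI extension (Dispatcher).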

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // wrap the handler with the thread dispatch strategy
    }

ChannelHandlers#wrapInternal

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // select the dispatch strategy through the Dispatcher SPI; the default is "all"
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
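The nesting in wrapInternal is a plain decorator chain. The sketch below uses a hypothetical one-method MsgHandler interface instead of Dubbo's real ChannelHandler, and simplified stand-ins for MultiMessageHandler and HeartbeatHandler, to show the order in which an inbound message passes through the layers.

```java
import java.util.List;

// Hypothetical one-method handler; Dubbo's real ChannelHandler also has connected, sent, caught, ...
interface MsgHandler {
    void received(Object message);
}

// Stand-in for MultiMessageHandler: unpacks a batch (modeled as a List) and forwards each message.
class MultiMessage implements MsgHandler {
    private final MsgHandler next;
    private final List<String> trace;

    MultiMessage(MsgHandler next, List<String> trace) { this.next = next; this.trace = trace; }

    @Override
    public void received(Object message) {
        trace.add("multi");
        if (message instanceof List) {
            for (Object m : (List<?>) message) next.received(m);
        } else {
            next.received(message);
        }
    }
}

// Stand-in for HeartbeatHandler: consumes heartbeat messages itself, forwards everything else.
class Heartbeat implements MsgHandler {
    private final MsgHandler next;
    private final List<String> trace;

    Heartbeat(MsgHandler next, List<String> trace) { this.next = next; this.trace = trace; }

    @Override
    public void received(Object message) {
        trace.add("heartbeat");
        if (!"HEARTBEAT".equals(message)) next.received(message);
    }
}

final class Wrapping {
    // Same nesting as wrapInternal: Multi(Heartbeat(<handler returned by the Dispatcher>)).
    static MsgHandler wrap(MsgHandler dispatched, List<String> trace) {
        return new MultiMessage(new Heartbeat(dispatched, trace), trace);
    }
}
```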

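Next, look at NettyServer#doOpen. It gives the Netty boss group one thread and sizes the worker group (the I/O threads) with Constants.DEFAULT_IO_THREADS by default, i.e. CPU cores + 1 capped at 32 (overridable with the iothreads parameter), and registers handlers in the Netty pipeline that encode, decode and process messages.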

 protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        // multi-threaded Reactor model
        // boss group: accepts new connections from consumers
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        // worker group (the I/O threads): reads and writes data on established connections
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // disable Nagle's algorithm
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) // allow reuse of addresses in TIME_WAIT
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // pooled buffer allocator
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder()) // codec: decoder and encoder
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind the port
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }
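The worker group size comes from the URL parameter behind Constants.IO_THREADS_KEY. Here is a minimal sketch of that lookup, assuming the key is iothreads and the default is CPU cores + 1 capped at 32, and modeling URL#getPositiveParameter as "fall back to the default when the value is missing or not positive". The IoThreads class and its map of parameters are hypothetical.

```java
import java.util.Map;

final class IoThreads {
    // Assumed default, modeled on Constants.DEFAULT_IO_THREADS: CPU cores + 1, capped at 32.
    static int defaultIoThreads(int cpus) {
        return Math.min(cpus + 1, 32);
    }

    // Modeled on URL#getPositiveParameter: use the default when the value is missing or <= 0.
    static int resolve(Map<String, String> params, int cpus) {
        int fallback = defaultIoThreads(cpus);
        String raw = params.get("iothreads");
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        int value = Integer.parseInt(raw.trim());
        return value > 0 ? value : fallback;
    }
}
```

In a real process the CPU count would come from Runtime.getRuntime().availableProcessors().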

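As the code shows, if a JVM process exposes only one Dubbo service port, it has only one NettyServer instance and only one NettyServerHandler instance. The code also shows that Dubbo registers only three handlers in the Netty pipeline (decoder, encoder and handler). Dubbo defines its own ChannelHandler interface to chain together channel-related processing, and the first Dubbo ChannelHandler in that chain is invoked by NettyServerHandler. Interestingly, NettyServer is itself a Dubbo ChannelHandler. After Dubbo has created dynamic proxies for the service instances in the Spring container, it opens the service port through NettyServer#doOpen and then registers the services with the registry. Once these steps are done, consumers can connect to the provider. Connections are always initiated by the consumer; once a connection is established, the provider calls NettyServerHandler#channelActive, which creates a NettyChannel: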

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channel <" + ctx.channel());
        // get or create the NettyChannel that wraps this Netty channel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            if (channel != null) {
                // <ip:port, channel>
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
            }
            // here, handler is the NettyServer
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

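Just as Netty and Dubbo each have their own ChannelHandler, they each have their own Channel as well. At the end, channelActive calls NettyServer#connected, which checks whether adding the new channel exceeds the provider's accepts setting. If it does, the server logs an error and closes the channel, so the consumer receives a connection-closed exception. See AbstractServer#connected for details: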

public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        if (this.isClosing() || this.isClosed()) {
            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        Collection<Channel> channels = getChannels();
        // close TCP connections beyond the accepts limit
        if (accepts > 0 && channels.size() > accepts) { 
            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            ch.close();
            return;
        }
        super.connected(ch);
    }
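A stripped-down model of the accepts check. It assumes, as channelActive shows, that the new channel is already in the channel map when connected runs, which is why the check is size() > accepts rather than >=: with accepts=10, exactly ten connections are kept. AcceptsGuard is a hypothetical class, not Dubbo code.

```java
import java.util.HashSet;
import java.util.Set;

final class AcceptsGuard {
    private final int accepts;                       // <= 0 means "no limit"
    private final Set<String> channels = new HashSet<>();

    AcceptsGuard(int accepts) { this.accepts = accepts; }

    // Returns true if the new connection is kept, false if it is closed.
    boolean onConnect(String remoteAddress) {
        channels.add(remoteAddress);                 // channelActive registers the channel first
        if (accepts > 0 && channels.size() > accepts) {
            channels.remove(remoteAddress);          // closing the channel eventually removes it
            return false;
        }
        return true;
    }

    int size() { return channels.size(); }
}
```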

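Note that by default a Dubbo consumer opens only a single long-lived TCP connection to a provider. To increase the throughput of the consumer's calls to the provider, you can add the following configuration on the consumer side: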

<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>

On the provider side, accepts limits the number of long-lived connections so that they cannot grow without bound:

<dubbo:protocol name="dubbo" port="20880" accepts="10"/>

Once the connection is established, the consumer can call the provider's services. When a request arrives, the provider processes it through the following handlers, in order:

---> NettyServerHandler#channelRead: receives the request message.
---> AbstractPeer#received: returns if the server has been closed; otherwise passes the message to the next handler.
---> MultiMessageHandler#received: for a batch of messages, passes each one to the next handler in turn.
---> HeartbeatHandler#received: handles heartbeat messages.
---> AllChannelHandler#received: a key handler, because this is where work is handed off from the I/O threads to the business thread pool.
---> DecodeHandler#received: decodes the message.
---> HeaderExchangeHandler#received: processes the exchange-level message.
---> DubboProtocol: performs the remote invocation.

Here is AllChannelHandler#received:

public void received(Channel channel, Object message) throws RemotingException {
        // get the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            // hand the message to the business thread pool
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

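The call to execute is wrapped in a try/catch because, while the I/O threads accept whatever arrives on a connection, the business thread pool may be bounded (a fixed number of threads plus a zero-capacity or limited queue), so submitting a task can fail with a RejectedExecutionException.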
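The hand-off can be reproduced with a plain ThreadPoolExecutor. In this sketch, HandOff is a hypothetical stand-in for AllChannelHandler, and IllegalStateException replaces Dubbo's ExecutionException. With one thread and a SynchronousQueue, a second task submitted while the first is still running is rejected.

```java
import java.util.concurrent.*;

// Hypothetical stand-in for AllChannelHandler#received.
final class HandOff {
    private final ExecutorService businessPool;

    HandOff(ExecutorService businessPool) { this.businessPool = businessPool; }

    // Called on the I/O thread: it only submits the work and returns immediately.
    void received(Runnable work) {
        try {
            businessPool.execute(work);
        } catch (RejectedExecutionException e) {
            // Dubbo wraps the failure in its own ExecutionException; IllegalStateException is a stand-in.
            throw new IllegalStateException("error when process received event", e);
        }
    }
}
```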

So how is the business thread pool obtained here? AllChannelHandler extends WrappedChannelHandler, and AllDispatcher#dispatch simply creates a new AllChannelHandler, so the WrappedChannelHandler constructor runs first:

public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // obtain the business thread pool through the ThreadPool SPI
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

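The code shows that Dubbo's business thread pool is obtained, through the ThreadPool SPI, when WrappedChannelHandler is instantiated, and is then registered in the DataStore under the port number.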
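Because every dispatcher handler inherits this constructor, they all get their executor the same way. A sketch of the pattern with hypothetical WrappedHandler, AllHandler and SimpleDataStore classes, where a fixed JDK pool stands in for the ThreadPool SPI. The component key mirrors Constants.EXECUTOR_SERVICE_COMPONENT_KEY and is assumed here to be ExecutorService.class.getName().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Dubbo's DataStore: component -> (key -> value).
final class SimpleDataStore {
    private final Map<String, Map<String, Object>> data = new ConcurrentHashMap<>();

    void put(String component, String key, Object value) {
        data.computeIfAbsent(component, c -> new ConcurrentHashMap<>()).put(key, value);
    }

    Object get(String component, String key) {
        Map<String, Object> values = data.get(component);
        return values == null ? null : values.get(key);
    }
}

// Like WrappedChannelHandler: the base-class constructor creates and registers the pool.
abstract class WrappedHandler {
    static final String EXECUTOR_COMPONENT = ExecutorService.class.getName();
    final ExecutorService executor;

    WrappedHandler(int port, boolean consumerSide, SimpleDataStore store) {
        // A fixed pool stands in for the ThreadPool SPI lookup.
        this.executor = Executors.newFixedThreadPool(2);
        String component = consumerSide ? "consumer" : EXECUTOR_COMPONENT;
        store.put(component, Integer.toString(port), executor);
    }
}

// Like AllChannelHandler: creating it runs the WrappedHandler constructor first.
final class AllHandler extends WrappedHandler {
    AllHandler(int port, SimpleDataStore store) {
        super(port, false, store);
    }
}
```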

Next, let's look at Dubbo's business thread pool implementations, which the official documentation describes as ThreadPool extensions:

Thread pool extensions
  • FixedThreadPool:
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // thread name, e.g. DubboServerHandler-host:port
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // number of threads, default 200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // queue size: 0 = SynchronousQueue, < 0 = unbounded, > 0 = bounded (default 0)
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

By default it uses 200 threads and a SynchronousQueue, so once all threads are busy, any new task is rejected immediately.
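The effect of the queues parameter can be checked directly. This sketch rebuilds FixedThreadPool's queue choice with plain JDK classes (Pools is a hypothetical helper, and the JDK's AbortPolicy stands in for AbortPolicyWithReport), using 2 threads instead of 200 to keep the example small.

```java
import java.util.concurrent.*;

final class Pools {
    // Same queue selection as FixedThreadPool:
    // 0 -> SynchronousQueue (no buffering), < 0 -> unbounded queue, > 0 -> bounded queue.
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<Runnable>();
        }
        return queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues);
    }

    // core == max, as in FixedThreadPool; AbortPolicy stands in for AbortPolicyWithReport.
    static ThreadPoolExecutor fixed(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queueFor(queues), new ThreadPoolExecutor.AbortPolicy());
    }
}
```

With queues=0 the third concurrent task is rejected; with queues=1 the third waits in the queue and only the fourth is rejected.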

  • CachedThreadPool:

    public class CachedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // core threads, default 0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // maximum threads, default Integer.MAX_VALUE
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            // queue size, default 0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            // keep-alive time of idle threads, in milliseconds
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    }
    

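    A cached thread pool: if tasks are submitted faster than they complete, it keeps creating new threads (up to threads, which defaults to Integer.MAX_VALUE), and under extreme conditions it can exhaust CPU and memory. It is not suitable when sudden traffic spikes are expected.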
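The growth is easy to observe: with zero core threads and a SynchronousQueue, every task that arrives while all threads are busy gets a brand-new thread. A sketch using the same defaults as CachedThreadPool, with a hypothetical CachedDemo helper:

```java
import java.util.concurrent.*;

final class CachedDemo {
    // Mirrors CachedThreadPool's defaults: 0 core threads, Integer.MAX_VALUE max,
    // SynchronousQueue, 60 s keep-alive.
    static ThreadPoolExecutor cachedDefaults() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60_000, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits n tasks that block until the latch opens, then reports the pool size.
    static int poolSizeAfterBlockingTasks(ThreadPoolExecutor pool, int n, CountDownLatch release) {
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return pool.getPoolSize();
    }
}
```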

  • LimitedThreadPool:

    public class LimitedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // core threads, default 0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // maximum threads, default 200
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            // queue size, default 0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    

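    With default settings it behaves much like FixedThreadPool (at most 200 threads, SynchronousQueue). The difference is that it starts with zero threads and creates them on demand, and because keepAliveTime is Long.MAX_VALUE, threads once created are never reclaimed: the pool only grows and never shrinks.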
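A small sketch of the LimitedThreadPool defaults with plain JDK classes (LimitedDemo is hypothetical). It shows that the pool starts empty and creates threads lazily, and that a keep-alive of Long.MAX_VALUE milliseconds saturates to Long.MAX_VALUE nanoseconds inside ThreadPoolExecutor, so idle threads are effectively never reclaimed.

```java
import java.util.concurrent.*;

final class LimitedDemo {
    // Mirrors LimitedThreadPool's defaults: 0 core threads, 200 max, SynchronousQueue,
    // and a keep-alive of Long.MAX_VALUE ms so idle threads are effectively never reclaimed.
    static ThreadPoolExecutor limitedDefaults() {
        return new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }
}
```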

  • EagerThreadPool:

    public class EagerThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // 0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // Integer.MAX_VALUE
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            // 0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            // 60s
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
    
            // init queue and executor
            // queue capacity: 1 when queues <= 0, otherwise queues
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    }
    

    EagerThreadPoolExecutor:

    public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            // number of submitted tasks (queued or running); decremented again in afterExecute
            submittedTaskCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) { // rejected because the maximum thread count was reached: retry putting the task into the queue
                // retry to offer the task into queue.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    

    TaskQueue:

    public boolean offer(Runnable runnable) {
            if (executor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }
            // current number of threads in the pool
            int currentPoolThreadSize = executor.getPoolSize();
            // have free worker. put task into queue to let the worker deal with task.
            // fewer submitted tasks than threads means some worker is idle: queue the task so that worker takes it
            if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
                return super.offer(runnable);
            }
    
            // return false to let executor create new worker.
            // below the maximum thread count: refuse the offer so the executor creates a new worker right away
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }
    
            // currentPoolThreadSize >= max
            return super.offer(runnable);
        }
    

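    A pool that prefers creating workers. When the number of tasks exceeds corePoolSize but is below maximumPoolSize, it creates new workers to run them instead of queueing. Once the thread count reaches maximumPoolSize, further tasks go into the blocking queue, and when the queue is full a RejectedExecutionException is thrown. (Compared with cached: with the default queues=0, cached throws as soon as the task count exceeds maximumPoolSize instead of putting the task into a blocking queue.)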
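The interplay between TaskQueue#offer and the submitted-task counter is the whole trick: ThreadPoolExecutor only creates a thread above corePoolSize when offer returns false. Below is a condensed, self-contained re-creation of that idea. EagerQueue and EagerExecutor are hypothetical names, and the retry is a plain non-blocking offer rather than Dubbo's timed retryOffer; it is a sketch, not Dubbo's code.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Queue that refuses offers while the pool can still grow, forcing a new worker instead.
class EagerQueue extends LinkedBlockingQueue<Runnable> {
    private EagerExecutor executor;

    EagerQueue(int capacity) { super(capacity); }

    void setExecutor(EagerExecutor executor) { this.executor = executor; }

    @Override
    public boolean offer(Runnable task) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        int poolSize = executor.getPoolSize();
        // Fewer submitted tasks than threads: an idle worker will take it from the queue.
        if (executor.getSubmittedTaskCount() < poolSize) {
            return super.offer(task);
        }
        // Pool can still grow: report "full" so ThreadPoolExecutor adds a worker.
        if (poolSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // Pool is at its maximum: queue normally (fails if the queue is full).
        return super.offer(task);
    }

    // Plain offer used after a rejection, bypassing the eager logic above.
    boolean retryOffer(Runnable task) {
        return super.offer(task);
    }
}

class EagerExecutor extends ThreadPoolExecutor {
    // Tasks that are queued or running.
    private final AtomicInteger submittedTaskCount = new AtomicInteger();

    EagerExecutor(int core, int max, EagerQueue queue) {
        super(core, max, 60, TimeUnit.SECONDS, queue);
        queue.setExecutor(this);
    }

    int getSubmittedTaskCount() { return submittedTaskCount.get(); }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // The pool may have reached its maximum between offer() and adding a worker: retry once.
            if (!((EagerQueue) getQueue()).retryOffer(command)) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.", rx);
            }
        }
    }
}
```

With a standard ThreadPoolExecutor of the same shape (1 core thread, 2 max, queue capacity 1), the second task would wait in the queue while only one thread exists; here a second thread is started first, and the queue is used only once the pool is at its maximum.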

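Based on the analysis above, if consumers send requests too fast, the provider's business thread pool may well throw a RejectedExecutionException. The exception is thrown by Dubbo's rejection policy, AbortPolicyWithReport#rejectedExecution, and the failure is reported back to the consumer (in older versions only as a timeout, as discussed at the end of this article). The simple fix is to enlarge the provider's service thread pool, for example: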

<dubbo:provider threads="500"/>
or
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>

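To make sure the module's main services always have threads available (so that secondary services cannot take too many invocation threads), you can limit the concurrency of the secondary services. executes caps how many requests each method of the service may execute in parallel on the provider, for example: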

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
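Conceptually, executes is a per-method cap on concurrent executions. The sketch below shows that idea with a Semaphore and tryAcquire, failing fast instead of waiting. ExecuteLimit is hypothetical, and Dubbo's own ExecuteLimitFilter differs in its details.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical per-method concurrency cap, similar in spirit to executes="100".
final class ExecuteLimit {
    private final Semaphore permits;

    ExecuteLimit(int executes) { this.permits = new Semaphore(executes); }

    <T> T invoke(Supplier<T> call) {
        // Fail fast when the cap is reached instead of queueing the caller.
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("exceeds executes limit");
        }
        try {
            return call.get();
        } finally {
            permits.release();
        }
    }
}
```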

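Back to the Dispatcher: Dubbo's default strategy is all. With all, not only requests but also responses and connection events are dispatched to the business thread pool. A better approach is to keep only the messages on the business pool and leave the other events on the I/O threads, so message is the better setting. In addition, using all with an older Dubbo version may well trigger a Dubbo bug: when the business pool is full, the rejected request gets no response and the consumer waits until it times out.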

For the reason, see the article "The problem with dispatching all requests to the business thread pool".

This bug was fixed after Dubbo resumed active maintenance:

public void received(Channel channel, Object message) throws RemotingException {
        // get the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            // hand the message to the business thread pool
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

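When the exception is a RejectedExecutionException and the request is two-way, the provider now sends an error response (status SERVER_THREADPOOL_EXHAUSTED_ERROR) straight back to the consumer instead of leaving it to wait for a timeout.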
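The essence of the fix is "answer the request instead of dropping it". Here is a simplified model with hypothetical Req, Resp and GuardedDispatch classes; the list of sent responses stands in for channel.send, and the status string stands in for Response.SERVER_THREADPOOL_EXHAUSTED_ERROR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical, trimmed-down message types (Dubbo's Request/Response carry far more).
final class Req {
    final long id;
    final boolean twoWay;
    Req(long id, boolean twoWay) { this.id = id; this.twoWay = twoWay; }
}

final class Resp {
    final long id;
    final String status;
    final String errorMessage;
    Resp(long id, String status, String errorMessage) {
        this.id = id;
        this.status = status;
        this.errorMessage = errorMessage;
    }
}

final class GuardedDispatch {
    final List<Resp> sent = new ArrayList<>();   // stands in for channel.send(response)
    private final Executor businessPool;

    GuardedDispatch(Executor businessPool) { this.businessPool = businessPool; }

    void received(Req request, Runnable work) {
        try {
            businessPool.execute(work);
        } catch (RejectedExecutionException e) {
            if (request.twoWay) {
                // Answer at once so the consumer fails fast instead of waiting for its timeout.
                sent.add(new Resp(request.id, "THREADPOOL_EXHAUSTED",
                        "threadpool is exhausted, detail msg: " + e.getMessage()));
                return;
            }
            throw e; // a one-way call has no response that could carry the error
        }
    }
}
```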

The recommended configuration is:

<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
© Copyright belongs to the author. For reprints or content cooperation, please contact the author.
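Platform statement: The content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.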
