Spring Cloud Source Code Analysis: Ribbon Client-Side Load Balancing

IZONE小櫻花

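Before the New Year I wrote about the differences between Eureka and Zookeeper, and then this microservice architecture series went silent for more than three months while I was hopelessly hooked on Bilibili. Now that the company is back at work, I am slowly getting back into the rhythm (that is, energetically slacking off again). This article analyzes Ribbon from the following three angles, with reference to Zhai Yongchao's (程序猿DD) book "Spring Cloud微服务实战" (Spring Cloud Microservices in Action):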

  1. How LoadBalancerInterceptor intercepts requests made through RestTemplate;
  2. RibbonLoadBalancerClient, the concrete implementation of the client interface;
  3. Load-balancing strategies.

1. LoadBalancerInterceptor source code

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}

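As the source shows, the interceptor is given a LoadBalancerClient instance. When a RestTemplate annotated with @LoadBalanced issues an HTTP request, the request is intercepted by the intercept method of LoadBalancerInterceptor. The method calls getHost() on the request URI to obtain the service name; this works because a load-balanced RestTemplate is called with the service name in the host position of the URL. The interceptor then passes that service name to loadBalancer.execute, which picks an instance of the service and sends the request to it.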
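To make the host-as-service-name step concrete, here is a minimal sketch using only java.net.URI. The class name ServiceNameDemo and the service name "user-service" are assumptions made up for illustration; they do not come from Spring Cloud.

```java
import java.net.URI;

// Hypothetical demo class; not part of Spring Cloud.
class ServiceNameDemo {

    // Mirrors the interceptor's first step: treat the URI host as the service name.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        // "user-service" is an assumed service name used only for illustration.
        System.out.println(serviceName("http://user-service/users/1"));
        // java.net.URI does not accept an underscore in a hostname, so getHost()
        // returns null here. The Assert.state check in intercept() guards this case.
        System.out.println(serviceName("http://user_service/users/1"));
    }
}
```

This is one reason service names containing underscores tend to fail with the "Request URI does not contain a valid hostname" message seen in the interceptor.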

2. RibbonLoadBalancerClient implementation

/**
     * New: Execute a request by selecting server using a 'key'. The hint will have to be
     * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
     * request)` method. This somewhat breaks the fluent coding style when using a lambda
     * to define the LoadBalancerRequest.
     * @param <T> returned request execution result type
     * @param serviceId id of the service to execute the request to
     * @param request to be executed
     * @param hint used to choose appropriate {@link Server} instance
     * @return request execution result
     * @throws IOException executing the request may result in an {@link IOException}
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

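After the request passes through LoadBalancerInterceptor, the execute method of LoadBalancerClient is called to send it to the target service. (LoadBalancerClient is only the abstract load-balancing interface; RibbonLoadBalancerClient is its concrete implementation.)
As the Javadoc puts it, execute selects a server using a 'key' (the hint) and executes the request against it.
The implementation first looks up the ILoadBalancer for the given serviceId, then calls getServer to pick a concrete server, throws an IllegalStateException if none is available, and finally wraps the server, its secure flag, and its metadata in a RibbonServer and delegates to execute(serviceId, ribbonServer, request).
getServer() is the key step, so here is its source: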

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}
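The flow of execute and getServer (default the hint, pick a server, fail fast when none is available, then run the request) can be sketched without any Ribbon dependency. Chooser and ExecuteFlowSketch are hypothetical stand-ins for ILoadBalancer and RibbonLoadBalancerClient, and the server is reduced to a "host:port" string; this is a simplified illustration, not the real API.

```java
import java.util.function.Function;

// Simplified, hypothetical stand-ins for Ribbon's types; not the real API.
class ExecuteFlowSketch {

    interface Chooser {
        // Returns "host:port", or null if nothing is available.
        String choose(Object key);
    }

    // Same shape as execute(serviceId, request, hint): pick a server, fail fast, run the request.
    static <T> T execute(String serviceId, Chooser balancer,
                         Function<String, T> request, Object hint) {
        // A null hint is replaced by "default", as in getServer().
        String server = (balancer == null)
                ? null
                : balancer.choose(hint != null ? hint : "default");
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return request.apply(server);
    }
}
```

A caller would pass a lambda for the request, for example `ExecuteFlowSketch.<String>execute("user-service", key -> "10.0.0.1:8080", s -> "GET http://" + s + "/users", null)`, where the address is made up.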

Clearly, we need to take a closer look at loadBalancer, that is, the ILoadBalancer interface.

public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     * 
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);
    
    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);
    
    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     * 
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);
    
    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}

ILoadBalancer defines the abstract operations of a client-side load balancer. According to its Javadoc:

  • addServers: adds new server instances to the load balancer's server list
  • chooseServer: picks one concrete server instance according to some strategy
  • markServerDown: tells the load balancer that a specific instance is down; otherwise the load balancer keeps treating it as alive until the next ping cycle
  • getReachableServers: returns the instances that are up and reachable
  • getAllServers: returns all known instances, both reachable and unreachable
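The contract listed above can be illustrated with a toy in-memory balancer. SimpleBalancer and Node are hypothetical names, and the "first reachable node" policy is an assumption chosen for brevity; Ribbon's real implementations delegate the choice to an IRule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model of the ILoadBalancer contract; Node and SimpleBalancer are hypothetical names.
class SimpleBalancer {

    static final class Node {
        final String id;
        volatile boolean alive = true;
        Node(String id) { this.id = id; }
    }

    private final List<Node> all = new CopyOnWriteArrayList<>();

    void addServers(List<Node> newServers) { all.addAll(newServers); }

    // Marks a node as down right away instead of waiting for the next ping cycle.
    void markServerDown(Node node) { node.alive = false; }

    List<Node> getAllServers() { return new ArrayList<>(all); }

    List<Node> getReachableServers() {
        List<Node> up = new ArrayList<>();
        for (Node n : all) {
            if (n.alive) up.add(n);
        }
        return up;
    }

    // Simplest possible policy: first reachable node, or null when none is up.
    Node chooseServer(Object key) {
        List<Node> up = getReachableServers();
        return up.isEmpty() ? null : up.get(0);
    }
}
```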

Now let's look at a concrete implementation, BaseLoadBalancer:

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
        IPing ping, IPingStrategy pingStrategy) {

    logger.debug("LoadBalancer [{}]:  initialized", name);

    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    setupPingTask();
    lbStats = stats;
    init();
}

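The default constructor sets ping to null and uses round robin (RoundRobinRule) as the default rule. Apart from plain field assignments, the constructor shown above mainly does two things: setRule (sets the load-balancing strategy) and setupPingTask (starts the ping heartbeat task).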

void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

Unless canSkipPing() returns true, setupPingTask creates a ShutdownEnabledTimer and schedules a task on it every pingIntervalSeconds (10 seconds by default). The task it schedules is a PingTask:

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}
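The timer-plus-task pattern used by setupPingTask and PingTask can be reproduced with the plain JDK java.util.Timer. This is only a sketch: PingTimerSketch, start, and runRounds are hypothetical names, and the real code uses Ribbon's ShutdownEnabledTimer rather than a bare Timer.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a periodic health check driven by a daemon timer.
class PingTimerSketch {

    // Schedules `check` every periodMillis on a daemon timer and returns the timer.
    static Timer start(String name, long periodMillis, Runnable check) {
        Timer timer = new Timer("PingTimer-" + name, true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    check.run();
                } catch (Exception e) {
                    // Catch and log: an uncaught exception would terminate the timer thread.
                    System.err.println("Error pinging: " + e);
                }
            }
        }, 0, periodMillis);
        return timer;
    }

    // Runs the timer until `rounds` checks have happened or 5 seconds have passed.
    static boolean runRounds(int rounds, long periodMillis) {
        CountDownLatch latch = new CountDownLatch(rounds);
        Timer timer = start("demo", periodMillis, latch::countDown);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }
}
```

The try/catch inside run() matters for the same reason it appears in PingTask: one failed ping round should not stop all later rounds.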

According to the source comments, this TimerTask checks the status of every server in the server list at the configured interval.
Here is the key logic of the runPinger method that PingTask invokes:

                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);

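As the source shows, PingTask runs runPinger, which calls pingerStrategy.pingServers(ping, allServers) to get the current availability of every server and then compares each server's new state with its previous one. Servers that are alive go into newUpList, which replaces upServerList under the write lock. Servers whose state changed go into changedServers, which is passed to notifyServerStatusChangeListener so that registered listeners are told about the change; if nothing changed, that list is empty. Note that this snippet does not itself fetch anything from the registry (Eureka is the usual choice, but other registries can be used): refreshing the server list from the registry is handled separately, in DynamicServerListLoadBalancer.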
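The comparison step in runPinger is easy to isolate. The sketch below uses hypothetical Node and Result types in place of Ribbon's Server and lists, and it leaves out the locking and listener notification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, lock-free rendering of the status-diff step in runPinger.
class PingDiffSketch {

    static final class Node {
        final String id;
        boolean alive;
        Node(String id, boolean alive) { this.id = id; this.alive = alive; }
    }

    static final class Result {
        final List<Node> up = new ArrayList<>();
        final List<Node> changed = new ArrayList<>();
    }

    // Applies fresh ping results: updates each node, collects the new up list
    // and the nodes whose alive flag flipped.
    static Result apply(Node[] nodes, boolean[] pingResults) {
        Result r = new Result();
        for (int i = 0; i < nodes.length; i++) {
            boolean wasAlive = nodes[i].alive;
            nodes[i].alive = pingResults[i];
            if (wasAlive != pingResults[i]) {
                r.changed.add(nodes[i]);
            }
            if (pingResults[i]) {
                r.up.add(nodes[i]);
            }
        }
        return r;
    }
}
```

For three nodes that were (alive, alive, dead) and now ping as (true, false, true), the up list holds the first and third node, and the changed list holds the second and third.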

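To summarize the whole flow:
When RibbonLoadBalancerClient.execute is called, it looks up the ILoadBalancer for the service, which holds the server list obtained from the registry (typically Eureka). In the background, the load balancer runs a ping task (every 10 seconds by default) through the configured IPing to keep each server's alive state current, and it notifies listeners whenever a server's state changes. With the server list in hand, chooseServer applies the configured IRule strategy to pick a concrete server instance.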

3. Load-balancing strategies

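The previous section covered how RibbonLoadBalancerClient obtains a concrete server instance, which brings us to load-balancing strategies. Load balancing has two main benefits: when one or more machines go down, the remaining machines keep the service running; and the load is spread across machines, so no single machine's CPU is overloaded at peak time.
Common strategies include Random, RoundRobin, ConsistentHash, Hash, and Weighted.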

  • Round robin (RoundRobin)
public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

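The core of round robin is the single expression (current + 1) % modulo: every call moves on to the next server index, and compareAndSet keeps the counter correct under concurrent calls. The choose method skips servers that are not alive or not ready and gives up after 10 attempts.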
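The counter logic stands on its own and can be tried outside Ribbon. RoundRobinCounter is a hypothetical class name; the method body follows incrementAndGetModulo above, with the field renamed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone version of the cyclic counter used by round robin.
class RoundRobinCounter {

    private final AtomicInteger next = new AtomicInteger(0);

    // Lock-free "advance and wrap": retry until our compareAndSet wins.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = next.get();
            int candidate = (current + 1) % modulo;
            if (next.compareAndSet(current, candidate)) {
                return candidate;
            }
        }
    }
}
```

Because the counter starts at 0 and is advanced before use, three servers are visited in the index order 1, 2, 0, 1, and so on; the first pick is index 1, not index 0.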

  • Random
    Its choose method is much the same as the one above, so let's look only at the selection algorithm:
protected int chooseRandomInt(int serverCount) {
    return ThreadLocalRandom.current().nextInt(serverCount);
}

ThreadLocalRandom simply returns a random index in the range [0, serverCount).
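As a small illustration of the same idea, the sketch below picks a random element from a list. RandomPick is a hypothetical helper, and the empty-list guard is an addition of this sketch: nextInt requires a positive bound and throws IllegalArgumentException otherwise.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper showing random selection over a server list.
class RandomPick {

    // Picks a uniformly random element; returns null for an empty list.
    static <T> T pick(List<T> servers) {
        if (servers.isEmpty()) {
            return null; // nextInt(0) would throw IllegalArgumentException
        }
        int index = ThreadLocalRandom.current().nextInt(servers.size());
        return servers.get(index);
    }
}
```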

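  • Consistent hash (ConsistentHash) and hash (Hash)
    Both are very common algorithms, so this article does not cover them.
  • Weighted, BestAvailableRule, WeightedResponseTimeRule, ZoneAvoidanceRule

    [Figure: load-balancing strategy methods]

    Covering these properly would take another long write-up, so I'll save them for a follow-up post (next time for sure).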

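That is roughly it for the Ribbon source code analysis. I may update it from time to time. If you are interested, feel free to dig deeper, and if you have questions, let's discuss them in the comments.
One last important thing: please like, follow, and tip. Thank you!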
