
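Before the Lunar New Year I wrote about the differences between Eureka and Zookeeper, and then the microservice architecture series went on hiatus for more than three months while I was hopelessly hooked on Bilibili. The company has recently gone back to work, and I'm slowly getting back into shape (that is, slacking off full of energy again). This article analyzes Ribbon's source code from the following three angles, with reference to the book 《Spring Cloud微服务实战》 (Spring Cloud Microservices in Action) by Zhai Yongchao (翟永超, known online as 程序猿DD):
- how the LoadBalancerInterceptor intercepts RestTemplate requests;
- the actual implementation in RibbonLoadBalancerClient;
- load-balancing strategies.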
1. LoadBalancerInterceptor source code
```java
package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        // With @LoadBalanced, the host part of the URL is the service name
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }
}
```
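As you can see, the interceptor is injected with a LoadBalancerClient instance. When a RestTemplate annotated with @LoadBalanced sends an HTTP request, the request is intercepted by LoadBalancerInterceptor's intercept method. intercept calls getHost() on the request URI to get the service name. This works because the URLs we pass to a load-balanced RestTemplate use the service name, not a real host name, as the host. It then calls loadBalancer.execute with that service name, and the load balancer selects an instance of the service and sends the request to it.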
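To make the "service name as host" idea concrete, here is a minimal sketch in plain Java with no Spring dependency. The registry map, the `ServiceNameRewriter` class and its methods are hypothetical, and for simplicity it always takes the first registered instance. In Spring Cloud the choice is delegated to the LoadBalancerClient, and the URI is rebuilt from the chosen instance afterwards, which is roughly the job of `reconstructURI`.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: treats the URI host as a service name (like intercept()),
// then swaps in a concrete "host:port" taken from a hypothetical registry map.
final class ServiceNameRewriter {

    // Hypothetical registry: service name -> list of "host:port" instances.
    private final Map<String, List<String>> registry;

    ServiceNameRewriter(Map<String, List<String>> registry) {
        this.registry = registry;
    }

    /** Extracts the service name from the host part, failing like Assert.state does. */
    static String serviceNameOf(URI uri) {
        String serviceName = uri.getHost();
        if (serviceName == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + uri);
        }
        return serviceName;
    }

    /** Rewrites http://service/path to http://host:port/path using the first instance. */
    URI rewrite(URI original) {
        String serviceName = serviceNameOf(original);
        List<String> instances = registry.get(serviceName);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("No instances available for " + serviceName);
        }
        // Placeholder policy: always the first instance (a real client asks its load balancer).
        String[] hostPort = instances.get(0).split(":", 2);
        try {
            return new URI(original.getScheme(), original.getUserInfo(),
                    hostPort[0], Integer.parseInt(hostPort[1]),
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI for " + original, e);
        }
    }
}
```

A practical consequence shows up with service names: `java.net.URI` only accepts letters, digits and hyphens in host-name labels. A service name such as `hello_service` therefore makes `getHost()` return `null`, which is exactly the case the `Assert.state` check in `intercept` reports.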
2. RibbonLoadBalancerClient implementation
```java
/**
 * New: Execute a request by selecting server using a 'key'. The hint will have to be
 * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
 * request)` method. This somewhat breaks the fluent coding style when using a lambda
 * to define the LoadBalancerRequest.
 * @param <T> returned request execution result type
 * @param serviceId id of the service to execute the request to
 * @param request to be executed
 * @param hint used to choose appropriate {@link Server} instance
 * @return request execution result
 * @throws IOException executing the request may result in an {@link IOException}
 */
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    // Look up the ILoadBalancer associated with this service
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    // Let the load balancer pick a concrete Server
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    // Wrap the Server with the service id, the secure flag and its metadata
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
            isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
    return execute(serviceId, ribbonServer, request);
}
```
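After the LoadBalancerInterceptor intercepts a request, it calls LoadBalancerClient's execute method to send the request to the target service. (LoadBalancerClient is only an abstract load-balancing interface; RibbonLoadBalancerClient is its concrete implementation.)
As the Javadoc puts it, execute selects a server using a 'key' (the hint) and executes the request against it.
The source shows three steps. execute first obtains the ILoadBalancer associated with the serviceId via getLoadBalancer. It then calls getServer to pick a concrete instance. Finally, it wraps that instance and its metadata in a RibbonServer and executes the request against it.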
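The three steps of `execute` can be sketched without Ribbon as follows. `MiniLoadBalancerClient`, its nested `Balancer` interface, and the `Function`-based request are hypothetical stand-ins for ILoadBalancer, LoadBalancerRequest and RibbonServer. The sketch only shows the order of operations and the fail-fast check when no server is chosen. The `"default"` fallback for a null hint mirrors the `getServer` source shown next.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the execute(...) flow: per-service balancer lookup,
// server choice with a "default" hint fallback, fail-fast, then run the request.
final class MiniLoadBalancerClient {

    /** Hypothetical stand-in for ILoadBalancer; only chooseServer is needed here. */
    interface Balancer {
        String chooseServer(Object key);
    }

    private final Map<String, Balancer> balancers = new HashMap<>();

    void register(String serviceId, Balancer balancer) {
        balancers.put(serviceId, balancer);
    }

    <T> T execute(String serviceId, Function<String, T> request, Object hint) {
        Balancer balancer = balancers.get(serviceId);   // like getLoadBalancer(serviceId)
        String server = getServer(balancer, hint);      // like getServer(loadBalancer, hint)
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        return request.apply(server);                   // like execute(serviceId, ribbonServer, request)
    }

    private static String getServer(Balancer balancer, Object hint) {
        if (balancer == null) {
            return null;                                // unknown service: no balancer, no server
        }
        return balancer.chooseServer(hint != null ? hint : "default");
    }
}
```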
getServer() is the key step here. Its source:
```java
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}
```
getServer just delegates to loadBalancer.chooseServer, so the next thing to examine is ILoadBalancer.
```java
package com.netflix.loadbalancer;

import java.util.List;

public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     *
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);

    /**
     * Choose a server from load balancer.
     *
     * @param key An object that the load balancer may use to determine which server to return. null if
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);

    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     *
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);

    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}
```
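ILoadBalancer defines the abstract operations of a client-side load balancer. From the Javadoc:
- addServers: adds new service instances to the load balancer's server list (the same host:port may be added more than once, which effectively gives it more weight)
- chooseServer: picks one concrete service instance according to some strategy
- markServerDown: tells the load balancer that a given instance has stopped serving; otherwise the load balancer keeps treating it as alive until the next ping cycle
- getReachableServers: returns the instances that are up and reachable
- getAllServers: returns all known instances, both reachable and unreachable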
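To pin down what each operation means, here is a hypothetical in-memory class with the same five operations. It is not Ribbon's BaseLoadBalancer. It has no ping task, its `Server` is a minimal stand-in for `com.netflix.loadbalancer.Server`, and `chooseServer` uses a placeholder policy (the first reachable server) where Ribbon would delegate to an IRule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch mirroring the ILoadBalancer operations listed above.
final class InMemoryLoadBalancer {

    /** Minimal stand-in for a server: an id ("host:port") plus an alive flag. */
    static final class Server {
        final String id;
        volatile boolean alive = true;

        Server(String id) {
            this.id = id;
        }
    }

    // Copy-on-write so readers can iterate while servers are being added.
    private final List<Server> allServers = new CopyOnWriteArrayList<>();

    /** addServers: duplicates are kept, so adding a server twice acts like extra weight. */
    void addServers(List<Server> newServers) {
        allServers.addAll(newServers);
    }

    /** chooseServer: placeholder policy, first reachable server, or null if none. */
    Server chooseServer(Object key) {
        List<Server> up = getReachableServers();
        return up.isEmpty() ? null : up.get(0);
    }

    /** markServerDown: takes effect immediately instead of waiting for a ping cycle. */
    void markServerDown(Server server) {
        server.alive = false;
    }

    /** getReachableServers: only servers currently flagged alive. */
    List<Server> getReachableServers() {
        List<Server> up = new ArrayList<>();
        for (Server s : allServers) {
            if (s.alive) {
                up.add(s);
            }
        }
        return up;
    }

    /** getAllServers: every known server, alive or not. */
    List<Server> getAllServers() {
        return new ArrayList<>(allServers);
    }
}
```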
Now let's look at BaseLoadBalancer, the basic implementation of ILoadBalancer, starting with one of its constructors:
```java
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
        IPing ping, IPingStrategy pingStrategy) {
    logger.debug("LoadBalancer [{}]: initialized", name);
    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    setupPingTask();
    lbStats = stats;
    init();
}
```
In the default constructor, ping is null and the rule defaults to round robin (RoundRobinRule). Apart from plain field assignments, this constructor mainly does two things: setRule sets the load-balancing rule, and setupPingTask starts the periodic ping task.
```java
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
```
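setupPingTask mainly creates a ShutdownEnabledTimer and schedules a PingTask on it. The first run is immediate, and later runs follow every pingIntervalSeconds (10 seconds by default). If canSkipPing() returns true, for example when ping is null as with the default constructor, no timer is started at all. The scheduled PingTask looks like this: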
```java
class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}
```
According to the official comment, this TimerTask checks the status of every server in the server list at the configured interval.
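The setupPingTask/PingTask pair follows a common `java.util.Timer` pattern, sketched below with hypothetical names. `PingScheduler` is made up, and a plain `Runnable` stands in for `new Pinger(pingStrategy).runPinger()`. The try/catch inside `run()` is a detail worth copying. If a `TimerTask` throws, the Timer's single background thread dies and all later scheduled runs are discarded, so one failed ping round would silently stop every future health check.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch of the setupPingTask()/PingTask pattern with java.util.Timer.
final class PingScheduler {

    private final Runnable pingAll;        // stands in for new Pinger(pingStrategy).runPinger()
    private final long intervalMillis;
    private Timer timer;

    PingScheduler(Runnable pingAll, int pingIntervalSeconds) {
        this.pingAll = pingAll;
        this.intervalMillis = pingIntervalSeconds * 1000L;
    }

    /** Like setupPingTask(): cancel any previous timer, then schedule a fresh daemon one. */
    synchronized void start() {
        if (timer != null) {
            timer.cancel();
        }
        timer = new Timer("PingTimer-sketch", true);   // daemon thread: does not block JVM exit
        timer.schedule(newTask(), 0, intervalMillis);   // first run now, then every interval
    }

    synchronized void stop() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }

    /** Like PingTask: a failing round is logged and swallowed so the timer thread survives. */
    TimerTask newTask() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    pingAll.run();
                } catch (RuntimeException e) {
                    System.err.println("Error pinging: " + e);
                }
            }
        };
    }
}
```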
Next, the key logic of the runPinger method called from PingTask:
```java
// Excerpt from runPinger(); surrounding declarations and code omitted
results = pingerStrategy.pingServers(ping, allServers);

final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();

for (int i = 0; i < numCandidates; i++) {
    boolean isAlive = results[i];
    Server svr = allServers[i];
    boolean oldIsAlive = svr.isAlive();

    svr.setAlive(isAlive);

    if (oldIsAlive != isAlive) {
        changedServers.add(svr);
        logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
                name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
    }

    if (isAlive) {
        newUpList.add(svr);
    }
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();

notifyServerStatusChangeListener(changedServers);
```
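As the source shows, runPinger gets the availability of every server from pingerStrategy.pingServers(ping, allServers), sets each server's alive flag, and compares it with the previous value. Live servers are collected into newUpList, which replaces upServerList under the write lock. Servers whose status changed are collected into changedServers and passed to notifyServerStatusChangeListener, which informs the registered ServerStatusChangeListeners. The ping round itself does not re-fetch the server list from the registry. Pulling the latest list from the Eureka client (Eureka is the usual registry, but others can be used) is done separately, by the server-list updater of DynamicServerListLoadBalancer, the BaseLoadBalancer subclass used with a dynamic registry.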
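The core of runPinger can be reproduced in a self-contained form: ping every server, rebuild the up list, collect status flips, swap the list under a write lock, and notify. Everything here is hypothetical. A `Predicate` plays the role of IPing/IPingStrategy, and a `Consumer` plays a single ServerStatusChangeListener. Unlike the excerpt above, the sketch releases the lock in a `finally` block and notifies only when `changedServers` is non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of one ping round in the style of runPinger().
final class PingRound {

    static final class Server {
        final String id;
        boolean alive;

        Server(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    private final ReadWriteLock upServerLock = new ReentrantReadWriteLock();
    private List<Server> upServerList = new ArrayList<>();

    /** Runs one round and returns the servers whose alive flag flipped. */
    List<Server> run(List<Server> allServers, Predicate<Server> ping,
                     Consumer<List<Server>> statusChangeListener) {
        List<Server> newUpList = new ArrayList<>();
        List<Server> changedServers = new ArrayList<>();
        for (Server svr : allServers) {
            boolean isAlive = ping.test(svr);
            boolean oldIsAlive = svr.alive;
            svr.alive = isAlive;
            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
            }
            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upServerLock.writeLock().lock();
        try {
            upServerList = newUpList;   // readers see the old or the new list, never a half-built one
        } finally {
            upServerLock.writeLock().unlock();
        }
        if (!changedServers.isEmpty()) {
            statusChangeListener.accept(changedServers);
        }
        return changedServers;
    }

    /** Snapshot of the current up list, read under the read lock. */
    List<Server> upServers() {
        upServerLock.readLock().lock();
        try {
            return new ArrayList<>(upServerList);
        } finally {
            upServerLock.readLock().unlock();
        }
    }
}
```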
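A quick summary of the whole process:
When RibbonLoadBalancerClient (the load-balancing client) handles a request in execute, it goes through the service's ILoadBalancer. That load balancer's server list comes from the Eureka registry and is refreshed from it periodically. Meanwhile, every 10 seconds an IPing checks each instance, and the list of reachable servers is updated whenever a status changes. In Spring Cloud's Eureka integration, the default IPing is NIWSDiscoveryPing, which reads the instance status cached by the Eureka client. With the server list in hand, RibbonLoadBalancerClient uses the configured IRule strategy to choose a concrete service instance.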
3. Load-balancing strategies
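We have seen how RibbonLoadBalancerClient obtains a concrete service instance. Choosing that instance is the job of a load-balancing strategy. Load balancing has two well-known benefits. When one or more machines go down, the remaining machines keep the service running. It also spreads the load across machines so that no single machine's CPU is overloaded at peak times.
Common strategies include random (Random), round robin (RoundRobin), consistent hashing (ConsistentHash), hashing (Hash), and weighted (Weighted).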
- Round robin (RoundRobin)
```java
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}
```
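The round-robin algorithm really comes down to one expression, (current + 1) % modulo: each call moves on to the next server. The counter is advanced with a compareAndSet retry loop, so concurrent callers can share it without a lock. Note also that choose() indexes into allServers, not just the reachable ones. It skips servers that are not alive or not ready to serve, and gives up after 10 attempts.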
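The round-robin logic can be tried in isolation with the sketch below. `RoundRobinSketch` is hypothetical and generic over the server type, with a `Predicate` replacing `isAlive() && isReadyToServe()`. It keeps the same CAS counter arithmetic and the 10-attempt cap. In this sketch, the counter starts at 0 and is incremented before use, so the first pick is index 1, not 0.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of round-robin selection in the spirit of RoundRobinRule:
// a lock-free counter over all servers, skipping dead ones, at most 10 attempts.
final class RoundRobinSketch {

    private static final int MAX_TRIES = 10;
    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    /** Atomically advances the counter and returns (current + 1) % modulo. */
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
            // Another thread advanced the counter first; re-read and retry.
        }
    }

    /** Returns the next live server, or null if none is found within MAX_TRIES. */
    <S> S choose(List<S> allServers, Predicate<S> isAlive) {
        if (allServers.isEmpty()) {
            return null;
        }
        for (int count = 0; count < MAX_TRIES; count++) {
            S server = allServers.get(incrementAndGetModulo(allServers.size()));
            if (isAlive.test(server)) {
                return server;
            }
        }
        return null;
    }
}
```

For three live servers a, b, c, successive calls return b, c, a, b. If b is down, a fresh instance returns c, a, c.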
- Random (Random)
The choose method has much the same structure as above, so let's look only at the algorithm:
```java
protected int chooseRandomInt(int serverCount) {
    return ThreadLocalRandom.current().nextInt(serverCount);
}
```
It simply draws a random index in [0, serverCount) from ThreadLocalRandom.
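A self-contained version of random selection is shown below. The `RandomSketch` class is hypothetical. `ThreadLocalRandom.current()` returns a generator bound to the current thread, so concurrent callers do not contend on a shared `Random` instance. The sketch draws the index from the size of the same list it indexes into, so the index can never go out of bounds.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of random selection among reachable servers.
final class RandomSketch {

    /** Same call as chooseRandomInt: a uniformly random value in [0, serverCount). */
    static int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    /** Picks uniformly from the reachable servers; returns null when none are up. */
    static <S> S choose(List<S> reachableServers) {
        if (reachableServers.isEmpty()) {
            return null;
        }
        return reachableServers.get(chooseRandomInt(reachableServers.size()));
    }
}
```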
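- Consistent hashing (ConsistentHash) and hashing (Hash)
These two are very common algorithms, so this article won't discuss them.
- Weighted (Weighted), BestAvailableRule, WeightedResponseTimeRule, ZoneAvoidanceRule
Covering these load-balancing strategies would take another long article, so I'll introduce them in a separate post next time (next time for sure).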
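That's about it for this look at Ribbon's source code. I may update it from time to time. If you're interested, feel free to dig deeper, and if you have any questions, let's discuss them in the comments.
One last, very important thing: please like, follow, and tip. Thank you!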
