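I'm a beginner, and this article is my own reading notes, not an authoritative explanation, so judge for yourself what is right and wrong.
This is the last part: the IPing component. Let's take a look.
As I understand it, the IPing mechanism probably exists to compensate for the delay with which Eureka propagates changes in service information.
Let's do the math: suppose a service instance goes down. What is the longest it can take for the Eureka Client on the other services to see the change? This also doubles as a review of Eureka.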
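- Eviction of failed instances runs every 30s. However, a lease effectively expires only after 2 * 90s, i.e. 3 minutes. Taking the longer figure, it takes 6 eviction checks before the failure is noticed and the instance is removed.
- Once the failed instance is evicted, only the registry and the read-write cache are cleared. The read-only cache takes up to 30s to sync before the change can be served from the recently-changed queue.
- Eureka Client performs an incremental (delta) fetch every 30s.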
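Assuming the timings all line up badly, SumTime = 180 + 30 + 30 = 240s, roughly 4 minutes. On top of that, Ribbon refreshes its AllList from the Eureka Client's local registry every 30s as well, so noticing that an instance has gone down takes several minutes. Isn't that a bit of a trap?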
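To make the arithmetic easy to re-check, here is a minimal sketch that just adds up the intervals from these notes. The class and constant names are my own, and the numbers are the ones quoted above, not values read from Eureka or Ribbon at runtime.

```java
public class WorstCaseDelay {
    // All values are in seconds and come from the notes above (assumptions, not measured).
    static final int LEASE_EXPIRY = 2 * 90;      // effective lease expiration
    static final int READ_ONLY_CACHE_SYNC = 30;  // read-only cache refresh on the server
    static final int CLIENT_DELTA_FETCH = 30;    // Eureka Client incremental fetch
    static final int RIBBON_LIST_REFRESH = 30;   // Ribbon AllList refresh

    /** Worst case until the Eureka Client's local registry reflects the failure. */
    public static int eurekaWorstCaseSeconds() {
        return LEASE_EXPIRY + READ_ONLY_CACHE_SYNC + CLIENT_DELTA_FETCH;
    }

    /** Worst case until Ribbon's own server list reflects the failure. */
    public static int withRibbonSeconds() {
        return eurekaWorstCaseSeconds() + RIBBON_LIST_REFRESH;
    }

    public static void main(String[] args) {
        System.out.println("Eureka Client: " + eurekaWorstCaseSeconds() + "s");
        System.out.println("Ribbon list:   " + withRibbonSeconds() + "s");
    }
}
```

This assumes every interval is missed by the widest possible margin; in practice the delay will usually be shorter.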
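In that situation, under fairly high concurrency, every thread that calls the dead instance will time out, and threads tied up by timeouts can exhaust the available thread resources and trigger a service avalanche. I don't know whether this was the motivation, but Hystrix's resource isolation, circuit breaking, and fallback handle this case well!
The IPing mechanism also mitigates this to some extent.
Let's go over the main IPing implementations.
First, how is the IPing instantiated?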
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
    if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
        return this.propertiesFactory.get(IPing.class, config, serviceId);
    }
    NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
    ping.initWithNiwsConfig(config);
    return ping;
}
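If a custom IPing is configured, that one is used; otherwise it falls back to NIWSDiscoveryPing.
So where is it used? It is passed in when ZoneAwareLoadBalancer is initialized. Trace the parent constructors upward and you will find it is used when BaseLoadBalancer is initialized. As an aside, BaseLoadBalancer is native Ribbon's own load balancer class.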
String clientName = clientConfig.getClientName();
this.name = clientName;
// Default schedule interval: 30s
int pingIntervalTime = Integer.parseInt(""
        + clientConfig.getProperty(
                CommonClientConfigKey.NFLoadBalancerPingInterval,
                Integer.parseInt("30")));
int maxTotalPingTime = Integer.parseInt(""
        + clientConfig.getProperty(
                CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                Integer.parseInt("2")));
// Start the IPing mechanism
setPingInterval(pingIntervalTime);
setMaxTotalPingTime(maxTotalPingTime);
// cross associate with each other
// i.e. Rule,Ping meet your container LB
// LB, these are your Ping and Rule guys ...
setRule(rule);
setPing(ping);
This in turn schedules a timer task:
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
    // Runs every pingIntervalSeconds (30s with the default above)
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
The core method the task executes:
public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) {
        return; // Ping in progress - nothing to do
    }
    // we are "in" - we get to Ping
    Server[] allServers = null;
    boolean[] results = null;
    Lock allLock = null;
    Lock upLock = null;
    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();
        int numCandidates = allServers.length;
        // Use IPing's isAlive() to decide whether each server is available
        results = pingerStrategy.pingServers(ping, allServers);
        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();
        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();
            // Reset the server's status
            svr.setAlive(isAlive);
            if (oldIsAlive != isAlive) {
                // Add to the list of servers whose status changed
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }
            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
        upServerList = newUpList;
        upLock.unlock();
        // Notify listeners of the status changes
        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}
This is where the IPing gets called; the purpose is to reset each server's status.
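Stripped of the locking and the listener notification, one ping round comes down to "ask the ping about each server, update its flag, and collect the up list and the changed list". Below is a rough self-contained sketch of that idea. Node, Result, and pingOnce are hypothetical stand-ins I made up for illustration; they are not Ribbon's Server, IPing, or pinger strategy types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class PingRoundSketch {
    /** Hypothetical stand-in for a server entry: an id plus the last known alive flag. */
    public static final class Node {
        public final String id;
        public boolean alive;
        public Node(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    /** Outcome of one round: the servers that are up, and the ones whose flag flipped. */
    public static final class Result {
        public final List<Node> up = new ArrayList<>();
        public final List<Node> changed = new ArrayList<>();
    }

    /** Runs one round; the predicate plays the role of the isAlive() check. */
    public static Result pingOnce(List<Node> all, Predicate<Node> ping) {
        Result result = new Result();
        for (Node node : all) {
            boolean nowAlive = ping.test(node);
            if (node.alive != nowAlive) {
                result.changed.add(node);   // status flipped since the previous round
            }
            node.alive = nowAlive;          // reset the stored status
            if (nowAlive) {
                result.up.add(node);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Node> all = new ArrayList<>();
        all.add(new Node("a:8080", true));
        all.add(new Node("b:8080", true));
        // Pretend that every server whose id starts with "b" has gone down.
        Result r = pingOnce(all, n -> !n.id.startsWith("b"));
        System.out.println("up=" + r.up.size() + ", changed=" + r.changed.size());
    }
}
```

The real method additionally guards against overlapping runs with an atomic flag and swaps the up list under a write lock, which this sketch leaves out.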
For the concrete implementation classes, let's look at a few of the main ones; I won't bother with the rarely used ones.
NIWSDiscoveryPing
The default implementation.
public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server != null && server instanceof DiscoveryEnabledServer) {
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer) server;
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo != null) {
            InstanceStatus status = instanceInfo.getStatus();
            if (status != null) {
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}
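It determines the instance status from the local registry. But I have a question: Ribbon's AllList refresh, i.e. the scheduled task started by enableAndInitLearnNewServersFeature(), also runs every 30s, and that task does a full refresh of the server list. So what is this IPing actually for? I don't get it, and I'm hoping someone more experienced can explain.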
PingUrl
public boolean isAlive(Server server) {
    String urlStr = "";
    if (isSecure) {
        urlStr = "https://";
    } else {
        urlStr = "http://";
    }
    urlStr += server.getId();
    urlStr += getPingAppendString();
    boolean isAlive = false;
    HttpClient httpClient = new DefaultHttpClient();
    HttpUriRequest getRequest = new HttpGet(urlStr);
    String content = null;
    try {
        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = (response.getStatusLine().getStatusCode() == 200);
        if (getExpectedContent() != null) {
            LOGGER.debug("content:" + content);
            if (content == null) {
                isAlive = false;
            } else {
                if (content.equals(getExpectedContent())) {
                    isAlive = true;
                } else {
                    isAlive = false;
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Release the connection.
        getRequest.abort();
    }
    return isAlive;
}
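This one simply sends a request to the service itself. By default the URL is just the server address, e.g. http://192.168.31.128:8080, and a 200 response means the service is considered available. You can also configure a path and an expected response body as a second check.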
This mechanism gives some protection against the Eureka Client's detection delay, but isn't it a bit costly in performance?
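One way to keep the cost of a URL-style ping bounded is to put explicit connect and read timeouts on each request, so a dead instance fails fast instead of holding the ping thread. The sketch below is my own illustration using the JDK's HttpURLConnection; HttpPingSketch and its parameters are hypothetical names, and this is not how PingUrl itself is implemented.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class HttpPingSketch {
    /**
     * Returns true only if GET http://{hostPort}{path} answers 200 within the timeout.
     * Any I/O failure (connection refused, timeout, malformed URL) counts as "not alive".
     */
    public static boolean isAlive(String hostPort, String path, int timeoutMillis) {
        HttpURLConnection conn = null;
        try {
            URL url = new URL("http://" + hostPort + path);
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(timeoutMillis); // bound the TCP connect
            conn.setReadTimeout(timeoutMillis);    // bound the wait for the response
            return conn.getResponseCode() == 200;
        } catch (IOException e) {
            return false;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        // The address is the example one from the notes; adjust it to a real instance.
        System.out.println(isAlive("192.168.31.128:8080", "/", 1000));
    }
}
```

Even with timeouts, every ping is still a real request per instance per interval, so the trade-off raised above does not go away; it only becomes bounded.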
OK, that's all for the IPing mechanism; there isn't much more to say.