首先服務(wù)提供方需要定義接口,
EchoService.java
package com.xx.pigeon.demo;
public interface EchoService {
public String echo(String name);
}
同時,服務(wù)提供方同時需要實現(xiàn)該接口,然后服務(wù)提供方就可以注冊服務(wù),傳統(tǒng)的spring注冊方法如下:
<bean class="com.dianping.dpsf.spring.ServiceRegistry"
init-method="init">
<property name="services">
<map>
<entry key="http://service.xx.com/demoService/echoService_1.0.0"
value-ref="echoServiceImpl" />
</map>
</property>
</bean>
OK,然后我們啟動spring上下文的時候,spring會幫助我們初始化這個bean然后調(diào)用init方法,這個方法調(diào)用了
ServiceFactory.addServices(providerConfigList);
ServiceFactory這個類是個工廠類,它會幫我們初始化服務(wù)。剩下就是RPC框架幫我們做的一些事情了。其核心的方法是:
ServicePublisher.addService(providerConfig);
ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
ServicePublisher.publishService(providerConfig, false);
其中ServicePublisher.addService(providerConfig) 這個方法主要處理了serviceName,methodName,parameters,同時對服務(wù)進行緩存。
ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig)則是啟動了“容器”。pigeon目前支持兩層,http和tcp。http方面,pigeon會啟動內(nèi)置的jetty,提供了一些服務(wù)控制的方法,比如publish、unpublish、online、offline等,同時支持用http來進行RPC。tcp方面,pigeon底層依賴netty進行,所謂啟動容器,這里是啟動對特定端口的監(jiān)聽。另外,這個方法還會對請求處理器進行注冊,這個后面會聊到。
ServicePublisher.publishService(providerConfig, false);該方法為核心方法,主要?用于服務(wù)發(fā)布。
if (existingService) {
boolean autoPublishEnable = ConfigManagerLoader.getConfigManager().getBooleanValue(
Constants.KEY_AUTOPUBLISH_ENABLE, true);
if (autoPublishEnable || forcePublish) {
List<Server> servers = ProviderBootStrap.getServers(providerConfig);
int registerCount = 0;
for (Server server : servers) {
publishService(url, server.getRegistryUrl(url), server.getPort(), providerConfig.getServerConfig()
.getGroup(), providerConfig.isSupported());//注冊服務(wù)到zk
registerCount++;
}
if (registerCount > 0) {
boolean isHeartbeatEnable = configManager.getBooleanValue(Constants.KEY_HEARTBEAT_ENABLE,
DEFAULT_HEARTBEAT_ENABLE);
if (isHeartbeatEnable) {
HeartBeatListener.registerHeartBeat(providerConfig);//注冊心跳上報
}
boolean isNotify = configManager
.getBooleanValue(Constants.KEY_NOTIFY_ENABLE, DEFAULT_NOTIFY_ENABLE);
if (isNotify && serviceChangeListener != null) {
serviceChangeListener.notifyServicePublished(providerConfig);//向pigeon的管理中心上報發(fā)布情況
}
boolean autoRegisterEnable = ConfigManagerLoader.getConfigManager().getBooleanValue(
Constants.KEY_AUTOREGISTER_ENABLE, true);
if (autoRegisterEnable) {
ServiceOnlineTask.start();//上線服務(wù)
} else {
logger.info("auto register is disabled");
}
providerConfig.setPublished(true);
}
} else {
logger.info("auto publish is disabled");
}
}
首先會調(diào)用方法publishService(url, server.getRegistryUrl(url), server.getPort(), providerConfig.getServerConfig()
.getGroup(), providerConfig.isSupported()),該方法會先計算weight(權(quán)重),一般會初始化為0,然后再進行服務(wù)注冊,
RegistryManager.getInstance().registerService(registryUrl, group, serverAddress, weight);//注冊服務(wù)地址和服務(wù)名
RegistryManager.getInstance().registerSupportNewProtocol(serverAddress, registryUrl, support);//注冊服務(wù)協(xié)議
if (weight >= 0) {
if (!serverWeightCache.containsKey(serverAddress)) {
RegistryManager.getInstance().setServerApp(serverAddress, configManager.getAppName());//注冊應(yīng)用名,一個實例注冊一次
RegistryManager.getInstance().setServerVersion(serverAddress, VersionUtils.VERSION);//注冊服務(wù)版本,一個實例注冊一次
}
serverWeightCache.put(serverAddress, weight);
}
這幾個registry其實大同小異,都是將特定的值寫入zk,以注冊服務(wù)為例,
void registerPersistentNode(String serviceName, String group, String serviceAddress, int weight)
throws RegistryException {
String weightPath = Utils.getWeightPath(serviceAddress);
String servicePath = Utils.getServicePath(serviceName, group);
try {
if (client.exists(servicePath, false)) {
Stat stat = new Stat();
String addressValue = client.get(servicePath, stat);
String[] addressArray = addressValue.split(",");
List<String> addressList = new ArrayList<String>();
for (String addr : addressArray) {
addr = addr.trim();
if (addr.length() > 0 && !addressList.contains(addr)) {
addressList.add(addr.trim());
}
}
if (!addressList.contains(serviceAddress)) {
addressList.add(serviceAddress);
Collections.sort(addressList);
client.set(servicePath, StringUtils.join(addressList.iterator(), ","), stat.getVersion());
}
} else {
client.create(servicePath, serviceAddress);
}
if (weight >= 0) {
client.set(weightPath, "" + weight);
}
if (logger.isInfoEnabled()) {
logger.info("registered service to persistent node: " + servicePath);
}
} catch (Throwable e) {
if(e instanceof BadVersionException || e instanceof NodeExistsException) {
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
//ignore
}
registerPersistentNode(serviceName, group, serviceAddress, weight);
} else {
logger.error("failed to register service to " + servicePath, e);
throw new RegistryException(e);
}
}
}
該方法就是將服務(wù)地址和服務(wù)名寫入zk。其中這里有一個Group的概念,其實就是將同一個服務(wù)不同的實例可以分到不同的swimlane里面。
當(dāng)然RegistryManager可以有不同的實現(xiàn),你也可以選擇將服務(wù)信息寫到別的地方,比如db。默認實現(xiàn)是寫入zk,這也是大型分布式系統(tǒng)比較常用的方法。
然后是注冊心跳上報,這個后面會具體談到。接著,向pigeon的管理中心上報發(fā)布情況,這個其實在整個服務(wù)中,不是必須的。
最后就是服務(wù)上線,從前面的代碼我們可以知道服務(wù)的權(quán)重一般會被初始化為0,而客戶端在調(diào)用服務(wù)的時候會利用權(quán)重來負載均衡,也就是說權(quán)重是0的服務(wù)基本不會被調(diào)用到,那么就可以理解為通過修改權(quán)重來進行上線或者下線服務(wù)。
public void run() {
logger.info("Service online task start");
try {
Thread.sleep(delay);
if (!isStop) {
ServiceFactory.online();
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
啟動是利用另外一個線程進行的,可以設(shè)置一個時延。
public static void online() throws RegistryException {
logger.info("online");
ServicePublisher.setServerWeight(Constants.WEIGHT_DEFAULT);
/*ServiceProviderFactory.notifyServiceOnline();*/
}
修改服務(wù)權(quán)重,用來上線服務(wù)。
至此,就完成了全部的服務(wù)注冊和發(fā)布的過程。