ZooKeeperHaServices
ZooKeeperHaServices wraps the set of Flink high-availability services that are implemented on top of ZooKeeper, such as leader election for the JobManager, ResourceManager and Dispatcher.
ZooKeeperHaServices is created through HighAvailabilityServicesUtils. For example, TaskManagerRunner creates it with the following call:
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
createHighAvailabilityServices works as a factory: depending on the settings in configuration, it creates a different HighAvailabilityServices implementation.
Back to ZooKeeperHaServices. It provides a set of methods that create LeaderElectionService and LeaderRetrievalService instances:
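The factory idea can be sketched with plain JDK types. This is only an illustration, not Flink's code: HaServices, HaServicesFactorySketch and the returned descriptions are hypothetical names, and the sketch assumes the mode is read from a "high-availability" key with the values NONE and ZOOKEEPER.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for HighAvailabilityServices
interface HaServices {
    String describe();
}

final class HaServicesFactorySketch {
    // Assumed set of modes, for illustration only
    enum Mode { NONE, ZOOKEEPER }

    // Picks an implementation based on the configured mode
    static HaServices create(Map<String, String> config) {
        String raw = config.getOrDefault("high-availability", "NONE");
        Mode mode = Mode.valueOf(raw.toUpperCase(Locale.ROOT));
        switch (mode) {
            case ZOOKEEPER:
                return () -> "zookeeper-backed services";
            case NONE:
            default:
                return () -> "standalone services";
        }
    }
}
```

The caller only sees the HaServices interface, so switching the mode in the configuration changes the implementation without touching the calling code.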
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
}
From here on we focus on the JobManager leader election service. The method that creates it is shown below:
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
It uses the ZooKeeperUtils utility class, whose createLeaderElectionService method looks like this:
public static ZooKeeperLeaderElectionService createLeaderElectionService(
final CuratorFramework client,
final Configuration configuration,
final String pathSuffix) {
final String latchPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
final String leaderPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
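This code reads the LATCH_PATH and LEADER_PATH settings from the configuration, appends pathSuffix to each, and creates a ZooKeeperLeaderElectionService. How the two paths are used is covered in the following sections.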
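To make the path composition concrete, here is a minimal sketch that mimics it with a plain Map instead of Flink's Configuration. The option keys, the default values "/leaderlatch" and "/leader", and the suffix used in the usage note are assumptions for illustration and should be checked against the Flink version in use.

```java
import java.util.Map;

final class LeaderPathSketch {
    // Assumed option keys, for illustration only
    static final String LATCH_KEY = "high-availability.zookeeper.path.latch";
    static final String LEADER_KEY = "high-availability.zookeeper.path.leader";

    // Base latch path from the configuration (assumed default) plus the per-component suffix
    static String latchPath(Map<String, String> config, String pathSuffix) {
        return config.getOrDefault(LATCH_KEY, "/leaderlatch") + pathSuffix;
    }

    // Base leader path from the configuration (assumed default) plus the per-component suffix
    static String leaderPath(Map<String, String> config, String pathSuffix) {
        return config.getOrDefault(LEADER_KEY, "/leader") + pathSuffix;
    }
}
```

With an empty configuration and a hypothetical suffix "/resource_manager_lock", the two calls yield "/leaderlatch/resource_manager_lock" and "/leader/resource_manager_lock", so every component gets its own pair of nodes.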
LeaderElectionService
Several Flink roles take part in leader election, for example JobManager and ResourceManager, and Flink also supports several election mechanisms, such as ZooKeeper and Standalone. If this logic were tightly coupled, every combination would need its own implementation, which would be very hard to maintain.
To solve this, Flink splits leader election into two interfaces: LeaderContender and LeaderElectionService. Every role that takes part in an election implements LeaderContender, for example JobManagerRunnerImpl and ResourceManager. Every election mechanism implements LeaderElectionService, for example ZooKeeperLeaderElectionService, StandaloneLeaderElectionService and EmbeddedLeaderElectionService.
The LeaderContender interface and the purpose of each of its methods are shown below:
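The decoupling can be illustrated with a stripped-down sketch. The interfaces below are simplified stand-ins rather than Flink's real ones, and ImmediateElectionService and RecordingContender are hypothetical classes: the service grants leadership as soon as it starts, roughly what a single-process setup without real competition could do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified stand-in for LeaderContender
interface Contender {
    void grantLeadership(UUID sessionId);
    void revokeLeadership();
}

// Simplified stand-in for LeaderElectionService
interface ElectionService {
    void start(Contender contender);
    void stop();
}

// Election mechanism: grants leadership immediately, revokes it on stop
final class ImmediateElectionService implements ElectionService {
    private Contender contender;

    @Override
    public void start(Contender contender) {
        if (this.contender != null) {
            throw new IllegalStateException("Service can only be started once.");
        }
        this.contender = contender;
        contender.grantLeadership(UUID.randomUUID());
    }

    @Override
    public void stop() {
        if (contender != null) {
            contender.revokeLeadership();
            contender = null;
        }
    }
}

// Role taking part in the election: only records the callbacks it receives
final class RecordingContender implements Contender {
    final List<String> events = new ArrayList<>();

    @Override
    public void grantLeadership(UUID sessionId) {
        events.add("granted");
    }

    @Override
    public void revokeLeadership() {
        events.add("revoked");
    }
}
```

Any Contender works with any ElectionService, so adding a new role or a new election mechanism means writing one class instead of one class per combination.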
public interface LeaderContender {
/**
* Callback method which is called by the {@link LeaderElectionService} upon selecting this
* instance as the new leader. The method is called with the new leader session ID.
*
* @param leaderSessionID New leader session ID
*/
// Callback: LeaderElectionService calls this method when this contender is elected leader
void grantLeadership(UUID leaderSessionID);
/**
* Callback method which is called by the {@link LeaderElectionService} upon revoking the
* leadership of a former leader. This might happen in case that multiple contenders have
* been granted leadership.
*/
// Callback: called when leadership is revoked
void revokeLeadership();
/**
* Callback method which is called by {@link LeaderElectionService} in case of an error in the
* service thread.
*
* @param exception Caught exception
*/
// Called when an error occurs in the leader election service
void handleError(Exception exception);
/**
* Returns the description of the {@link LeaderContender} for logging purposes.
*
* @return Description of this contender.
*/
default String getDescription() {
return "LeaderContender: " + getClass().getSimpleName();
}
}
The LeaderElectionService interface and the purpose of each of its methods are shown below:
public interface LeaderElectionService {
/**
* Starts the leader election service. This method can only be called once.
*
* @param contender LeaderContender which applies for the leadership
* @throws Exception
*/
// Starts the leader election service; the contender taking part in the election is passed in
// Can only be called once
void start(LeaderContender contender) throws Exception;
/**
* Stops the leader election service.
* @throws Exception
*/
// Stops the leader election service
void stop() throws Exception;
/**
* Confirms that the {@link LeaderContender} has accepted the leadership identified by the
* given leader session id. It also publishes the leader address under which the leader is
* reachable.
*
* <p>The rational behind this method is to establish an order between setting the new leader
* session ID in the {@link LeaderContender} and publishing the new leader session ID as well
* as the leader address to the leader retrieval services.
*
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
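// Confirms that the LeaderContender has accepted the leadership
// and publishes the address under which the leader is reachable
// This method exists to enforce an order between:
// 1. setting the new leader session ID in the LeaderContender
// 2. publishing the new leader's session ID and address to the leader retrieval services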
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
/**
* Returns true if the {@link LeaderContender} with which the service has been started owns
* currently the leadership under the given leader session id.
*
* @param leaderSessionId identifying the current leader
*
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
// Returns true if the contender currently holds leadership under the given session ID
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService is an implementation of LeaderElectionService that uses ZooKeeper to carry out leader election.
It relies on LeaderLatch from the Curator framework for both the election itself and the callback notifications. For more detail, see Curator leader election.
ZooKeeperLeaderElectionService tracks the election state with the following three member variables:
private volatile UUID issuedLeaderSessionID;
private volatile UUID confirmedLeaderSessionID;
private volatile String confirmedLeaderAddress;
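Where:
- issuedLeaderSessionID (the proposed leader session ID): when leadership changes, a new session ID is generated and written to issuedLeaderSessionID. At this point the LeaderContender still has to accept the leadership; confirmedLeaderSessionID (together with confirmedLeaderAddress) is updated only after it has confirmed.
- When a LeaderContender gains leadership, it usually has to perform some additional work first. Only once that work is done can the election be considered complete and the new leader's address and session ID be published to the cluster.
The service must be started before it can be used. The logic of the start method is as follows: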
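The two-step "issue, then confirm" bookkeeping can be sketched in isolation. TwoPhaseLeadershipSketch is a hypothetical class with no ZooKeeper involved. It assumes that a confirmation carrying a session ID other than the most recently issued one is simply ignored, which is one plausible way to handle a stale confirmation.

```java
import java.util.Objects;
import java.util.UUID;

final class TwoPhaseLeadershipSketch {
    private UUID issuedSessionId;     // proposed, not yet accepted by the contender
    private UUID confirmedSessionId;  // accepted by the contender
    private String confirmedAddress;  // address published together with the confirmed ID

    // Step 1: leadership was granted, so issue a fresh ID and drop any old confirmation
    synchronized UUID onLeadershipGranted() {
        issuedSessionId = UUID.randomUUID();
        confirmedSessionId = null;
        confirmedAddress = null;
        return issuedSessionId;
    }

    // Step 2: the contender confirms; only the most recently issued ID is accepted
    synchronized boolean confirm(UUID sessionId, String address) {
        if (issuedSessionId == null || !Objects.equals(sessionId, issuedSessionId)) {
            return false; // stale or unknown session ID (assumed behaviour of this sketch)
        }
        confirmedSessionId = sessionId;
        confirmedAddress = address;
        return true;
    }

    // Leadership was lost: forget both the proposal and the confirmation
    synchronized void onLeadershipRevoked() {
        issuedSessionId = null;
        confirmedSessionId = null;
        confirmedAddress = null;
    }

    synchronized boolean isConfirmed() {
        return confirmedSessionId != null;
    }
}
```

Keeping the two IDs separate means nothing is published while the contender is still preparing, and a late confirmation from an earlier grant cannot overwrite the current state.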
@Override
// start must be given a LeaderContender
public void start(LeaderContender contender) throws Exception {
Preconditions.checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
synchronized (lock) {
// Register the listener for unhandled Curator errors
client.getUnhandledErrorListenable().addListener(this);
// Store the leaderContender
// Any role that takes part in leader election must implement the LeaderContender interface
leaderContender = contender;
// Register the listener on leaderLatch, whose path is latchPath
leaderLatch.addListener(this);
// Start leaderLatch
leaderLatch.start();
// Register the listener on the NodeCache, whose path is leaderPath
cache.getListenable().addListener(this);
// Start the NodeCache
cache.start();
// Register the listener for the Curator connection state
client.getConnectionStateListenable().addListener(listener);
// Mark the service as running
running = true;
}
}
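A class that receives LeaderLatch callbacks must implement the LeaderLatchListener interface, which declares two methods, isLeader and notLeader. ZooKeeperLeaderElectionService implements them as follows:
@Override
// Called back when this instance acquires leadership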
public void isLeader() {
synchronized (lock) {
if (running) {
// Create a proposed leader session ID, as a random UUID
issuedLeaderSessionID = UUID.randomUUID();
// Clear the confirmed leader information
clearConfirmedLeaderInformation();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Grant leadership to contender {} with session ID {}.",
leaderContender.getDescription(),
issuedLeaderSessionID);
}
// Tell leaderContender that it has been granted leadership
// Once leaderContender has accepted the leadership and finished its related work, it must call confirmLeadership to confirm that the election is complete
leaderContender.grantLeadership(issuedLeaderSessionID);
} else {
LOG.debug("Ignoring the grant leadership notification since the service has " +
"already been stopped.");
}
}
}
@Override
// Called when this instance loses leadership
public void notLeader() {
synchronized (lock) {
// If ZooKeeperLeaderElectionService is running
if (running) {
LOG.debug(
"Revoke leadership of {} ({}@{}).",
leaderContender.getDescription(),
confirmedLeaderSessionID,
confirmedLeaderAddress);
// Clear issuedLeaderSessionID
issuedLeaderSessionID = null;
// Clear the confirmed leader information
clearConfirmedLeaderInformation();
// Tell leaderContender that its leadership has been revoked
leaderContender.revokeLeadership();
} else {
LOG.debug("Ignoring the revoke leadership notification since the service " +
"has already been stopped.");
}
}
}
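Listening for NodeCache changes requires implementing the NodeCacheListener interface. ZooKeeperLeaderElectionService implements it as shown below. While this instance holds leadership, the method makes sure the leader node in ZooKeeper contains the confirmed leader information, rewriting it if the node is empty or holds different data.
// Called when the content of the node changes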
@Override
public void nodeChanged() throws Exception {
try {
// leaderSessionID is null if the leader contender has not yet confirmed the session ID
// If this instance currently holds leadership
if (leaderLatch.hasLeadership()) {
synchronized (lock) {
if (running) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Leader node changed while {} is the leader with session ID {}.",
leaderContender.getDescription(),
confirmedLeaderSessionID);
}
// If a confirmed leader session ID has been stored
if (confirmedLeaderSessionID != null) {
// Read the content of the NodeCache
ChildData childData = cache.getCurrentData();
// If the NodeCache has no content
if (childData == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into empty node by {}.",
leaderContender.getDescription());
}
// Write the leader information
writeLeaderInformation();
} else {
// The NodeCache has content, so read it
byte[] data = childData.getData();
// If the data is null or has length 0
if (data == null || data.length == 0) {
// the data field seems to be empty, rewrite information
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into node with empty data field by {}.",
leaderContender.getDescription());
}
// Write the leader information
writeLeaderInformation();
} else {
// There is data
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);
// Read the leader address and leader session ID
String leaderAddress = ois.readUTF();
UUID leaderSessionID = (UUID) ois.readObject();
// If the address or session ID differs from the confirmed values, the leader information must be rewritten
if (!leaderAddress.equals(confirmedLeaderAddress) ||
(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
// the data field does not correspond to the expected leader information
if (LOG.isDebugEnabled()) {
LOG.debug(
"Correcting leader information by {}.",
leaderContender.getDescription());
}
writeLeaderInformation();
}
}
}
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
} catch (Exception e) {
leaderContender.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
}
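The node data is decoded with readUTF followed by readObject, which has to mirror the order in which the data was written. The round trip can be tried with JDK classes alone. LeaderInfoCodecSketch is a hypothetical helper, and the address used in the usage note is made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.UUID;

final class LeaderInfoCodecSketch {
    // Writes the address as UTF first, then the session ID as a serialized object
    static byte[] encode(String address, UUID sessionId) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeUTF(address);
            oos.writeObject(sessionId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Reads the two fields back in the same order they were written
    static Map.Entry<String, UUID> decode(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            String address = ois.readUTF();
            UUID sessionId = (UUID) ois.readObject();
            return new SimpleImmutableEntry<>(address, sessionId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, decode(encode("host:6123", id)) returns an entry whose key is "host:6123" and whose value equals id. Swapping the two read calls would fail, because the stream is positional rather than keyed.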
The logic above calls writeLeaderInformation several times. This method writes the leader information (address and session ID) to ZooKeeper.
protected void writeLeaderInformation() {
// this method does not have to be synchronized because the curator framework client
// is thread-safe
// Curator is thread-safe, so this method does not need to be synchronized
try {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Write leader information: Leader={}, session ID={}.",
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
// Serialize the leader address and session ID into baos
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeUTF(confirmedLeaderAddress);
oos.writeObject(confirmedLeaderSessionID);
oos.close();
// Tracks whether the leader information has been written
boolean dataWritten = false;
// Loop while the information has not been written and this instance still holds leadership
while (!dataWritten && leaderLatch.hasLeadership()) {
// Get the state of the leaderPath node
Stat stat = client.checkExists().forPath(leaderPath);
if (stat != null) {
// Get the session ID of the owner of the ephemeral node
long owner = stat.getEphemeralOwner();
// Get the ID of the current session
long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
// If the ephemeral node was created by the current session
if (owner == sessionID) {
try {
// Write the data in baos (leader address and session ID) to the node
client.setData().forPath(leaderPath, baos.toByteArray());
// Mark the data as written
dataWritten = true;
} catch (KeeperException.NoNodeException noNode) {
// node was deleted in the meantime
}
} else {
try {
// The node was not created by the current session, so delete it
client.delete().forPath(leaderPath);
} catch (KeeperException.NoNodeException noNode) {
// node was deleted in the meantime --> try again
}
}
} else {
try {
// The node does not exist
// Create an ephemeral node holding the data in baos
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
leaderPath,
baos.toByteArray());
// Mark the data as written
dataWritten = true;
} catch (KeeperException.NodeExistsException nodeExists) {
// node has been created in the meantime --> try again
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Successfully wrote leader information: Leader={}, session ID={}.",
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
} catch (Exception e) {
leaderContender.handleError(
new Exception("Could not write leader address and leader session ID to " +
"ZooKeeper.", e));
}
}
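The decision logic of the write loop (update the node if we own it, delete it if another session owns it, create it if it is missing) can be sketched against an in-memory stand-in for a single ephemeral node. EphemeralNodeWriteSketch is hypothetical and single-threaded, so it leaves out the races that the real loop handles through NoNodeException and NodeExistsException.

```java
final class EphemeralNodeWriteSketch {
    private Long ownerSession; // session that created the node; null means the node does not exist
    private byte[] data;

    // Simulates another session having created the node earlier
    void createAs(long session, byte[] payload) {
        ownerSession = session;
        data = payload.clone();
    }

    // Repeats until the payload is stored in a node owned by 'session'; returns the number of passes
    int write(long session, byte[] payload) {
        int attempts = 0;
        boolean written = false;
        while (!written) {
            attempts++;
            if (ownerSession == null) {
                // Node missing: create it, owned by the current session
                ownerSession = session;
                data = payload.clone();
                written = true;
            } else if (ownerSession == session) {
                // Node is ours: overwrite the data in place
                data = payload.clone();
                written = true;
            } else {
                // Node belongs to another session: delete it and try again
                ownerSession = null;
                data = null;
            }
        }
        return attempts;
    }

    Long owner() {
        return ownerSession;
    }

    byte[] data() {
        return data == null ? null : data.clone();
    }
}
```

Deleting a node owned by another session, instead of overwriting it, matters because an ephemeral node lives and dies with the session that created it. After the delete and recreate, the node's lifetime is tied to the current leader's session.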
Besides the interface methods above, ZooKeeperLeaderElectionService also implements Curator's UnhandledErrorListener. When the Curator client hits an error that it cannot handle itself, the unhandledError method is called:
@Override
public void unhandledError(String message, Throwable e) {
// Forward the error to leaderContender
leaderContender.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
}
ZooKeeperLeaderRetrievalService
The analysis above shows how a leader is elected. One question remains: how do the other Flink components learn who the leader is?
To answer it, Flink introduces LeaderRetrievalService, which is responsible for retrieving information about the current leader. A LeaderRetrievalService holds a LeaderRetrievalListener, which it notifies whenever the leader changes.
ZooKeeperLeaderRetrievalService is one implementation of LeaderRetrievalService. It reads the leader information from ZooKeeper and notifies the listener.
ZooKeeperLeaderRetrievalService must be started before use. Its start method, shown below, mainly performs initialization.
@Override
// A LeaderRetrievalListener must be passed in
public void start(LeaderRetrievalListener listener) throws Exception {
Preconditions.checkNotNull(listener, "Listener must not be null.");
Preconditions.checkState(leaderListener == null, "ZooKeeperLeaderRetrievalService can " +
"only be started once.");
LOG.info("Starting ZooKeeperLeaderRetrievalService {}.", retrievalPath);
synchronized (lock) {
// Store the listener
leaderListener = listener;
// Register the listener for unhandled client errors
client.getUnhandledErrorListenable().addListener(this);
// Register the NodeCache listener; the node path is retrievalPath, the same node where the leader election service stores the leader information
cache.getListenable().addListener(this);
// Start the NodeCache
cache.start();
// Register the listener for the client connection state
client.getConnectionStateListenable().addListener(connectionStateListener);
running = true;
}
}
The listener is notified when Curator detects a change in the NodeCache. As in ZooKeeperLeaderElectionService, this logic lives in the nodeChanged method:
@Override
public void nodeChanged() throws Exception {
synchronized (lock) {
if (running) {
try {
LOG.debug("Leader node has changed.");
// Read the content of the NodeCache
ChildData childData = cache.getCurrentData();
String leaderAddress;
UUID leaderSessionID;
if (childData == null) {
leaderAddress = null;
leaderSessionID = null;
} else {
byte[] data = childData.getData();
if (data == null || data.length == 0) {
leaderAddress = null;
leaderSessionID = null;
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);
// Read the leader address and session ID
leaderAddress = ois.readUTF();
leaderSessionID = (UUID) ois.readObject();
}
}
// Notify the listener if the leader information is new
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
Next, let's look at the notifyIfNewLeaderAddress method, which is what finally notifies the listener that the leader information has changed.
@GuardedBy("lock")
private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
// The listener is notified only when the leader address or session ID differs from the last known value
if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
if (newLeaderAddress == null && newLeaderSessionID == null) {
LOG.debug("Leader information was lost: The listener will be notified accordingly.");
} else {
LOG.debug(
"New leader information: Leader={}, session ID={}.",
newLeaderAddress,
newLeaderSessionID);
}
lastLeaderAddress = newLeaderAddress;
lastLeaderSessionID = newLeaderSessionID;
// Call the listener's notifyLeaderAddress method
leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
}
}
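The "notify only on change" check can be tried out with a small stand-alone sketch. LeaderChangeNotifierSketch is a hypothetical class that records notifications in a list instead of calling a real listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

final class LeaderChangeNotifierSketch {
    private String lastAddress;
    private UUID lastSessionId;

    // Stands in for the listener: one entry per notification that would be sent
    final List<String> notifications = new ArrayList<>();

    // Called with whatever was read from the node; null values mean the node is empty or missing
    synchronized void onNodeContent(String address, UUID sessionId) {
        boolean unchanged = Objects.equals(address, lastAddress)
                && Objects.equals(sessionId, lastSessionId);
        if (unchanged) {
            return; // duplicate event, nothing to report
        }
        lastAddress = address;
        lastSessionId = sessionId;
        notifications.add(address == null ? "leader lost" : address);
    }
}
```

Objects.equals treats two nulls as equal, so repeated events for an empty node do not produce repeated "leader lost" notifications, and an empty node seen before any leader was known produces none at all.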
This article is original content. Discussion, corrections and reposting are welcome. When reposting, please credit the source.