Kerberos authentication time limits
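Kerberos limits the maximum lifetime of a TGT (configured by max_life and ticket_lifetime). Once that period has passed after a single authentication, the TGT expires and the client has to authenticate again.
Kerberos also provides a renewal mechanism: before the TGT expires, it can be renewed, for example with kinit -R, and each renewal grants another ticket_lifetime of validity. Renewal is not unlimited, however: how far a ticket can be renewed is bounded by max_renewable_life and renew_lifetime.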
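To make the renewal limit concrete, here is a small Java sketch. It is only a model of the rule just described (each renewal extends the end time by ticket_lifetime, but never past the renew-till time); the `Ticket` record, the `renew` helper and the 10-hour / 7-day values are hypothetical and are not part of Flink, Hadoop or the JDK Kerberos API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of Kerberos TGT renewal; not a real Kerberos API. */
public class RenewalCapDemo {

    /** Current end of validity and the absolute renew-till deadline. */
    record Ticket(Instant endTime, Instant renewTill) {}

    /** Models "kinit -R" at time now; returns null if the ticket can no longer be renewed. */
    static Ticket renew(Ticket t, Instant now, Duration ticketLifetime) {
        // An expired ticket, or one past its renew-till time, cannot be renewed.
        if (!now.isBefore(t.endTime()) || !now.isBefore(t.renewTill())) {
            return null;
        }
        Instant candidate = now.plus(ticketLifetime);
        // The new end time is capped by renew-till.
        Instant newEnd = candidate.isBefore(t.renewTill()) ? candidate : t.renewTill();
        return new Ticket(newEnd, t.renewTill());
    }

    public static void main(String[] args) {
        Duration lifetime = Duration.ofHours(10);   // assumed ticket_lifetime
        Duration renewable = Duration.ofDays(7);    // assumed renew_lifetime
        Instant login = Instant.parse("2024-01-01T00:00:00Z");
        Ticket t = new Ticket(login.plus(lifetime), login.plus(renewable));

        int renewals = 0;
        while (true) {
            // Renew one hour before each expiry.
            Instant now = t.endTime().minus(Duration.ofHours(1));
            Ticket next = renew(t, now, lifetime);
            if (next == null || next.endTime().equals(t.endTime())) {
                break; // renewal no longer moves the end time
            }
            t = next;
            renewals++;
        }
        // prints renewals=18, final end=2024-01-08T00:00:00Z
        System.out.println("renewals=" + renewals + ", final end=" + t.endTime());
    }
}
```

With these assumed values, renewing keeps pushing the end time forward only until the 7-day renew-till point; after that the next expiry is final.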
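These mechanisms ensure that an authentication does not stay valid indefinitely, which improves security to some extent. For long-running workloads such as Flink streaming jobs, however, Kerberos' time limits get in the way of stable operation.
To address this, Flink 1.17 and later versions added periodic automatic re-authentication. This article analyzes the code behind that feature.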
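Auto-renewal / re-authentication code
For Flink's security authentication code in general, see the earlier article "Flink 源碼之安全認證" (Flink source code: security authentication).
This article supplements it by analyzing the Kerberos authentication logic and the auto-renewal module in Flink 1.17 and later.
We start with the install method of flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java. This method performs Kerberos authentication through Hadoop's UserGroupInformation.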
@Override
public void install() throws SecurityInstallException {
    // UserGroupInformation is abbreviated as UGI below
    UserGroupInformation.setConfiguration(hadoopConfiguration);
    UserGroupInformation loginUser;
    try {
        // KerberosLoginProvider wraps the UGI login operations together with Flink's security configuration
        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
        // If security is enabled and the principal and related settings are valid
        if (kerberosLoginProvider.isLoginPossible(true)) {
            // Call one of UGI's login... methods to authenticate
            kerberosLoginProvider.doLogin(true);
            // Get the loginUser
            loginUser = UserGroupInformation.getLoginUser();
            // For proxy users, Flink only supports delegation tokens managed outside Flink
            if (HadoopUserUtils.isProxyUser((loginUser))
                    && securityConfig
                            .getFlinkConfig()
                            .get(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
                throw new UnsupportedOperationException(
                        "Hadoop Proxy user is supported only when"
                                + " delegation tokens fetch is managed outside of Flink!"
                                + " Please try again with "
                                + SecurityOptions.DELEGATION_TOKENS_ENABLED.key()
                                + " config set to false!");
            }
            // If the login was done from a keytab
            if (loginUser.isFromKeytab()) {
                // Add the credentials stored in the token file (if any) to the user's credentials
                String fileLocation =
                        System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
                if (fileLocation != null) {
                    Credentials credentials =
                            Credentials.readTokenStorageFile(
                                    new File(fileLocation), hadoopConfiguration);
                    loginUser.addCredentials(credentials);
                }
                // This is the key step: create an executor for periodic TGT renewal
                tgtRenewalExecutorService =
                        Executors.newSingleThreadScheduledExecutor(
                                new ExecutorThreadFactory("TGTRenewalExecutorService"));
                // Start the periodic renewal task
                startTGTRenewal(tgtRenewalExecutorService, loginUser);
            }
        } else {
            loginUser = UserGroupInformation.getLoginUser();
        }
        LOG.info("Hadoop user set to {}", loginUser);
        boolean isKerberosSecurityEnabled =
                HadoopUserUtils.hasUserKerberosAuthMethod(loginUser);
        LOG.info(
                "Kerberos security is {}.", isKerberosSecurityEnabled ? "enabled" : "disabled");
        if (isKerberosSecurityEnabled) {
            LOG.info(
                    "Kerberos credentials are {}.",
                    loginUser.hasKerberosCredentials() ? "valid" : "invalid");
        }
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
In the code above, the key to automatic renewal is the startTGTRenewal method, which starts a periodic task that calls checkTGTAndReloginFromKeytab. The period is set by the configuration option security.kerberos.relogin.period and defaults to 1 minute.
@VisibleForTesting
void startTGTRenewal(
        ScheduledExecutorService tgtRenewalExecutorService, UserGroupInformation loginUser) {
    LOG.info("Starting TGT renewal task");
    long tgtRenewalPeriod = securityConfig.getTgtRenewalPeriod().toMillis();
    tgtRenewalExecutorService.scheduleAtFixedRate(
            () -> {
                // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but
                // in Hadoop 3.x, it is configurable (see
                // hadoop.kerberos.keytab.login.autorenewal.enabled, added in HADOOP-9567).
                // This task will make sure that the user stays logged in regardless of that
                // configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op
                // if the TGT does not need to be renewed yet.
                try {
                    LOG.debug("Renewing TGT");
                    loginUser.checkTGTAndReloginFromKeytab();
                    LOG.debug("TGT renewed successfully");
                } catch (Exception e) {
                    LOG.warn("Error while renewing TGT", e);
                }
            },
            tgtRenewalPeriod,
            tgtRenewalPeriod,
            TimeUnit.MILLISECONDS);
    LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
}
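The scheduling pattern used by startTGTRenewal can be reproduced with the JDK alone. The sketch below is a hypothetical stand-alone version: `ReloginAction` stands in for `UserGroupInformation#checkTGTAndReloginFromKeytab`, and a plain lambda `ThreadFactory` replaces Flink's `ExecutorThreadFactory`. It also shows why the try/catch around the call matters: per the `ScheduledExecutorService` contract, if one run of a `scheduleAtFixedRate` task throws, all later runs are suppressed, so a single transient KDC error would otherwise end renewal for good.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-alone version of a periodic relogin scheduler. */
public class PeriodicReloginDemo {

    /** Stand-in for UserGroupInformation#checkTGTAndReloginFromKeytab. */
    interface ReloginAction {
        void checkAndRelogin() throws Exception;
    }

    static ScheduledExecutorService start(ReloginAction action, Duration period) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "TGTRenewalExecutorService");
            t.setDaemon(true); // do not keep the JVM alive just for renewal
            return t;
        });
        long millis = period.toMillis();
        executor.scheduleAtFixedRate(() -> {
            try {
                action.checkAndRelogin();
            } catch (Exception e) {
                // Swallow and log: an exception escaping the Runnable would
                // suppress every later execution of this periodic task.
                System.err.println("Error while renewing TGT: " + e);
            }
        }, millis, millis, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        ScheduledExecutorService executor = start(() -> {
            if (calls.incrementAndGet() == 2) {
                throw new Exception("simulated KDC outage");
            }
        }, Duration.ofMillis(20));
        Thread.sleep(200);
        executor.shutdownNow();
        System.out.println("calls=" + calls.get());
    }
}
```

In main, the second call throws, yet the task keeps running afterwards; without the try/catch it would stop after that second call.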
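The long comment inside startTGTRenewal explains the motivation: in Hadoop 2.x, renewal of a keytab-based login appears to be automatic, while in Hadoop 3.x it depends on hadoop.kerberos.keytab.login.autorenewal.enabled. Flink 1.17 and later therefore perform the periodic re-authentication inside Flink itself. Whatever the Hadoop configuration, Flink keeps the login fresh, so long-running jobs no longer stop because the ticket lifetime has run out.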
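In addition, the auto-renewal built into Hadoop's UserGroupInformation for ticket-cache logins actually runs kinit -R, so how far it can renew is still bounded by Kerberos' max_renewable_life and renew_lifetime. checkTGTAndReloginFromKeytab, by contrast, re-authenticates from the keytab instead of renewing, so it is not subject to this limit.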
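The difference between renewing and re-logging-in shows up in a toy simulation. Everything here is an illustrative assumption: times are whole hours, the ticket lifetime is 10 hours, the renewable lifetime is 7 days, and the job checks once per hour and acts when at most 2 hours of validity remain. Renewal keeps the original renew-till time, while a keytab re-login starts a fresh ticket with a fresh renewable window. `firstExpiredHour` is a hypothetical helper, not Hadoop code.

```java
/** Toy model comparing kinit -R style renewal with keytab re-login (hypothetical). */
public class RenewVsReloginDemo {

    static final long LIFETIME_HOURS = 10;        // assumed ticket_lifetime
    static final long RENEWABLE_HOURS = 7 * 24;   // assumed renew_lifetime
    static final long ACT_WHEN_LEFT_HOURS = 2;    // act when this little validity remains

    /**
     * Runs a job for totalHours with one check per hour.
     * Returns the first hour at which the TGT is expired, or -1 if it never expires.
     */
    static long firstExpiredHour(boolean reloginFromKeytab, long totalHours) {
        long end = LIFETIME_HOURS;         // current TGT end time
        long renewTill = RENEWABLE_HOURS;  // latest time the TGT can be renewed to
        for (long now = 0; now <= totalHours; now++) {
            if (now >= end) {
                return now; // no valid TGT any more: the job would start failing here
            }
            if (end - now <= ACT_WHEN_LEFT_HOURS) {
                if (reloginFromKeytab) {
                    // Fresh login: new lifetime and a new renewable window.
                    end = now + LIFETIME_HOURS;
                    renewTill = now + RENEWABLE_HOURS;
                } else {
                    // Renewal: the end time can never move past the original renewTill.
                    end = Math.min(now + LIFETIME_HOURS, renewTill);
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long thirtyDays = 30 * 24;
        System.out.println("renew only: " + firstExpiredHour(false, thirtyDays));
        System.out.println("keytab re-login: " + firstExpiredHour(true, thirtyDays));
    }
}
```

Under these assumptions, the renewal-only strategy loses its TGT at hour 168 (the renew-till point), while the keytab re-login strategy returns -1, meaning it survives the whole 30-day run.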
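This completes the analysis of Flink's automatic re-authentication logic. Readers may now ask:
- Flink's periodic check runs far more often than the TGT can possibly expire. Does that hurt performance?
- Why does Flink run the periodic authentication itself instead of relying on Hadoop's automatic authentication?
The appendix code analysis answers both questions.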
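Appendix
checkTGTAndReloginFromKeytab method
This method first checks whether re-authentication is necessary at all. So even though Flink calls it far more often than the TGT expires, a call that finds no re-login due is only a cheap check and has no meaningful performance impact. The exact conditions are explained in the code comments.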
public synchronized void checkTGTAndReloginFromKeytab() throws IOException {
    // Check that security is enabled and that the login uses a Kerberos keytab
    if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
        KerberosTicket tgt = this.getTGT();
        // Check whether a refresh is needed, i.e. whether now >= start + (end - start) * 0.8.
        // In other words, a refresh happens only in the last 20% of the TGT's validity period.
        if (tgt == null || shouldRenewImmediatelyForTests || Time.now() >= this.getRefreshTime(tgt)) {
            this.reloginFromKeytab();
        }
    }
}
The logic of getRefreshTime is already described in the comment above; for interested readers, its source is:
private long getRefreshTime(KerberosTicket tgt) {
    long start = tgt.getStartTime().getTime();
    long end = tgt.getEndTime().getTime();
    return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
}
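This method is the key to keeping re-authentication from happening too often. In Hadoop's UserGroupInformation, TICKET_RENEW_WINDOW is 0.80, which is where the 80% in the comment comes from.
The reloginFromKeytab method checks the conditions again and, if they are met, logs out the current user and logs in again.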
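To get a feel for how rarely the check actually fires, the sketch below mirrors getRefreshTime with the same 0.80 factor and counts re-logins when a check runs every minute. The `countRelogins` helper and the 10-hour ticket lifetime are hypothetical, and the model assumes that each re-login immediately yields a new TGT whose lifetime starts at the moment of the re-login.

```java
/** Hypothetical model of the refresh window used by UserGroupInformation#getRefreshTime. */
public class RefreshWindowDemo {

    // Same factor as Hadoop's UserGroupInformation.TICKET_RENEW_WINDOW.
    static final float TICKET_RENEW_WINDOW = 0.80f;

    /** Mirrors getRefreshTime: start + (end - start) * 0.8, in milliseconds. */
    static long refreshTime(long startMillis, long endMillis) {
        return startMillis + (long) ((endMillis - startMillis) * TICKET_RENEW_WINDOW);
    }

    /**
     * Simulates periodic checks from time 0 up to horizonMillis (inclusive)
     * and returns how many of them would actually trigger a re-login.
     */
    static int countRelogins(long lifetimeMillis, long periodMillis, long horizonMillis) {
        long start = 0;
        long end = lifetimeMillis;
        int relogins = 0;
        for (long now = periodMillis; now <= horizonMillis; now += periodMillis) {
            if (now >= refreshTime(start, end)) {
                // Assumption: the re-login returns a fresh TGT starting now.
                start = now;
                end = now + lifetimeMillis;
                relogins++;
            }
        }
        return relogins;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long tenHours = 10 * 60 * minute;
        long oneDay = 24 * 60 * minute;
        // 1440 checks in one day, but only 3 re-logins (at 8h, 16h and 24h); prints 3
        System.out.println(countRelogins(tenHours, minute, oneDay));
    }
}
```

With a 10-hour ticket, 1440 one-minute checks per day lead to just 3 actual re-logins; every other call returns after a timestamp comparison.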
@Public
@Evolving
public void reloginFromKeytab() throws IOException {
    this.reloginFromKeytab(false);
}

private synchronized void reloginFromKeytab(boolean ignoreTimeElapsed) throws IOException {
    // Check that security is enabled and the login uses a Kerberos keytab
    if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
        long now = Time.now();
        // Check that enough time has passed since the last login;
        // the minimum interval is controlled by hadoop.kerberos.min.seconds.before.relogin
        if (shouldRenewImmediatelyForTests || ignoreTimeElapsed || this.hasSufficientTimeElapsed(now)) {
            KerberosTicket tgt = this.getTGT();
            // Check whether a refresh is needed, i.e. whether now >= start + (end - start) * 0.8.
            // In other words, a refresh happens only in the last 20% of the TGT's validity period.
            if (tgt == null || shouldRenewImmediatelyForTests || now >= this.getRefreshTime(tgt)) {
                LoginContext login = this.getLogin();
                if (login != null && keytabFile != null) {
                    long start = 0L;
                    this.user.setLastLogin(now);
                    try {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Initiating logout for " + this.getUserName());
                        }
                        // Log out, then log in again with a new LoginContext
                        synchronized (UserGroupInformation.class) {
                            login.logout();
                            login = newLoginContext("hadoop-keytab-kerberos", this.getSubject(), new HadoopConfiguration());
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Initiating re-login for " + keytabPrincipal);
                            }
                            start = Time.now();
                            login.login();
                            this.fixKerberosTicketOrder();
                            metrics.loginSuccess.add(Time.now() - start);
                            this.setLogin(login);
                        }
                    } catch (LoginException le) {
                        if (start > 0L) {
                            metrics.loginFailure.add(Time.now() - start);
                        }
                        KerberosAuthException kae = new KerberosAuthException("Login failure", le);
                        kae.setPrincipal(keytabPrincipal);
                        kae.setKeytabFile(keytabFile);
                        throw kae;
                    }
                } else {
                    throw new KerberosAuthException("loginUserFromKeyTab must be done first");
                }
            }
        }
    }
}
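reloginFromKeytab therefore has two independent gates: a minimum interval since the last login (hadoop.kerberos.min.seconds.before.relogin, 60 seconds by default) and the 80% refresh window. The pure-function sketch below restates that decision so it can be tested in isolation. The `ReloginGate` class and its `Tgt` record are hypothetical; the shouldRenewImmediatelyForTests flag is left out, and ignoreTimeElapsed is modeled as a plain boolean parameter.

```java
/** Hypothetical restatement of the checks in UserGroupInformation#reloginFromKeytab. */
public class ReloginGate {

    static final float TICKET_RENEW_WINDOW = 0.80f;

    /** Start and end time of the current TGT, in milliseconds. */
    record Tgt(long startMillis, long endMillis) {}

    /**
     * @param tgt current TGT, or null if the subject holds none
     * @param minMillisBeforeRelogin minimum gap between logins
     *        (cf. hadoop.kerberos.min.seconds.before.relogin, converted to ms)
     */
    static boolean shouldRelogin(long now, long lastLoginMillis, long minMillisBeforeRelogin,
                                 boolean ignoreTimeElapsed, Tgt tgt) {
        // Gate 1: do not log in again too soon after the previous login.
        if (!ignoreTimeElapsed && now - lastLoginMillis < minMillisBeforeRelogin) {
            return false;
        }
        // Gate 2: re-login if there is no TGT or we are in the last 20% of its lifetime.
        if (tgt == null) {
            return true;
        }
        long refreshAt = tgt.startMillis()
                + (long) ((tgt.endMillis() - tgt.startMillis()) * TICKET_RENEW_WINDOW);
        return now >= refreshAt;
    }
}
```

Even when the TGT is already inside its refresh window, a login that happened less than the minimum interval ago blocks another attempt, which protects the KDC from a tight retry loop.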
spawnAutoRenewalThreadForUserCreds
Hadoop's own automatic renewal is implemented in the spawnAutoRenewalThreadForUserCreds method:
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
void spawnAutoRenewalThreadForUserCreds(boolean force) {
    if (!force && (!shouldRelogin() || isFromKeytab())) {
        return;
    }
    //spawn thread only if we have kerb credentials
    KerberosTicket tgt = getTGT();
    if (tgt == null) {
        return;
    }
    String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
    long nextRefresh = getRefreshTime(tgt);
    executeAutoRenewalTask(getUserName(),
            new TicketCacheRenewalRunnable(tgt, cmd, nextRefresh));
}
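Note the guard at the top: unless force is true, the method returns immediately for keytab logins (isFromKeytab()), so this thread only covers ticket-cache logins. At the end, the method calls executeAutoRenewalTask with a TicketCacheRenewalRunnable task.
executeAutoRenewalTask creates a single-threaded executor whose daemon thread is named "TGT Renewer for <userName>" and submits the TicketCacheRenewalRunnable to it once; the periodic behavior comes from the task itself.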
private void executeAutoRenewalTask(final String userName,
        AutoRenewalForUserCredsRunnable task) {
    kerberosLoginRenewalExecutor = Optional.of(
            Executors.newSingleThreadExecutor(
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setDaemon(true);
                            t.setName("TGT Renewer for " + userName);
                            return t;
                        }
                    }
            ));
    kerberosLoginRenewalExecutor.get().submit(task);
}
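Unlike Flink's scheduleAtFixedRate approach, executeAutoRenewalTask submits one long-running task to a plain single-thread executor, so the looping has to live inside the task. The sketch below shows that pattern in isolation. The `Renewal` interface, the convention that a negative return value means "stop", and the give-up-on-any-error policy are assumptions made for brevity; the error handling of Hadoop's real AutoRenewalForUserCredsRunnable is not reproduced here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of a self-looping renewal task on a single daemon thread. */
public class LoopingRenewerDemo {

    /** Performs one renewal; returns the next refresh time (epoch ms), or a negative value to stop. */
    interface Renewal {
        long renewAndGetNextRefreshMillis() throws Exception;
    }

    static ExecutorService spawn(String userName, Renewal renewal, long firstRefreshMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);                         // do not keep the JVM alive
            t.setName("TGT Renewer for " + userName);
            return t;
        });
        // Submitted once; the periodic behavior comes from the loop inside the task.
        executor.submit(() -> {
            long nextRefresh = firstRefreshMillis;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    long sleepMillis = nextRefresh - System.currentTimeMillis();
                    if (sleepMillis > 0) {
                        Thread.sleep(sleepMillis);     // wait until the next refresh point
                    }
                    nextRefresh = renewal.renewAndGetNextRefreshMillis();
                    if (nextRefresh < 0) {
                        return;                        // renewal can no longer continue
                    }
                } catch (InterruptedException e) {
                    return;                            // executor was shut down
                } catch (Exception e) {
                    System.err.println("Renewal failed, stopping: " + e);
                    return;
                }
            }
        });
        return executor;
    }
}
```

Because nothing reschedules the task from outside, once the loop exits (for example when kinit -R can no longer extend the ticket) no further renewal happens for that user.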
TicketCacheRenewalRunnable extends AutoRenewalForUserCredsRunnable, which calls the relogin method periodically.
TicketCacheRenewalRunnable's relogin runs kinit -R to renew the ticket and then calls reloginFromTicketCache to log in again from the ticket cache.
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
final class TicketCacheRenewalRunnable
        extends AutoRenewalForUserCredsRunnable {
    private String kinitCmd;

    TicketCacheRenewalRunnable(KerberosTicket tgt, String kinitCmd,
            long nextRefresh) {
        super(tgt, nextRefresh);
        this.kinitCmd = kinitCmd;
    }

    @Override
    public void relogin() throws IOException {
        String output = Shell.execCommand(kinitCmd, "-R");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Renewed ticket. kinit output: {}", output);
        }
        reloginFromTicketCache();
    }
}
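This shows that Hadoop's own automatic authentication is really automatic renewal via kinit -R. As explained at the beginning of this article, renewal has a maximum limit and cannot continue indefinitely. See "Resolving kinit -R failing to refresh Kerberos credentials because the ticket is not renewable" on the Alibaba Cloud Developer Community.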