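Introduction
As big data clusters come into wider use, big data security is drawing more and more attention. Using and operating a secure big data cluster is more complex than running an ordinary one.
Cluster security is usually built on Kerberos authentication. For the basic principles of Kerberos, see: "Understanding the Kerberos access flow in one diagram".
A Spark application (in On YARN mode) running against a secure Hadoop cluster has to access many components and processes, such as ResourceManager, NodeManager, NameNode, DataNode, Kafka, HMaster, HRegionServer, and MetaStore. For long-running applications such as Spark Streaming and Structured Streaming, the user's authentication must also stay valid over a long period, which makes security and authentication more complex still.
The user who submits a Spark application must first authenticate with the KDC and obtain a ticket for each target service before accessing that service. A running Spark application involves several processes (yarnclient, driver, applicationMaster, executor), and every one of them should be started and run as the same user. Several processes therefore need to access various services with the same user's credentials. This article gives a brief analysis of how that works, based on Spark 2.3.
- Processes in a Spark application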
| Process | Role | yarn-client mode | yarn-cluster mode |
|---|---|---|---|
| yarnclient | The module that submits the Spark application | Its lifecycle matches the driver's | Can be configured to exit once the app is submitted, or to keep monitoring the app's status after submission |
| driver | The Spark application driver; schedules the application logic and acts as the application's "brain" | Runs in the yarnclient JVM | Runs in the applicationMaster |
| applicationMaster | The app manager abstracted by the YARN service | Only starts/monitors containers and reports application status | Starts/monitors containers and reports application status, and also contains the driver |
| Executor | The Spark application's executor; the container entity of the YARN application and the actual executor of the business logic | | |
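The submitting user must be authenticated before submitting an application, so the yarnclient/driver logic necessarily performs a Kerberos login. The other processes, such as applicationMaster and executor, also need to be authenticated, but they are only launched on the user's behalf after the application has been submitted. These processes can skip Kerberos authentication and authenticate through Hadoop's token mechanism instead, which reduces load on the Kerberos service and improves access efficiency.
- The Hadoop token mechanism
The base class of Hadoop's token implementation is org.apache.hadoop.security.token.Token: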
/**
* Construct a token from the components.
* @param identifier the token identifier
* @param password the token's password
* @param kind the kind of token
* @param service the service for this token
*/
public Token(byte[] identifier, byte[] password, Text kind, Text service) {
this.identifier = identifier;
this.password = password;
this.kind = kind;
this.service = service;
}
Other services can also interact through Hadoop tokens; they only need different identifiers to tell the tokens apart. For example, NMTokenIdentifier, AMRMTokenIdentifier, and AuthenticationTokenIdentifier are distinct token identifiers that mark tokens for different service types.
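As a rough illustration of how tokens for different services can sit side by side in one credential set, the sketch below models a token as the four fields of the constructor above and looks tokens up by kind. It uses only the JDK: `SimpleToken`, `SimpleCredentials`, and `TokenKindDemo` are hypothetical stand-ins, not Hadoop's `Token` or `Credentials` classes, and the kind and service strings are illustrative values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a token: the same four fields as the constructor above.
final class SimpleToken {
    final byte[] identifier;
    final byte[] password;
    final String kind;     // tells token types apart, e.g. an AM-RM token vs. an HDFS token
    final String service;  // the endpoint the token is meant for

    SimpleToken(byte[] identifier, byte[] password, String kind, String service) {
        this.identifier = identifier.clone();
        this.password = password.clone();
        this.kind = kind;
        this.service = service;
    }
}

// Hypothetical credential set: tokens are stored under an alias and looked up by kind.
final class SimpleCredentials {
    private final Map<String, SimpleToken> tokens = new LinkedHashMap<>();

    void addToken(String alias, SimpleToken token) {
        tokens.put(alias, token);
    }

    List<SimpleToken> tokensOfKind(String kind) {
        List<SimpleToken> result = new ArrayList<>();
        for (SimpleToken t : tokens.values()) {
            if (t.kind.equals(kind)) {
                result.add(t);
            }
        }
        return result;
    }

    int size() {
        return tokens.size();
    }
}

final class TokenKindDemo {
    // Builds a credential set holding one HDFS-style and one AM-RM-style token.
    static SimpleCredentials sample() {
        byte[] id = "user=alice".getBytes(StandardCharsets.UTF_8);
        byte[] pw = "secret".getBytes(StandardCharsets.UTF_8);
        SimpleCredentials creds = new SimpleCredentials();
        creds.addToken("hdfs", new SimpleToken(id, pw, "HDFS_DELEGATION_TOKEN", "nn1:8020"));
        creds.addToken("amrm", new SimpleToken(id, pw, "YARN_AM_RM_TOKEN", ""));
        return creds;
    }

    public static void main(String[] args) {
        System.out.println(sample().tokensOfKind("YARN_AM_RM_TOKEN").size());
    }
}
```

The point of the model is that one user's credential set can carry many tokens at once, and each client picks the one whose kind matches the service it is about to call.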
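Security implementation of each Spark application process
The yarnclient implementation
Here yarnclient means the client that submits the YARN application to the ResourceManager. Spark can submit an application to YARN in two modes, yarn-client and yarn-cluster, and the yarnclient logic differs slightly between them.
Spark's user login and authentication on a secure Hadoop cluster
When submitting an application, Spark receives the identity and the file needed for authentication through the --principal and --keytab arguments.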
In SparkSubmit, authentication is completed during prepareSubmitEnvironment:
// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
  if (args.principal != null) {
    if (args.keytab != null) {
      require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
      // Add keytab and principal configurations in sysProps to make them available
      // for later use; e.g. in spark sql, the isolated class loader used to talk
      // to HiveMetastore will use these settings. They will be set as Java system
      // properties and then loaded by SparkConf
      sparkConf.set(KEYTAB, args.keytab)
      sparkConf.set(PRINCIPAL, args.principal)
      UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
    }
  }
}
In yarn-cluster mode the business-layer code is not invoked, that is, no SparkContext is initialized; instead, the start method of YarnClusterApplication calls client.submitApplication to submit the application.
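In yarn-client mode the business code is invoked within the yarnclient logic, that is, a SparkContext is initialized and run, and YarnClientSchedulerBackend calls client.submitApplication to submit the application.
The app is submitted through the client's submitApplication method, which creates the amContext and prepares local resources. At this point local files are uploaded to HDFS, including the keytab file. A __spark_conf__.properties configuration file is also generated for the AM to use, and it contains the keytab setting: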
val props = new Properties()
sparkConf.getAll.foreach { case (k, v) =>
props.setProperty(k, v)
}
// Override spark.yarn.key to point to the location in distributed cache which will be used
// by AM.
Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }
amKeytabFileName is set in setupCredentials as shown below. Its value is the name of the given keytab file plus a random string suffix. How the AM uses it is covered in a later section.
val f = new File(keytab)
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
sparkConf.set(PRINCIPAL.key, principal)
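The naming scheme above can be reproduced with the JDK alone. The following is a minimal sketch, assuming only that the uploaded name is "base file name, a dash, then a random UUID" as in the excerpt; `KeytabNames` and the sample path are hypothetical.

```java
import java.nio.file.Paths;
import java.util.UUID;

final class KeytabNames {
    // Base file name plus "-" plus a random UUID, so the uploaded copy cannot
    // collide with another file that the user ships with the application.
    static String distributedName(String localKeytabPath) {
        String baseName = Paths.get(localKeytabPath).getFileName().toString();
        return baseName + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // The sample path is made up; the printed suffix differs on every run.
        System.out.println(distributedName("/etc/security/keytabs/app.keytab"));
    }
}
```

Because the suffix is random, the AM cannot guess the file name; this is why the generated name is written into the properties file that the AM reads at startup.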
Next, tokens are obtained for the relevant components. Note that none of these tokens is for interacting with YARN services; they are only for HDFS, HBase, and Hive.
def obtainDelegationTokens(
hadoopConf: Configuration,
creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
// Each provider's obtainDelegationTokens fetches the token for its component and adds it to creds
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
}
}.foldLeft(Long.MaxValue)(math.min)
}
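The final `foldLeft(Long.MaxValue)(math.min)` reduces the per-provider results to a single value: the earliest time at which any token needs renewing. A minimal Java sketch of that reduction follows, assuming each provider reports an optional renewal time; `RenewalTimes` and its inputs are hypothetical and stand in for the provider results.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

final class RenewalTimes {
    // Mirrors the fold above: start from Long.MAX_VALUE and keep the smallest
    // renewal time reported by any provider; providers that report nothing are skipped.
    static long earliest(List<OptionalLong> perProvider) {
        long earliest = Long.MAX_VALUE;
        for (OptionalLong t : perProvider) {
            if (t.isPresent()) {
                earliest = Math.min(earliest, t.getAsLong());
            }
        }
        return earliest;
    }

    public static void main(String[] args) {
        List<OptionalLong> reported =
            Arrays.asList(OptionalLong.of(300L), OptionalLong.empty(), OptionalLong.of(120L));
        System.out.println(earliest(reported));
    }
}
```

Starting from `Long.MAX_VALUE` means that when no provider reports a renewal time, the result reads as "no renewal needed" rather than as an error.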
Among the services Spark commonly accesses, Hive, HBase, and HDFS use the token mechanism. The corresponding token providers are:
| Service | tokenProvider | Class that obtains the token | Method |
|---|---|---|---|
| HDFS | HadoopFSDelegationTokenProvider | org.apache.hadoop.hdfs.DistributedFileSystem | addDelegationTokens |
| HIVE | HiveDelegationTokenProvider | org.apache.hadoop.hive.ql.metadata.Hive | getDelegationToken |
| HBASE | HBaseDelegationTokenProvider | org.apache.hadoop.hbase.security.token.TokenUtil | obtainToken |
Taking HBaseDelegationTokenProvider as an example, it mainly uses reflection to call the obtainToken method of HBase's TokenUtil class. Its obtainDelegationTokens method is:
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
val obtainToken = mirror.classLoader.
loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
getMethod("obtainToken", classOf[Configuration])
logDebug("Attempting to fetch HBase security token.")
val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
.asInstanceOf[Token[_ <: TokenIdentifier]]
logInfo(s"Get token from HBase: ${token.toString}")
creds.addToken(token.getService, token)
} catch {
case NonFatal(e) =>
logDebug(s"Failed to get token from service $serviceName", e)
}
None
}
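PS: The user obtaining the HBase token needs exec permission on the hbase:meta table; otherwise the token cannot be obtained.
After the tokens are obtained, they are set on the amContainer, which is then put into the appContext: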
private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
//
appContext.setAMContainerSpec(containerContext)
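The step above amounts to "serialize the credential set into a byte buffer and attach it to the container launch request". The sketch below shows that round trip with JDK streams only. It is an assumption-laden model: `TokenWire` and its wire format (a count followed by alias/secret pairs) are invented for illustration and are not Hadoop's token storage format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

final class TokenWire {
    // Write "count, then (alias, secret) pairs" and wrap exactly the bytes written.
    static ByteBuffer serialize(Map<String, byte[]> tokens) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(tokens.size());
            for (Map.Entry<String, byte[]> e : tokens.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    // What the receiving side would do: rebuild the token map from the buffer.
    static Map<String, byte[]> deserialize(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.remaining()];
        buffer.duplicate().get(raw); // duplicate() leaves the caller's position untouched
        Map<String, byte[]> tokens = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String alias = in.readUTF();
                byte[] secret = new byte[in.readInt()];
                in.readFully(secret);
                tokens.put(alias, secret);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return tokens;
    }
}
```

Shipping the tokens inside the launch context is what lets a process that never talks to the KDC start with usable credentials.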
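Token refresh in the driver
In yarn-client mode the driver starts inside the yarnclient process and likewise needs to access the business layer and cluster components such as HDFS. The driver keeps its tokens valid by reading the credentials file that the AM refreshes under an HDFS path.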
// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {
YarnSparkHadoopUtil.startCredentialUpdater(conf)
}
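In yarn-cluster mode the driver runs inside the applicationMaster JVM, and its security is handled by the AM.
ApplicationMaster security and authentication
The applicationMaster is the core of application scheduling and management in YARN, and it has to interact with the RM, NMs, and so on for the application to run. All of these interactions authenticate through tokens, and that authentication is implemented inside the YARN framework. The AM log shows that tokens are used even in a non-secure (non-Kerberos) setup. Tokens for services such as HDFS and HBase, by contrast, have to be handled by the Spark framework.
YARN-related authentication in the applicationMaster
- Authentication between the AM and the RM
After the ResourceManager receives the ApplicationSubmissionContext for the submitted application, the run method of its AMLauncher.java generates the YARN_AM_RM_TOKEN and sets it for the AM. This token is used for communication between the AM and the RM.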
public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {
LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
AMRMTokenIdentifier identifier =
new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
.getKeyId());
byte[] password = this.createPassword(identifier);
appAttemptSet.add(appAttemptId);
return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
identifier.getKind(), new Text());
} finally {
this.writeLock.unlock();
}
}
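The `createPassword(identifier)` call above is where the token's password comes from. The usual design for such tokens is to derive the password as an HMAC of the identifier bytes under a master key held by the issuing service, so the service can recompute and verify it later without storing it. The sketch below illustrates that idea with the JDK's `javax.crypto.Mac`. It is not the YARN implementation: `TokenPasswords`, the key, and the identifier bytes are made up, and the choice of HmacSHA1 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class TokenPasswords {
    private static final String ALGORITHM = "HmacSHA1"; // assumption, see the text above

    // Password = HMAC(masterKey, identifier); only the key holder can produce it.
    static byte[] createPassword(byte[] identifier, byte[] masterKey) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(new SecretKeySpec(masterKey, ALGORITHM));
            return mac.doFinal(identifier);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC not available", e);
        }
    }

    // The issuing service recomputes the HMAC and compares in constant time.
    static boolean verify(byte[] identifier, byte[] password, byte[] masterKey) {
        return MessageDigest.isEqual(createPassword(identifier, masterKey), password);
    }

    public static void main(String[] args) {
        byte[] key = "master-key".getBytes(StandardCharsets.UTF_8);
        byte[] id = "appattempt-demo".getBytes(StandardCharsets.UTF_8);
        System.out.println(verify(id, createPassword(id, key), key));
    }
}
```

Under this scheme a client holding the token can prove it was issued by the service, while a forged identifier fails verification because the forger lacks the master key.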
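- Authentication between the AM and the NM
After it starts, the AM requests containers from the ResourceManager and talks to the corresponding NodeManagers to launch them. So where does the token for AM-NM communication come from?
The AMRMClientImpl class shows the answer. The AM sends an allocation request to the RM; on receiving it, the RM puts the tokens of the NM nodes that will host the containers into the response and returns it to the AM. When the AM receives the response, it saves the NM tokens and checks whether the YARN_AM_RM_TOKEN needs to be updated.
// Send the allocation request to the RM through rmClient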
allocateResponse = rmClient.allocate(allocateRequest);
// After receiving the response, save the NM tokens and check whether the AM-RM token needs updating
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
if (allocateResponse.getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAMRMToken());
}
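Conceptually, saving the NM tokens means keeping a per-node map that is consulted right before the AM contacts a NodeManager. A minimal sketch of that caching idea follows; `NodeTokenCache` is hypothetical, uses plain strings in place of token objects, and is not YARN's own cache class.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class NodeTokenCache {
    // Node address -> token; a concurrent map because allocation responses and
    // container launches may be handled on different threads.
    private final Map<String, String> tokenByNode = new ConcurrentHashMap<>();

    // Called for every allocate response: a newer token replaces the older one per node.
    void populate(Map<String, String> tokensFromResponse) {
        tokenByNode.putAll(tokensFromResponse);
    }

    // Looked up right before contacting a NodeManager; null if no token is known.
    String tokenFor(String nodeAddress) {
        return tokenByNode.get(nodeAddress);
    }

    public static void main(String[] args) {
        NodeTokenCache cache = new NodeTokenCache();
        cache.populate(Collections.singletonMap("node1:45454", "token-a"));
        System.out.println(cache.tokenFor("node1:45454"));
    }
}
```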
The RM answers the allocation request through ApplicationMasterService:
// The scheduler assigns a NodeManager to each container, generates that NodeManager's token, and puts it into the allocation
Allocation allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals);
......
if (!allocation.getContainers().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
When the AM is about to start a container, it puts all of the current user's tokens into the ContainerLaunchContext:
def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
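Token refresh for application-level services in the ApplicationMaster
Resources available when the AM starts
The AM launch command looks roughly like the one below. It specifies a properties file, which is the file that yarnclient generated and uploaded to HDFS. Before the AM starts, the NodeManager copies it from HDFS to a local path for the container to use: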
/usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties
This properties file contains the following entries:
spark.yarn.principal=ocsp-ygcluster@ASIAINFO.COM
spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a
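The figure below shows the resource files used by the AM process.
![Resource files used by the AM process](http://upload-images.jianshu.io/upload_images/9004616-9b79d8d5b4d9afa8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
As shown above, although the AM runs inside the cluster, the resources it needs for authentication are already in place at run time. The following analyzes its security-related logic at run time.
AM authentication and token refresh logic
The applicationMaster periodically refreshes the tokens, writes them to a file in the relevant HDFS directory for the executors to use, and cleans up old files.
After the ApplicationMaster starts, it logs in and starts a daemon thread named am-kerberos-renewer that logs in again periodically to keep the user's authentication valid: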
private val ugi = {
val original = UserGroupInformation.getCurrentUser()
// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {
UserGroupInformation.loginUserFromKeytab(principal, keytab)
val renewer = new Thread() {
override def run(): Unit = Utils.tryLogNonFatalError {
while (true) {
TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
}
}
}
renewer.setName("am-kerberos-renewer")
renewer.setDaemon(true)
renewer.start()
// Transfer the original user's tokens to the new user, since that's needed to connect to
// YARN. It also copies over any delegation tokens that might have been created by the
// client, which will then be transferred over when starting executors (until new ones
// are created by the periodic task).
val newUser = UserGroupInformation.getCurrentUser()
SparkHadoopUtil.get.transferCredentials(original, newUser)
newUser
} else {
SparkHadoopUtil.get.createSparkUser()
}
}
The AM starts the AMCredentialRenewerStarter thread, which schedules the login and token renewal logic:
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
val credentialRenewerThread = new Thread {
setName("AMCredentialRenewerStarter")
setContextClassLoader(userClassLoader)
override def run(): Unit = {
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
yarnConf,
conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
val credentialRenewer =
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
}
}
credentialRenewerThread.start()
credentialRenewerThread.join()
}
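scheduleLoginFromKeytab periodically schedules the login, obtains fresh tokens, and writes them to a file on HDFS.
Its core logic is as follows.
Scheduling period:
Each component has its own token renewal period; for example, HDFS's dfs.namenode.delegation.token.renew-interval defaults to 1 day, and HBase's hbase.auth.key.update.interval defaults to 1 day. The refresh is scheduled at 75% of the smallest of these values.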
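The scheduling rule can be worked through with a few lines of arithmetic. The sketch below assumes the rule exactly as stated here (75% of the smallest per-service interval); `RenewalSchedule` is a hypothetical helper, not Spark's scheduling code.

```java
import java.util.concurrent.TimeUnit;

final class RenewalSchedule {
    static final double RATIO = 0.75; // the 75% figure from the text

    // Delay before the next refresh: 75% of the shortest per-service interval.
    static long nextDelayMillis(long... intervalsMillis) {
        if (intervalsMillis.length == 0) {
            throw new IllegalArgumentException("at least one interval is required");
        }
        long shortest = Long.MAX_VALUE;
        for (long interval : intervalsMillis) {
            shortest = Math.min(shortest, interval);
        }
        return (long) (RATIO * shortest);
    }

    public static void main(String[] args) {
        long oneDay = TimeUnit.DAYS.toMillis(1);
        // Two services that both renew daily: refresh after 18 hours.
        System.out.println(nextDelayMillis(oneDay, oneDay)); // prints 64800000
    }
}
```

Refreshing before the shortest interval elapses leaves a margin so that executors can pick up the new tokens before any of the old ones stop working.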
Scheduling flow:
// Write the newly obtained tokens to the HDFS path ${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix}
writeNewCredentialsToHDFS(principal, keytab)
// Cleanup keeps five (${spark.yarn.credentials.file.retention.count}) files and removes every file last modified more than five days (${spark.yarn.credentials.file.retention.days}) ago
cleanupOldFiles()
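The retention rule in the comment above combines a count and an age. One way to read it is sketched below: the newest files are always kept, and of the remaining ones only those older than the age limit are deleted. `CredentialFile` and `CredentialCleanup` are hypothetical, the file list is passed in rather than read from HDFS, and the exact interplay of the two limits is an assumption based on the description.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical record of one credentials file: its name and last-modified time.
final class CredentialFile {
    final String name;
    final long modifiedMillis;

    CredentialFile(String name, long modifiedMillis) {
        this.name = name;
        this.modifiedMillis = modifiedMillis;
    }
}

final class CredentialCleanup {
    // Returns the names to delete: never one of the newest keepCount files,
    // and only files last modified more than keepDays ago.
    static List<String> filesToDelete(
            List<CredentialFile> files, int keepCount, int keepDays, long nowMillis) {
        List<CredentialFile> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingLong((CredentialFile f) -> f.modifiedMillis));
        long threshold = nowMillis - TimeUnit.DAYS.toMillis(keepDays);
        int candidates = Math.max(0, sorted.size() - keepCount);
        List<String> doomed = new ArrayList<>();
        for (CredentialFile f : sorted.subList(0, candidates)) { // oldest first
            if (f.modifiedMillis < threshold) {
                doomed.add(f.name);
            }
        }
        return doomed;
    }
}
```

Keeping several generations matters because an executor may still be reading a slightly older file while the AM writes the next one.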
Executor authentication
Executors also authenticate through the token mechanism. After an executor starts, it begins refreshing its tokens based on the ${spark.yarn.credentials.file} setting passed from the driver at startup:
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.getMethod("startCredentialUpdater", classOf[SparkConf])
.invoke(null, driverConf)
}
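The executor refreshes its tokens by reading the ${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix} files from HDFS into its cache, which ensures that it uses the refreshed tokens.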
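Given the `-${timeStamp}-${nextSuffix}` naming, a reader can find the newest credentials file by comparing the trailing suffix. The sketch below assumes the suffix is an increasing integer after the last dash; `LatestCredentials` and the sample file names are hypothetical, and listing the HDFS directory is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class LatestCredentials {
    // The suffix is the integer after the last '-' in "<base>-<timeStamp>-<suffix>".
    static int suffixOf(String fileName) {
        return Integer.parseInt(fileName.substring(fileName.lastIndexOf('-') + 1));
    }

    // Returns the newest file, but only if it is newer than the one already loaded.
    static Optional<String> newerThan(List<String> fileNames, int lastLoadedSuffix) {
        String best = null;
        int bestSuffix = lastLoadedSuffix;
        for (String name : fileNames) {
            int suffix = suffixOf(name);
            if (suffix > bestSuffix) {
                best = name;
                bestSuffix = suffix;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "credentials-1700000000000-1", "credentials-1700064800000-2");
        System.out.println(newerThan(names, 1).orElse("nothing new"));
    }
}
```

Tracking the last loaded suffix lets the updater skip files it has already read and reload only when the AM has written a newer generation.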
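Using Spark securely
The mechanism by which the Spark framework performs Kerberos authentication and uses tokens to interact with other services is fairly simple to use: just add --principal appuserName --keytab /path/to/user.keytab to the spark-submit command line when submitting the application.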