Spark HiveThriftServer高可用的問題
spark HiveThriftServer 繼承了HiveServer2,但是卻沒有繼承HiveServer2的HA機(jī)制,現(xiàn)在我們通過修改源碼的方式來實(shí)現(xiàn)HiveThriftServer的高可用,基本原理是在zk上注冊多個服務(wù)的連接地址,與HiveServer2的使用方式相同
涉及類及源碼修改
- spark HiveThriftServer的入口類為HiveThriftServer2,該object有個main方法入口,我們看下這里的main方法做了什么
def main(args: Array[String]) {
// 解析命令行參數(shù)
Utils.initDaemon(log)
val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
optionsProcessor.parse(args)
logInfo("Starting SparkContext")
// 初始化環(huán)境
SparkSQLEnv.init()
ShutdownHookManager.addShutdownHook { () =>
SparkSQLEnv.stop()
uiTab.foreach(_.detach())
}
val executionHive = HiveUtils.newClientForExecution(
SparkSQLEnv.sqlContext.sparkContext.conf,
SparkSQLEnv.sqlContext.sessionState.newHadoopConf())
try {
/**
* 實(shí)例化hiveThriftServer2
*/
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
} else {
None
}
// If application was killed before HiveThriftServer2 start successfully then SparkSubmit
// process can not exit, so check whether if SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
logError("SparkContext has stopped even if HiveServer2 has started, so exit")
System.exit(-1)
}
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}
- 我們看到這里new了一個HiveThriftServer2的對象,我們進(jìn)這個對象看一下
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
extends HiveServer2
with ReflectedCompositeService {
// state is tracked internally so that the server only attempts to shut down if it successfully
// started, and then once only.
private val started = new AtomicBoolean(false)
// todo: 新加的
var hiveConf:HiveConf = _
/**
* 初始化hiveThriftServer
*/
override def init(hiveConf: HiveConf) {
this.hiveConf = hiveConf
val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
new ThriftHttpCLIService(sparkSqlCliService)
} else {
new ThriftBinaryCLIService(sparkSqlCliService)
}
setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
initCompositeService(hiveConf)
}
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
transportMode.toLowerCase(Locale.ROOT).equals("http")
}
override def start(): Unit = {
super.start()
started.set(true)
/**
* todo: 這里使用了HiveServer的高可用的配置
*/
if (this.hiveConf.getBoolVar(
ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
/*
* addServerInstanceToZooKeeper 把hiveServer2注冊到zookeeper
* */
invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
classOf[HiveConf] -> this.hiveConf)
}
}
override def stop(): Unit = {
/**
* todo: 停止的時候,將zookeeper上的注冊信息刪除
*/
if (started.getAndSet(false)) {
if (this.hiveConf.getBoolVar(
ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
}
super.stop()
}
}
}
- 這里的方法也很簡單,一個初始化,一個start()方法,一個stop()方法,代碼中todo的部分是我新加的代碼,改動也很少
- 新加hiveConf的成員變量,記錄初始化時的配置,以便后面用
- start方法: 在zookeeper中注冊服務(wù)信息
- stop方法: 在zookeeper中刪除注冊信息
- 這里主要是應(yīng)用了反射,在HiveServer中添加了注冊和刪除zk的信息,我們來到HiveServer2的代碼中看一下如何修改
/*################################# 新增方法 ############################################*/
//獲取thriftServer的IP:HOST
private String getServerInstanceURI() throws Exception {
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
return getHiveHost() + ":"
+ thriftCLIService.getPortNumber();
}
private String getHiveHost() {
HiveConf hiveConf = thriftCLIService.getHiveConf();
String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
if (hiveHost != null && !hiveHost.isEmpty()) {
return hiveHost;
} else {
return thriftCLIService.getServerIPAddress().getHostName();
}
}
/**
* 控制是否需要重新在zookeeper上注冊HiveServer2
* */
private boolean deregisteredWithZooKeeper = false;
private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
}
/**
* zk的監(jiān)控者,如果發(fā)現(xiàn)注冊信息為null,會觸發(fā)監(jiān)控,然后關(guān)掉當(dāng)前注冊hiveServer2的實(shí)例信息
*/
private PersistentEphemeralNode znode;
private class DeRegisterWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
if (znode != null) {
try {
znode.close();
LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ "The server will be shut down after the last client sesssion completes.");
} catch (IOException e) {
LOG.error("Failed to close the persistent ephemeral znode", e);
} finally {
HiveServer2.this.setDeregisteredWithZooKeeper(true);
// 如果當(dāng)前已經(jīng)沒有可用的服務(wù),那么就把HiveServer2關(guān)閉掉
if (cliService.getSessionManager().getOpenSessionCount() == 0) {
LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ "instances available for dynamic service discovery. "
+ "The last client session has ended - will shutdown now.");
HiveServer2.this.stop();
}
}
}
}
}
}
private CuratorFramework zooKeeperClient;
private String znodePath;
/**
* 把服務(wù)注冊到zookeeper中
* @param hiveConf
* @throws Exception
*/
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
//從hiveConf中獲取zookeeper地址
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
//從hive-site.xml中獲取hive.server2.zookeeper.namespace的配置信息
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
//獲取用戶提供的thriftServer地址
String instanceURI = getServerInstanceURI();
//獲取hive連接zookeeper的session超時時間
int sessionTimeout =
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
TimeUnit.MILLISECONDS);
//hive連接zookeeper的等待時間
int baseSleepTime =
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
TimeUnit.MILLISECONDS);
//hive連接zookeeper的最大重試次數(shù)
int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// 獲取zookeeper客戶端
zooKeeperClient =
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
//啟動zookeeper客戶端
zooKeeperClient.start();
//TODO 在zookeeper上根據(jù)rootNamespace創(chuàng)建一個空間(用來存儲數(shù)據(jù)的文件夾)
try {
zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
} catch (KeeperException e) {
if (e.code() != KeeperException.Code.NODEEXISTS) {
LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
throw e;
}
}
//TODO 把hiveserver2的信息注冊到rootNamespace下:
// serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
try {
String pathPrefix =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
znode =
new PersistentEphemeralNode(zooKeeperClient,
PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
znode.start();
// We'll wait for 120s for node creation
long znodeCreationTimeout = 120;
if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
}
setDeregisteredWithZooKeeper(false);
znodePath = znode.getActualPath();
// TODO 添加zk的watch , 如果服務(wù)不見了,需要第一時間watche到
if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
} catch (Exception e) {
LOG.fatal("Unable to create a znode for this server instance", e);
if (znode != null) {
znode.close();
}
throw (e);
}
}
//移除znode,代表當(dāng)前程序關(guān)閉
private void removeServerInstanceFromZooKeeper() throws Exception {
setDeregisteredWithZooKeeper(true);
if (znode != null) {
znode.close();
}
zooKeeperClient.close();
LOG.info("Server instance removed from ZooKeeper.");
}
- 這里主要是對HiveServer2添加了兩個方法,及HiveThriftServer2中通過反射調(diào)用的兩個方法,分別是在啟動的時候,在zk指定的地址上注冊連接信息,以及停止的時候刪除對應(yīng)位置的連接信息。
- 新建了一個zkWatcher監(jiān)控
- 注意這里zk存儲的數(shù)據(jù)信息,必須是:
serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005的形式,主要是為了復(fù)用hiveServer2的高可用
編譯
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package
- 得到的hive-thriftServer.jar包替換spark lib目錄下的jar包即可
相關(guān)配置
- 這里的高可用的配置與hiveServer2的高可用配置是一致的,在hive-site.xml文件中添加
<name>hive.server2.support.dynamic.service.discovery</name>
<value>true</value>
</property>
<property>
<name>hive.server2.zookeeper.namespace</name>
<value>hiveserver2_zk</value>
</property>
<property
<name>hive.zookeeper.quorum</name>
<value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>cdh1</value>
</property>
- 另外,因?yàn)槲覀冃枰詓park的
sbin/start-thriftserver.sh腳本啟動進(jìn)程,腳本中調(diào)用了spark-daemon.sh腳本實(shí)際啟動任務(wù),而這個腳本中的有關(guān)于重復(fù)應(yīng)用的檢查,即同一臺機(jī)器上啟動多個相同應(yīng)用會報錯,如果我們要在同一臺機(jī)器上啟動兩個hiveThriftServer的話,我們需要對spark-daemon.sh做一些修改
# 屏蔽以下腳本###########################################
# if [ -f "$pid" ]; then
# TARGET_ID="$(cat "$pid")"
# if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
# echo "$command running as process $TARGET_ID. Stop it first."
# exit 1
# fi
# fi
########################################################
啟動
- 啟動命令
sbin/start-thriftserver.sh \
--master yarn \
--conf spark.driver.memory=1G \
--executor-memory 1G \
--num-executors 1 \
--hiveconf hive.server2.thrift.port=10002
- beeline連接命令
!connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk