Implementing High Availability for Spark HiveThriftServer2

The HA problem with Spark HiveThriftServer2

Spark's HiveThriftServer2 extends HiveServer2 but does not inherit HiveServer2's HA mechanism. Below we add HA to HiveThriftServer2 by modifying its source. The basic idea is to register each server's connection address on ZooKeeper, so clients use it exactly the way they use an HA HiveServer2.

Classes involved and source changes

  • The entry point of Spark's HiveThriftServer2 is the HiveThriftServer2 object; let's see what its main method does
  def main(args: Array[String]) {
    // parse the command-line options
    Utils.initDaemon(log)
    val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
    optionsProcessor.parse(args)

    logInfo("Starting SparkContext")
    // initialize the Spark SQL environment
    SparkSQLEnv.init()

    ShutdownHookManager.addShutdownHook { () =>
      SparkSQLEnv.stop()
      uiTab.foreach(_.detach())
    }

    val executionHive = HiveUtils.newClientForExecution(
      SparkSQLEnv.sqlContext.sparkContext.conf,
      SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

    try {
      /**
        * instantiate HiveThriftServer2
        */
      val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
      server.init(executionHive.conf)
      server.start()
      logInfo("HiveThriftServer2 started")
      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
      SparkSQLEnv.sparkContext.addSparkListener(listener)
      uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
      } else {
        None
      }
      // If the application was killed before HiveThriftServer2 started successfully, the SparkSubmit
      // process cannot exit, so check whether the SparkContext was stopped.
      if (SparkSQLEnv.sparkContext.stopped.get()) {
        logError("SparkContext has stopped even if HiveServer2 has started, so exit")
        System.exit(-1)
      }
    } catch {
      case e: Exception =>
        logError("Error starting HiveThriftServer2", e)
        System.exit(-1)
    }
  }
  • A HiveThriftServer2 object is created here; let's look inside that class
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)

  // todo: newly added - keep the HiveConf passed to init() for later use
  var hiveConf: HiveConf = _
  /**
    * initialize the HiveThriftServer2
    */
  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)

    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }

    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)
    /**
      * todo: reuse HiveServer2's HA (dynamic service discovery) configuration here
      */
    if (this.hiveConf.getBoolVar(
      ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      /*
      * addServerInstanceToZooKeeper registers this server instance on ZooKeeper
      * */
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }

  }

  override def stop(): Unit = {
    /**
      * todo: on stop, remove this instance's registration from ZooKeeper
      */
    if (started.getAndSet(false)) {
      if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }
 }
  • The class is simple: an init() method, a start() method, and a stop() method. The parts marked "todo" are my additions, and the changes are small
  • A new hiveConf member records the configuration passed to init() for later use
  • start(): registers this server's connection info on ZooKeeper
  • stop(): removes the registration from ZooKeeper
  • Both calls go through reflection into methods newly added to HiveServer2 for registering and removing the ZooKeeper entries. Let's look at how HiveServer2 is modified
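The reflective invoke(...) pattern used in start() and stop() can be sketched in plain Java. Server and its private register method below are hypothetical stand-ins for HiveServer2 and its private addServerInstanceToZooKeeper:

```java
import java.lang.reflect.Method;

public class ReflectInvokeDemo {
    // Stand-in for HiveServer2: the method we need is private, so a
    // subclass cannot call it directly.
    static class Server {
        private String register(String uri) {
            return "registered " + uri;
        }
    }

    // Minimal version of the invoke(clazz, obj, name, args) helper:
    // look the method up on the declaring class, force accessibility, call it.
    static Object invoke(Class<?> clazz, Object obj, String name,
                         Class<?> argType, Object arg) throws Exception {
        Method m = clazz.getDeclaredMethod(name, argType);
        m.setAccessible(true);          // bypass the private modifier
        return m.invoke(obj, arg);
    }

    public static void main(String[] args) throws Exception {
        Server s = new Server();
        Object result = invoke(Server.class, s, "register", String.class, "cdh1:10001");
        System.out.println(result);     // prints: registered cdh1:10001
    }
}
```

Looking the method up on the declaring class (not the runtime class) and calling setAccessible(true) is what lets the subclass reach a private parent method, which is exactly the trick the Spark-side helper relies on.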
  /*#################################     newly added methods      ############################################*/

  // get this thrift server's host:port
  private String getServerInstanceURI() throws Exception {
    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
    }

    return getHiveHost() + ":"
        + thriftCLIService.getPortNumber();
  }

  private String getHiveHost() {
    HiveConf hiveConf = thriftCLIService.getHiveConf();
    String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
    if (hiveHost != null && !hiveHost.isEmpty()) {
      return hiveHost;
    } else {
      return thriftCLIService.getServerIPAddress().getHostName();
    }
  }
  
  /**
   * Tracks whether this HiveServer2 instance has been de-registered from ZooKeeper
   * */
  private boolean deregisteredWithZooKeeper = false;
  private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
    this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
  }

  /**
   * ZooKeeper watcher: when this instance's znode is deleted, close the registration,
   * and shut the server down once the last client session completes
   */
  private PersistentEphemeralNode znode;
  private class DeRegisterWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
        if (znode != null) {
          try {
            znode.close();
            LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
                + "The server will be shut down after the last client session completes.");
          } catch (IOException e) {
            LOG.error("Failed to close the persistent ephemeral znode", e);
          } finally {
            HiveServer2.this.setDeregisteredWithZooKeeper(true);
            // if no client sessions remain, shut this HiveServer2 instance down
            if (cliService.getSessionManager().getOpenSessionCount() == 0) {
              LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                  + "instances available for dynamic service discovery. "
                  + "The last client session has ended - will shutdown now.");
              HiveServer2.this.stop();
            }
          }
        }
      }
    }
  }

  private CuratorFramework zooKeeperClient;
  private String znodePath;
  /**
   * Register this server instance on ZooKeeper
   * @param hiveConf
   * @throws Exception
   */
  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
    // get the ZooKeeper quorum from hiveConf
    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
    // read hive.server2.zookeeper.namespace from hive-site.xml
    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
    // get this server's URI (host:port)
    String instanceURI = getServerInstanceURI();

    // session timeout for Hive's ZooKeeper connection
    int sessionTimeout =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
            TimeUnit.MILLISECONDS);
    // base sleep time between connection retries
    int baseSleepTime =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
            TimeUnit.MILLISECONDS);
    // maximum number of connection retries
    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
    // build the Curator client
    zooKeeperClient =
        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
    // start the ZooKeeper client
    zooKeeperClient.start();
    // create the root namespace (a persistent parent znode) if it does not already exist
    try {
      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
    } catch (KeeperException e) {
      if (e.code() != KeeperException.Code.NODEEXISTS) {
        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
        throw e;
      }
    }
    // register this server under rootNamespace; the znode name has the form:
    // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
    try {
      String pathPrefix =
          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
              + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
      byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
      znode =
          new PersistentEphemeralNode(zooKeeperClient,
              PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
      znode.start();
      // We'll wait for 120s for node creation
      long znodeCreationTimeout = 120;
      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
        throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
      }
      setDeregisteredWithZooKeeper(false);
      znodePath = znode.getActualPath();
      // add a ZooKeeper watcher so that deletion of this znode is noticed immediately
      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
        // No node exists, throw exception
        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
      }
      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
    } catch (Exception e) {
      LOG.fatal("Unable to create a znode for this server instance", e);
      if (znode != null) {
        znode.close();
      }
      throw (e);
    }
  }

  // remove the znode when this instance shuts down
  private void removeServerInstanceFromZooKeeper() throws Exception {
    setDeregisteredWithZooKeeper(true);

    if (znode != null) {
      znode.close();
    }
    zooKeeperClient.close();
    LOG.info("Server instance removed from ZooKeeper.");
  }
  • Two methods are added to HiveServer2 - the ones HiveThriftServer2 invokes via reflection: one registers the connection info under the configured ZooKeeper path at startup, the other removes it at shutdown.
  • A new ZooKeeper watcher (DeRegisterWatcher) is added as well
  • Note that the znode name must keep the form serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005, so that HiveServer2's existing HA client-side discovery can be reused unchanged
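To make that naming rule concrete, this small self-contained sketch (a hypothetical helper, not part of the patch) builds the znode name prefix the same way addServerInstanceToZooKeeper concatenates it. ZooKeeper itself appends the 10-digit sequence number, because the node is created in EPHEMERAL_SEQUENTIAL mode:

```java
public class ZnodeNameDemo {
    // Build the znode path prefix: /<rootNamespace>/serverUri=<host:port>;version=<v>;sequence=
    // The trailing "sequence=" is intentional - ZooKeeper appends the
    // zero-padded sequence number when creating an EPHEMERAL_SEQUENTIAL node.
    static String pathPrefix(String rootNamespace, String instanceURI, String version) {
        return "/" + rootNamespace
            + "/serverUri=" + instanceURI + ";"
            + "version=" + version + ";"
            + "sequence=";
    }

    public static void main(String[] args) {
        String prefix = pathPrefix("hiveserver2_zk", "cdh1:10001", "1.2.1.spark2");
        // prints: /hiveserver2_zk/serverUri=cdh1:10001;version=1.2.1.spark2;sequence=
        System.out.println(prefix);
        // after creation the actual path ends in e.g. sequence=0000000005
    }
}
```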

Build

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package 
  • Replace the hive-thriftserver jar in Spark's lib directory with the newly built one

Configuration

  • The HA configuration is identical to HiveServer2's; add the following to hive-site.xml
<property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>
<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>
<property>
    <name>hive.zookeeper.quorum</name>
    <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>
<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>cdh1</value>
</property>
  • 另外,因?yàn)槲覀冃枰詓park的sbin/start-thriftserver.sh腳本啟動進(jìn)程,腳本中調(diào)用了spark-daemon.sh腳本實(shí)際啟動任務(wù),而這個腳本中的有關(guān)于重復(fù)應(yīng)用的檢查,即同一臺機(jī)器上啟動多個相同應(yīng)用會報錯,如果我們要在同一臺機(jī)器上啟動兩個hiveThriftServer的話,我們需要對spark-daemon.sh做一些修改
# comment out the following check ######################################
# if [ -f "$pid" ]; then
#   TARGET_ID="$(cat "$pid")"
#   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
#     echo "$command running as process $TARGET_ID.  Stop it first."
#     exit 1
#   fi
# fi
########################################################
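As an alternative to commenting the check out: spark-daemon.sh builds the pid file path from SPARK_PID_DIR and SPARK_IDENT_STRING (roughly $SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid), so giving each instance its own SPARK_PID_DIR should keep the pid files from colliding. This is an assumption about the script's internals, so verify against your Spark version; the sketch below only mirrors how the path is composed:

```shell
# mirror spark-daemon.sh's pid path composition (assumed form, see note above)
pid_path() {
  local pid_dir="$1" ident="$2" command="$3" instance="$4"
  echo "${pid_dir}/spark-${ident}-${command}-${instance}.pid"
}

# two instances with distinct SPARK_PID_DIR values get distinct pid files:
a=$(pid_path /tmp/pids-10001 hive org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 1)
b=$(pid_path /tmp/pids-10002 hive org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 1)
echo "$a"
echo "$b"
```

With that approach, each start-thriftserver.sh invocation is prefixed with its own SPARK_PID_DIR=... instead of patching the script.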

Startup

  • Start command
sbin/start-thriftserver.sh \
--master yarn \
--conf spark.driver.memory=1G \
--executor-memory 1G \
--num-executors 1 \
--hiveconf hive.server2.thrift.port=10002
  • beeline connect command
!connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
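For a rough picture of what the driver does with that URL: it lists the children of /hiveserver2_zk, picks one at random, and extracts the serverUri field to connect to. The real logic lives in Hive's ZooKeeperHiveClientHelper; the helper below is a hypothetical, self-contained sketch of just the selection step:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ZkDiscoveryDemo {
    // Pick one registered instance at random and pull host:port out of a
    // znode name of the form serverUri=HOST:PORT;version=...;sequence=...
    static String pickServerUri(List<String> znodes, Random rnd) {
        String chosen = znodes.get(rnd.nextInt(znodes.size()));
        for (String part : chosen.split(";")) {
            if (part.startsWith("serverUri=")) {
                return part.substring("serverUri=".length());
            }
        }
        throw new IllegalArgumentException("no serverUri in " + chosen);
    }

    public static void main(String[] args) {
        List<String> znodes = Arrays.asList(
            "serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005",
            "serverUri=cdh2:10002;version=1.2.1.spark2;sequence=0000000006");
        System.out.println(pickServerUri(znodes, new Random()));
    }
}
```

Because the choice is random, multiple live instances also give crude load balancing across clients, not just failover.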