1 Problem
A user running a large number of concurrent beeline Hive SQL jobs occasionally hits the error "Unable to read HiveServer2 uri from ZooKeeper".
Hive version: HDP 1.2.1
2 Solution
Add a retries parameter to the beeline connection URL.
beeline URL before the change:
jdbc:hive2://**:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@KDC;auth=kerberos
beeline URL after the change:
jdbc:hive2://**:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@KDC;auth=kerberos;retries=5;
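The retries value travels with the URL as one of the ";key=value" session variables. A minimal sketch of how such variables can be pulled out of the URL (the sessionVars helper below is hypothetical; the real parsing lives in org.apache.hive.jdbc.Utils.parseURL):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UrlVarsDemo {
    // Hypothetical helper: split the ";key=value" session variables out of a
    // HiveServer2 JDBC URL, purely to illustrate how "retries=5" reaches the
    // driver. The real parsing is in org.apache.hive.jdbc.Utils.parseURL.
    static Map<String, String> sessionVars(String url) {
        Map<String, String> vars = new LinkedHashMap<>();
        int slash = url.indexOf('/', "jdbc:hive2://".length());
        String tail = slash >= 0 ? url.substring(slash + 1) : "";
        for (String part : tail.split(";")) {
            int eq = part.indexOf('=');
            if (eq > 0) {
                vars.put(part.substring(0, eq), part.substring(eq + 1));
            }
        }
        return vars;
    }

    public static void main(String[] args) {
        String url = "jdbc:hive2://zk1:2181/;serviceDiscoveryMode=zooKeeper"
            + ";zooKeeperNamespace=hiveserver2;retries=5";
        System.out.println(sessionVars(url).get("retries")); // prints 5
    }
}
```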
3 Root Cause and Analysis
From reading the source code, the HiveServer2 connection procedure is roughly:
1) Parse the connection URL; with serviceDiscoveryMode=zooKeeper, the actual connection info and configuration are resolved dynamically.
2) Fetch the list of all HiveServer2 nodes from ZooKeeper under the path /hiveserver2/. The path format is "/" + zooKeeperNamespace, where zooKeeperNamespace is the hiveserver2 value configured in the connection string.
3) Pick one znode at random from that list and read the znode's data from ZooKeeper.
4) Parse that data to obtain the actual HiveServer2 host, port, and other settings.
5) Open the connection, with retry support.
The problem is:
- Judging from the code logic, the znode data read in step 3 came back empty (possibly caused by the highly concurrent requests; the ZooKeeper logs contain nothing relevant), so the "Unable to read HiveServer2 uri from ZooKeeper:" exception is thrown.
- retries is not set in the connection string, and the default is 1, so a single failure makes the client give up immediately.
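Steps 2 through 4 can be sketched with the ZooKeeper calls replaced by in-memory stand-ins (pickServer is a hypothetical helper; only the random znode pick and the host:port split mirror the real driver code shown later):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class DiscoverySketch {
    // A stripped-down sketch of steps 2-4. In the real driver, "children"
    // comes from zooKeeperClient.getChildren().forPath("/" + namespace) and
    // "data" from getData() on the chosen znode; here both are passed in.
    static String[] pickServer(List<String> children, byte[] data) {
        // Step 3: pick one znode at random (its name is recorded by the
        // real code so failed hosts can be rejected on retry).
        String serverNode = children.get(new Random().nextInt(children.size()));
        String dataStr = new String(data, StandardCharsets.UTF_8);
        // Step 4 (old-release znode format): the payload is "host:port".
        String[] split = dataStr.split(":");
        if (split.length != 2) {
            throw new IllegalStateException(
                "Unable to read HiveServer2 uri from ZooKeeper: " + dataStr);
        }
        return new String[] {split[0], split[1]}; // host, port
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("serverUri=hs2a", "serverUri=hs2b");
        byte[] data = "hs2a.example.com:10000".getBytes(StandardCharsets.UTF_8);
        String[] hostPort = pickServer(children, data);
        System.out.println(hostPort[0] + ":" + hostPort[1]); // prints hs2a.example.com:10000
    }
}
```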
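A small sketch of the retry-count semantics (attemptsBeforeGivingUp is a hypothetical helper that mimics the connection loop in the source below; with the default maxRetries of 1, the first failure is final):

```java
public class RetrySketch {
    // With maxRetries = 1 (the default when retries is not in the URL),
    // numRetries reaches maxRetries after the very first failure and the
    // real code throws SQLException immediately; retries=5 allows up to
    // five attempts. Every attempt is simulated as a failure here.
    static int attemptsBeforeGivingUp(int maxRetries) {
        int attempts = 0;
        for (int numRetries = 0;;) {
            attempts++;
            boolean connected = false; // pretend every attempt fails
            if (connected) {
                break;
            }
            ++numRetries;
            if (numRetries >= maxRetries) {
                break; // the real code throws SQLException here
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsBeforeGivingUp(1)); // prints 1
        System.out.println(attemptsBeforeGivingUp(5)); // prints 5
    }
}
```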
Source code:
static void configureConnParams(JdbcConnectionParams connParams)
    throws ZooKeeperHiveClientException {
  String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
  String zooKeeperNamespace =
      connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
  if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) {
    zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE;
  }
  List<String> serverHosts;
  Random randomizer = new Random();
  String serverNode;
  CuratorFramework zooKeeperClient =
      CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
          .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
  try {
    zooKeeperClient.start();
    serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
    // Remove the znodes we've already tried from this list
    serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
    if (serverHosts.isEmpty()) {
      throw new ZooKeeperHiveClientException(
          "Tried all existing HiveServer2 uris from ZooKeeper.");
    }
    // Now pick a server node randomly
    serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
    connParams.setCurrentHostZnodePath(serverNode);
    // Read data from the znode for this server node
    // This data could be either config string (new releases) or server end
    // point (old releases)
    String dataStr =
        new String(
            zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
            Charset.forName("UTF-8"));
    Matcher matcher = kvPattern.matcher(dataStr);
    // If dataStr is not null and dataStr is not a KV pattern,
    // it must be the server uri added by an older version HS2
    if ((dataStr != null) && (!matcher.find())) {
      String[] split = dataStr.split(":");
      if (split.length != 2) {
        throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper: "
            + dataStr);
      }
      connParams.setHost(split[0]);
      connParams.setPort(Integer.parseInt(split[1]));
    } else {
      applyConfs(dataStr, connParams);
    }
  } catch (Exception e) {
    throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
  } finally {
    // Close the client connection with ZooKeeper
    if (zooKeeperClient != null) {
      zooKeeperClient.close();
    }
  }
}
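Why empty znode data produces exactly this message: in Java, splitting an empty string on ":" yields a single-element array, so the length-2 check in configureConnParams above fails and the exception is thrown with an empty uri appended. A minimal demonstration (partsOf is just a hypothetical wrapper around the split):

```java
public class EmptyZnodeDemo {
    // Mirrors the check in configureConnParams: the znode payload must
    // split into exactly two parts, host and port.
    static int partsOf(String dataStr) {
        return dataStr.split(":").length;
    }

    public static void main(String[] args) {
        System.out.println(partsOf(""));          // prints 1 -> fails the == 2 check
        System.out.println(partsOf("hs2:10000")); // prints 2 -> parsed as host:port
    }
}
```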
for (int numRetries = 0;;) {
  try {
    assumeSubject =
        JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
            .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
    transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
    if (!transport.isOpen()) {
      transport.open();
      logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
    }
    break;
  } catch (TTransportException e) {
    LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort());
    String errMsg = null;
    String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": ";
    if (isZkDynamicDiscoveryMode()) {
      errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: ";
      // Try next available server in zookeeper, or retry all the servers again if retry is enabled
      while (!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) {
        connParams.getRejectedHostZnodePaths().clear();
      }
      // Update with new values
      jdbcUriString = connParams.getJdbcUriString();
      host = connParams.getHost();
      port = connParams.getPort();
    } else {
      errMsg = warnMsg;
      ++numRetries;
    }
    if (numRetries >= maxRetries) {
      throw new SQLException(errMsg + e.getMessage(), " 08S01", e);
    } else {
      LOG.warn(warnMsg + e.getMessage() + " Retrying " + numRetries + " of " + maxRetries);
    }
  }
}
A small tip: for problems like this, going straight to the source code is the best approach; do not waste time searching the web at random.
If you are on Hive 3.x, note this issue: https://issues.apache.org/jira/browse/HIVE-19825, which can also cause random failures.