本地IDEA搭建開發(fā)環(huán)境,實(shí)現(xiàn)local模式spark對(duì)開啟Kerberos認(rèn)證的云端hive數(shù)據(jù)進(jìn)行讀寫操作。
一、環(huán)境版本:
本地PC: win10
開發(fā)環(huán)境: IntelliJ IDEA 2019.1.3 (Community Edition)
java: jdk1.8.0_162
scala: 2.11.12
spark: 2.3.2-mrs-2.0
hadoop: 3.1.1-mrs-2.0
集群版本: 華為云MRS 2.0.5
Kerberos認(rèn)證:開啟
二、環(huán)境配置過程
-
打通本地windows與云端MRS集群網(wǎng)絡(luò)MRS集群
-》hdfs、hive組件主節(jié)點(diǎn)、從節(jié)點(diǎn)均需要綁定彈性公網(wǎng)ip
-》添加安全組規(guī)則:
查詢本地windows公網(wǎng)ip,在MRS集群安全組添加入方向規(guī)則,協(xié)議端口簡單粗暴設(shè)置為全部放通。
配置完后,可以在本地windows測試在集群綁定的彈性公網(wǎng)ip連通性,是否ping通,telnet相應(yīng)端口是否成功。
創(chuàng)建添加安全組規(guī)則的詳細(xì)步驟
image.png 開啟Kerberos認(rèn)證的集群,需要?jiǎng)?chuàng)建開發(fā)用戶。
-》用戶需要有HDFS、HIVE、YARN等權(quán)限,才可以運(yùn)行spark程序。這里依然簡單粗暴的創(chuàng)建并賦予sparkuser該用戶所有組件的全部權(quán)限。
創(chuàng)建開發(fā)用戶的詳細(xì)步驟
-》在MRS Manager界面選擇“系統(tǒng)設(shè)置>用戶管理”,在用戶名中選擇sparkuser,單擊操作中下載認(rèn)證憑據(jù)文件,保存后解壓得到用戶的keytab文件與krb5.conf文件,將文件中的ip地址改為之前綁定的相應(yīng)彈性公網(wǎng)ip,用于在spark程序本地運(yùn)行時(shí)進(jìn)行安全認(rèn)證。-
下載集群客戶端配置文件
-》將hdfs-site.xml、core-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml等*.xml配置文件放到IDEA工程中的資源目錄下。
image.png
-》將集群客戶端配置中hosts文件的集群節(jié)點(diǎn)ip修改為之前綁定的彈性公網(wǎng)ip,將修改好的ip主機(jī)名映射添加到本地windows的hosts文件。
IDEA中創(chuàng)建Maven工程
-》配置華為云鏡像
kerberos認(rèn)證代碼參考華為的樣例代碼
-》將步驟3的集群客戶端配置文件和用戶憑證keytab、krb5.conf放到工程的資源目錄下,將客戶端配置文件中的所有ip地址修改成hosts文件中相應(yīng)的主機(jī)名。
-》在hdfs-site.xml中添加如下配置
本地測試PC與集群不在一個(gè)局域網(wǎng),這種情況下,本地訪問hdfs時(shí),namenode會(huì)返回?cái)?shù)據(jù)所在的datanode地址,但是返回的可能是datanode的內(nèi)網(wǎng)私有ip,我們無法根據(jù)該ip訪問數(shù)據(jù)節(jié)點(diǎn)datanode,添加如下配置,讓namenode返回datanode的域名。之前我們已經(jīng)在本地hosts文件配置了所有節(jié)點(diǎn)的公網(wǎng)ip。因此本地就可以通過域名訪問到hdfs中的數(shù)據(jù)了。
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>only cofig in clients</description>
</property>
-》spark訪問云端hdfs代碼如下:
package com.huawei.bigdata.spark.examples
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import com.huawei.hadoop.security.LoginUtil
object FemaleInfoCollection {
def main(args: Array[String]) {
// security mode
val userPrincipal = "spark_wang"
val filePath = System.getProperty("user.dir") + File.separator + "resources" + File.separator
val userKeyTableFile = filePath + "user.keytab"
val krbFile = filePath + "krb5.conf"
val hadoopConf: Configuration = new Configuration()
// hadoopConf.set("dfs.client.use.datanode.hostname", "true") // 已在hdfs-side.xml添加該配置
LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf)
// Configure the Spark application name.
val conf = new SparkConf().setAppName("CollectFemaleInfo")
.setMaster("local")
// Initializing Spark
val sc = new SparkContext(conf)
// Read data. This code indicates the data path that the input parameter args(0) specifies.
val text = sc.textFile("/user/spark_wang/female-info.txt") // 默認(rèn)會(huì)從配置文件中獲取hdfs地址,可以寫成全路徑hdfs://node-master1bcgx:9820/user/spark_wang/female-info.txt
// Filter the data information about the time that female netizens spend online.
val data = text.filter(_.contains("female"))
// Aggregate the time that each female netizen spends online
val femaleData: RDD[(String, Int)] = data.map { line =>
val t = line.split(',')
(t(0), t(2).toInt)
}.reduceByKey(_ + _)
// Filter the information about female netizens who spend more than 2 hours online, and export the results
val result = femaleData.filter(line => line._2 > 10)
result.collect().map(x => x._1 + ',' + x._2).foreach(println)
sc.stop()
}
}
-》spark本地讀云端hive代碼如下:
本地可以訪問到hive上的所有數(shù)據(jù)庫,如果訪問不到云端hive,結(jié)果要么報(bào)錯(cuò),要么只能顯示default一個(gè)數(shù)據(jù)庫名稱。
package com.huawei.bigdata.spark.examples
import java.io.File
import com.huawei.hadoop.security.LoginUtil
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Author: whn
* Date: 2020-2-6 14:49
* Version: 1.0
* Function:
*/
object SparkHiveOnCloud {
case class Test(occupy: String, name: String, age: Int)
def main(args: Array[String]): Unit = {
val userPrincipal = "spark_wang"
val filePath = System.getProperty("user.dir") + File.separator + "resources" + File.separator
val userKeyTableFile = filePath + "user.keytab"
val krbFile = filePath + "krb5.conf"
val hadoopConf: Configuration = new Configuration()
hadoopConf.set("dfs.client.use.datanode.hostname", "true")
LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf)
val conf = new SparkConf()
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local")
.config(conf)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
spark.sql("SHOW DATABASES").show()
//運(yùn)行結(jié)果
// +------------+
// |databaseName|
// +------------+
// | default|
// |mrs_reserved|
// | test|
// +------------+
val arr = Array(("a", "whn", 20), ("b", "cjj", 12), ("c", "haha", 18), ("f", "jay", 2), ("g", "kobe", 210), ("z", "asdf", 11))
// 本地創(chuàng)建Dataframe寫入云端hive
spark.sparkContext.parallelize(arr)
.map(tuple => Test(tuple._1, tuple._2, tuple._3))
.toDF("occupy", "name", "age")
.write
.format("hive")
.mode("append")
.saveAsTable("test.longi")
// 讀hive
val data = spark.sql("Select * from test.longi")
data.show()
spark.stop()
}
}
至此就實(shí)現(xiàn)了云端集群外的本地window通過spark local模式直接訪問云端集群的hdfs、hive數(shù)據(jù)并進(jìn)行讀寫操作。
-》附上pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huawei.spark.examples</groupId>
<artifactId>SparkScalaExample</artifactId>
<version>mrs-2.0</version>
<name>SparkScalaExample</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.3.2-mrs-2.0</spark.version>
<hadoop.version>3.1.1-mrs-2.0</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<configuration>
<recompileMode>modified-only</recompileMode>
</configuration>
<executions>
<execution>
<id>main-scalac</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<directory>target</directory>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<sourceDirectory>src</sourceDirectory>
</build>
</project>

