I recently needed to read Elasticsearch data from a Spark job via Spark SQL, and hit a few pitfalls along the way; this post is a summary.
Environment
The Spark job is written in Scala, with scala-library 2.11.8.
Spark dependencies such as spark-core and spark-sql are at version 2.3.2.
Elasticsearch data
schema
{
  "settings": { "number_of_replicas": 1 },
  "mappings": {
    "label": {
      "properties": {
        "docId": { "type": "keyword" },
        "labels": {
          "type": "nested",
          "properties": {
            "id": { "type": "long" },
            "label": { "type": "keyword" }
          }
        },
        "itemId": { "type": "long" }
      }
    }
  }
}
sample data
{
  "took": 141,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 },
  "hits": {
    "total": 17370929,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "aen-label-v1",
        "_type": "label",
        "_id": "123_ITEM",
        "_score": 1.0,
        "_source": {
          "docId": "123_ITEM",
          "labels": [ { "id": 7378, "label": "1kg" } ],
          "itemId": 123
        }
      },
      {
        "_index": "aen-label-v1",
        "_type": "label",
        "_id": "456_ITEM",
        "_score": 1.0,
        "_source": {
          "docId": "456_ITEM",
          "labels": [ { "id": 7378, "label": "2kg" } ],
          "itemId": 456
        }
      }
    ]
  }
}
Preparation
Since we are using Spark SQL, we naturally need its dependencies:
dependencies {
implementation 'org.apache.spark:spark-core_2.11:2.3.2'
implementation 'org.apache.spark:spark-sql_2.11:2.3.2'
}
As for the ES libraries, the official docs say that to access ES from Spark you must put the elasticsearch-hadoop artifact on the Spark job's runtime classpath — in practice, add it to the job project's dependencies. The latest version in our company's nexus is currently 7.15.0, and we manage dependencies with gradle, so the dependency is declared as follows:
dependencies {
implementation 'org.elasticsearch:elasticsearch-hadoop:7.15.0'
}
Local testing
Depending on the resource manager, Spark can run in two modes — local mode and cluster mode — selected via the --master option. In local mode there is no external Spark cluster: Spark runs everything on a single machine, which is very convenient for local testing. For Spark SQL, it is enough to build the SparkSession in local mode:
import org.apache.spark.sql.SparkSession
class MyUtils extends Serializable {
def esHost() = s"es.sherlockyb.club"
// local mode
def getLocalSparkSession: SparkSession = SparkSession.builder()
.master("local")
.getOrCreate()
// cluster mode
def getSparkSession: SparkSession = SparkSession.builder()
.enableHiveSupport()
.config("spark.sql.broadcastTimeout", "3600")
.getOrCreate()
}
Test code
import com.google.common.base.Joiner
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, lit}
object LocalTest extends LazyLogging {
def main(args: Array[String]): Unit = {
new LocalTest().run()
}
}
class LocalTest {
def run(): Unit = {
val myUtils = new MyUtils
val spark = myUtils.getLocalSparkSession
import spark.implicits._
var start = System.currentTimeMillis()
val attributeId = 7378L
val labelNames = Array("aen-label-retail", "aen-label-seller")
spark.read
.format("es")
.option("es.nodes", myUtils.esHost())
.option("es.port", "9200")
.option("es.nodes.wan.only", value = true)
.option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(labelNames:_*)) + "/label")
.option("es.scroll.size", 2000)
.load()
.createOrReplaceTempView("temp_labels")
val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")
val newDf = sqlDf
.map(row => {
val labels = row.getAs[Seq[Row]]("labels")
val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))
(row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
})
.withColumn("final_result", lit("PASS"))
.toDF("itemId", "attributeId", "label", "final_result")
val finalDf = newDf.toDF("itemId", "attributeId", "label", "result")
finalDf.printSchema()
finalDf.show()
var emptyDf = newDf
.filter(col("label").isNotNull)
.toDF("itemId", "attributeId", "label", "result")
emptyDf = emptyDf.union(finalDf)
emptyDf.printSchema()
emptyDf.show()
emptyDf.filter(col("itemId") === 6238081929L and col("label").notEqual(col("result")))
.show()
val attributeTypeIds = Array.fill(3)(100)
val attributeTypeIdsStr = Joiner.on(",").join(java.util.Arrays.asList(attributeTypeIds:_*))
println(attributeTypeIdsStr)
import scala.collection.JavaConverters._
// trainItemIds is a java.util.List[java.lang.Long] provided elsewhere
emptyDf = emptyDf.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))
emptyDf.show(false)
}
}
Take-aways
Spark SQL Data Sources
Spark SQL supports reading all kinds of data sources through the DataFrameReader class — files in Parquet, ORC, JSON, or CSV format, Hive tables, and other databases. Elasticsearch is just one more such source. DataFrameReader selects the source with format(...), configures source-specific settings with option(...), and finally load() produces a DataFrame, which is a type alias for Dataset[Row]. With the DataFrame in hand, you can register a temporary view and then read the data with plain SQL.
Before Spark 1.5, the source name passed to format(...) for Elasticsearch had to be the fully qualified package name org.elasticsearch.spark.sql; from Spark 1.5 onwards it is shortened to es.
Commonly used DataFrame APIs in Spark SQL
- df.printSchema(): prints the schema.
- df.show(): displays rows — the first 20 by default, with long cell values truncated; pass false to show full cell contents without truncation.
- df.createOrReplaceTempView("view_name"): registers a temporary view so the data can be queried with SQL afterwards.
- df.withColumn(...): adds a new column or replaces an existing one; e.g. df.withColumn("final_result", lit("PASS")) adds a constant column via lit.
- df.filter(col("label").isNotNull): filters rows with the given predicate.
- df.dropDuplicates("itemId", "attributeId"): deduplicates rows on the given columns, returning a new Dataset.
- df.union(otherDf): concatenates the records of two DataFrames without deduplication, equivalent to UNION ALL.
- df.toDF("itemId", "attributeId", "label", "final_result"): assigns meaningful names to the columns.
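A minimal local-mode sketch exercising the APIs above; the rows, app name, and column values are invented stand-ins for the ES label data, not the article's real dataset:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object DfApiDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("df-api-demo").getOrCreate()
    import spark.implicits._

    // Made-up rows standing in for the ES label data
    val df = Seq((123L, 7378L, "1kg"), (456L, 7378L, "2kg"), (456L, 7378L, "2kg"))
      .toDF("itemId", "attributeId", "label")       // name the columns
      .withColumn("final_result", lit("PASS"))       // constant column via lit
      .dropDuplicates("itemId", "attributeId")       // dedupe on the key columns

    df.printSchema()
    df.filter(col("label").isNotNull).show(false)    // false = don't truncate cell values
    assert(df.count() == 2)                          // the duplicate (456, 7378) row is gone
    spark.stop()
  }
}
```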
Scala-to-Java type mapping
- scala.Long -> long
- Array[T] -> T[]
Scala/Java collection conversion
import scala.collection.JavaConverters._
newDf = df.filter(!col("itemId").isin(trainItemIds.asScala.map(Long2long).toList:_*))
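The same conversion as a self-contained sketch; here trainItemIds is a stand-in java.util.List built locally, not the real training-set IDs:

```scala
import scala.collection.JavaConverters._

object ConvertDemo {
  def main(args: Array[String]): Unit = {
    // A Java list of boxed Longs, as it might arrive from a Java API
    val trainItemIds: java.util.List[java.lang.Long] =
      java.util.Arrays.asList[java.lang.Long](1L, 2L, 3L)
    // asScala wraps it as a Scala Buffer[java.lang.Long];
    // Predef.Long2long unboxes each element to scala.Long
    val scalaIds: List[Long] = trainItemIds.asScala.map(Long2long).toList
    assert(scalaIds == List(1L, 2L, 3L))
  }
}
```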
: _* in Scala
:_* is a special case of type ascription: it tells the compiler to pass a single sequence argument as a variable-length argument list, i.e. varargs. For example:
val indices = Array("aen-label", "aen-label-seller")
Joiner.on(",").join(java.util.Arrays.asList(indices:_*))
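A self-contained sketch of how : _* expands a sequence into varargs; sum is a made-up method for illustration. Note also that the Guava Joiner call above could be written with plain Scala as indices.mkString(","), needing neither Joiner nor : _*:

```scala
object VarargsDemo {
  // A varargs method: xs arrives as a Seq[Int]
  def sum(xs: Int*): Int = xs.sum

  def main(args: Array[String]): Unit = {
    val nums = Array(1, 2, 3)
    // sum(nums) would not compile; : _* expands the array into varargs
    assert(sum(nums: _*) == 6)
    assert(sum(4, 5) == 9)
    // Idiomatic alternative to Guava's Joiner for joining an index list
    assert(Array("aen-label", "aen-label-seller").mkString(",") == "aen-label,aen-label-seller")
  }
}
```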
Pitfalls
es.nodes.wan.only
This option declares whether the connector is talking to an Elasticsearch instance across a WAN, in a cloud or otherwise restricted environment such as AWS; it defaults to false. Our company's Elasticsearch cluster runs on AWS and its endpoint is only reachable from the internal network, so the first test run failed with the following error:
Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available
at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159)
at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:73)
at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:72)
at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:44)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:340)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
After setting the option to true with option("es.nodes.wan.only", value = true), everything worked.
import spark.implicits._
While mapping over the DataFrame I hit this compile error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Add import spark.implicits._ before processing the DataFrame; it brings in the implicit encoders that convert common Scala objects to Datasets/DataFrames, and is usually imported right after obtaining the SparkSession.
When Spark SQL reads an array-typed column, Scala code gets a WrappedArray, not an Array
Once createOrReplaceTempView("temp_labels") has registered a temporary view, we can query it with SQL just like a hive table — for example, selecting specific columns:
val sqlDf = spark.sql("select itemId, labels from temp_labels where itemId in (123, 456)")
sqlDf.printSchema() shows the following schema:
root
|-- itemId: long (nullable = true)
|-- labels: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- label: string (nullable = true)
labels is an array of structs, so my first attempt read the labels column out of each row as an Array:
val newDf = sqlDf.map(
row => {
val labels = row.getAs[Array[Row]]("labels")
val labelValue = labels.find(p => p.getAs[Long]("id") == attributeId).map(p => p.getAs[String]("label"))
(row.getAs[Long]("itemId"), attributeId, labelValue.orNull)
}
)
which failed at runtime with:
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lorg.apache.spark.sql.Row;
So Spark SQL materializes array columns as scala.collection.mutable.WrappedArray. Looking at its class definition, WrappedArray (indirectly) implements the Seq interface, so the column can be read with row.getAs[Seq[Row]]("labels") instead. Note that although Array[T] is defined as a class in the Scala sources, it maps to the native Java array type T[], which is why the cast fails.
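The Seq behavior can be seen without Spark at all — on Scala 2.11/2.12 (the versions in play here), assigning an Array where a Seq is expected triggers the same implicit wrapping; a minimal sketch:

```scala
object WrappedArrayDemo {
  def main(args: Array[String]): Unit = {
    // Predef implicitly wraps the Array to satisfy the Seq type
    // (a WrappedArray on Scala 2.11/2.12, which is what Spark 2.3 hands back)
    val labels: Seq[Int] = Array(1, 2, 3)
    // The wrapper supports the full Seq API, e.g. the find used on the labels column
    assert(labels.find(_ == 2).contains(2))
    println(labels.getClass.getName) // the concrete wrapper class, not a native array
  }
}
```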
To test whether a Column is null, use is null / is not null, not === or !==
With the wrong operator the filter simply never matches, as in:
newDf.filter(col("label") !== null)
This is consistent with how null comparisons behave for hive and MySQL table fields; the correct form is:
newDf.filter(col("label").isNotNull)
Final code
import com.google.common.base.Joiner
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
object TestMain extends LazyLogging {
def main(args: Array[String]): Unit = {
val myUtils = new MyUtils
new TestApp(myUtils).run()
}
}
class TestApp(myUtils: MyUtils) extends Serializable with LazyLogging {
def esDf(spark: SparkSession, indices: Array[String]): DataFrame = {
spark.read
.format("es")
.option("es.nodes", myUtils.esHost())
.option("es.port", "9200")
.option("es.nodes.wan.only", value = true)
.option("es.resource", Joiner.on(",").join(java.util.Arrays.asList(indices:_*)) + "/label")
.option("es.scroll.size", 2000)
.load()
}
def run(): Unit = {
val spark = myUtils.getSparkSession
import spark.implicits._
val esTempView = "es_label"
val labelNames = Array("aen-label-retail", "aen-label-seller")
esDf(spark, labelNames).createOrReplaceTempView(esTempView)
// itemIdsStr, attributeTypeIds and jobId come from the job's arguments (elided here)
val labelDf = getLabelDf(spark, itemIdsStr, attributeTypeIds, esTempView)
println("debug log")
labelDf.printSchema()
labelDf.show()
labelDf.createOrReplaceTempView("final_labels")
val data = spark.sql(
s"""
|select cc.*, pp.final_result, pp.label, null as remark
|from temp_request cc
|left join final_labels pp
|on cc.itemid = pp.itemId
|and cc.attributetypeid = pp.attributeId
|where cc.profile = '$jobId'
|""".stripMargin)
data.distinct().write.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(s"s3://sherlockyb-test/check-precision/job_id=$jobId")
}
def getLabelDf(spark: SparkSession, itemIdsStr: String, attributeTypeIds: Array[String], esTempView: String): DataFrame = {
import spark.implicits._
val sqlDf = spark.sql(s"select itemId, labels from $esTempView where itemId in ($itemIdsStr)")
val emptyDf = spark.emptyDataFrame
var labelDf = emptyDf
attributeTypeIds.foreach(attributeTypeId => {
val attributeDf = sqlDf
.map(row => {
val labels = row.getAs[Seq[Row]]("labels")
val labelValue = labels.find(p => p.getAs[Long]("id") == attributeTypeId.toLong).map(p => p.getAs[String]("label"))
(row.getAs[Long]("itemId"), attributeTypeId.toLong, labelValue.orNull)
})
.withColumn("final_result", lit("PASS"))
.toDF("itemId", "attributeId", "label", "final_result")
.filter(col("label").isNotNull)
if (labelDf == emptyDf) { // reference comparison: labelDf still points to the initial emptyDf
labelDf = attributeDf
} else {
labelDf = labelDf.union(attributeDf)
}
})
labelDf.dropDuplicates("itemId","attributeId")
}
}
Addendum: submitting the spark job
Package the job project as a Jar, upload it to AWS s3 (e.g. under s3://sherlockyb-test/1.0.0/artifacts/spark/), and then submit the spark job to the Spark cluster via Genie. Genie is a federated job execution engine developed by Netflix; it provides RESTful APIs for running all kinds of big-data jobs, such as Hadoop, Pig, Hive, Spark, Presto, and Sqoop.
def run_spark(job_name, spark_jar_name, spark_class_name, arg_str, spark_param=''):
import pygenie
pygenie.conf.DEFAULT_GENIE_URL = "genie.sherlockyb.club"
job = pygenie.jobs.GenieJob() \
.genie_username('sherlockyb') \
.job_name(job_name) \
.job_version('0.0.1') \
.metadata(teamId='team_account') \
.metadata(teamCredential='team_password')
job.cluster_tags(['type:yarn-kerberos', 'sched:default'])
job.command_tags(['type:spark-submit-kerberos', 'ver:2.3.2'])
job.command_arguments(
f"--class {spark_class_name} {spark_param} "
f"s3a://sherlockyb-test/1.0.0/artifacts/spark/{spark_jar_name} "
f"{arg_str}"
)
# Submit the job to Genie
running_job = job.execute()
running_job.wait()
return running_job.status
Originally published at: https://www.yangbing.fun/2022/06/03/Spark-reading-elasticsearch-guide/
License: unless otherwise stated, posts on this blog are released under the CC BY-NC-SA 3.0 CN license. Please credit the source when republishing!