前言
企業(yè)正在經(jīng)歷其數(shù)據(jù)資產(chǎn)的爆炸式增長(zhǎng),這些數(shù)據(jù)包括批式或流式傳輸?shù)慕Y(jié)構(gòu)化、半結(jié)構(gòu)化以及非結(jié)構(gòu)化數(shù)據(jù),隨著海量數(shù)據(jù)批量導(dǎo)入的場(chǎng)景的增多,企業(yè)對(duì)于 Data Pipeline 的需求也愈加復(fù)雜。新一代云原生實(shí)時(shí)數(shù)倉(cāng) SelectDB Cloud 作為一款運(yùn)行于多云之上的云原生實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),致力于通過(guò)開(kāi)箱即用的能力為客戶帶來(lái)簡(jiǎn)單快速的數(shù)倉(cāng)體驗(yàn)。在生態(tài)方面,SelectDB Cloud 提供了豐富的數(shù)據(jù)連接器插件(Connector)來(lái)連接各種來(lái)自周邊大數(shù)據(jù)工具的數(shù)據(jù)源,內(nèi)置 Kafka、Flink、Spark、DataX 等常見(jiàn)的 Connector?;诖?,企業(yè)開(kāi)發(fā)者能夠更加便捷的將數(shù)據(jù)移動(dòng)到 SelectDB Cloud 上,并利用 SelectDB Cloud 從數(shù)據(jù)資產(chǎn)中獲取更高的價(jià)值。
SelectDB Cloud 基于 Apache Doris 研發(fā)的新一代云原生實(shí)時(shí)數(shù)倉(cāng) SelectDB,運(yùn)行于多家云上,為客戶提供極簡(jiǎn)運(yùn)維和極致性價(jià)比的數(shù)倉(cāng)服務(wù)。
本篇將帶來(lái) Spark SelectDB Connector 的詳細(xì)介紹與實(shí)踐。
什么是 Spark SelectDB Connector?
Spark SelectDB Connector 作為 SelectDB Cloud 上大數(shù)據(jù)量的導(dǎo)入方式之一,可以利用 Spark天然的分布式計(jì)算優(yōu)勢(shì)將數(shù)據(jù)導(dǎo)入到 SelectDB Cloud 中。具體來(lái)講,Spark SelectDB Connector 支持將其他數(shù)據(jù)源(PostgreSQL, HDFS, S3等)的數(shù)據(jù)通過(guò) Spark 計(jì)算引擎后同步到 SelectDB Cloud 的數(shù)據(jù)表中。
利用 Spark SelectDB Connector,開(kāi)發(fā)者能夠使用 Spark 將上游數(shù)據(jù)源讀取到 DataFrame 中,然后使用 Spark SelectDB Connector 將大規(guī)模數(shù)據(jù)導(dǎo)入到SelectDB Cloud 數(shù)據(jù)倉(cāng)庫(kù)的表中;同時(shí),開(kāi)發(fā)者可以使用 Spark 的 JDBC 的方式來(lái)讀取 SelectDB Cloud 表中的數(shù)據(jù)。
基礎(chǔ)架構(gòu)

在整個(gè)架構(gòu)中,通常 Spark SelectDB Connector 作為外部數(shù)據(jù)寫(xiě)入到 SelectDB Cloud 的橋梁,優(yōu)化了傳統(tǒng)地使用 JDBC 這種低性能的連接寫(xiě)入方式,以其分布式、高效的特性豐富了整個(gè)數(shù)據(jù)鏈路。
工作原理
Spark Selectdb Connector 底層實(shí)現(xiàn)依賴于 SelectDB Cloud 的 stage 導(dǎo)入方式,當(dāng)前支持兩種Stage 導(dǎo)入方式:
- 通過(guò)創(chuàng)建對(duì)象存儲(chǔ)上的 stage 來(lái)進(jìn)行批量數(shù)據(jù)拉取導(dǎo)入,這個(gè)主要適合大批量數(shù)據(jù)導(dǎo)入,使用前提是用戶有自己的對(duì)象存儲(chǔ)及其相關(guān)密鑰;
- 基于內(nèi)置的 stage 的推送導(dǎo)入,這個(gè)主要是和小批量推送,使用較簡(jiǎn)單;
對(duì)于第一種導(dǎo)入方式,依賴于用戶自己的對(duì)象存儲(chǔ),首先需要在 SelectDB Cloud 中創(chuàng)建 External Stage,然后將創(chuàng)建的 External Stage 的訪問(wèn)權(quán)限給用戶,用戶可以將需要導(dǎo)入的數(shù)據(jù)存儲(chǔ)已經(jīng)配置好的External Stage的存儲(chǔ)中,通過(guò) Spark 調(diào)用 SelectDB Cloud 的 copy into 接口(/copyinto)將對(duì)象存儲(chǔ)的數(shù)據(jù)導(dǎo)入SelectDB Cloud的表中。
對(duì)于第二種導(dǎo)入方式,主要依賴于 SelectDB Cloud 提供的內(nèi)置對(duì)象存儲(chǔ),Spark 通過(guò)調(diào)用SelectDB Cloud的upload接口(/copy/upload)會(huì)返回一個(gè)重定向的對(duì)象存儲(chǔ)地址,使用 http 的方式向S3地址發(fā)送字節(jié)流,待數(shù)據(jù)上傳完成之后在調(diào)用 SelectDB Cloud 的 copy into 接口(/copyinto)將對(duì)象存儲(chǔ)的數(shù)據(jù)導(dǎo)入 SelectDB Cloud 的表中。
使用教程
下載方式
我們已經(jīng)預(yù)編譯了三個(gè)包供大家來(lái)直接下載,詳細(xì)版本以及下載地址見(jiàn)下表:
| Connector | Runtime Jar |
| 2.3.4-2.11-1.0 | spark-selectdb-connector-2.3.4_2.11-1.0 |
| 3.1.2-2.12-1.0 | spark-selectdb-connector-3.1.2_2.12-1.0 |
| 3.2.0-2.12-1.0 | spark-selectdb-connector-3.2.0_2.12-1.0 |
注意:
- Connector的格式為:spark-selectdb-connector-
{scala.version}-${connector.version}-SNAPSHOT.jar;
- 所有的jar包是通過(guò)java 8來(lái)編譯的;
- 如有其他版本需求可通過(guò)SelectDB官網(wǎng)的聯(lián)系方式來(lái)聯(lián)系我們;
本地開(kāi)發(fā)
一般我們本地開(kāi)發(fā)通過(guò) maven 引入依賴的方式將 Spark SelectDB Connector 的包引入到我們的項(xiàng)目中,在此之前需要將下載的 jar 通過(guò)mvn install的方式安裝到本地倉(cāng)庫(kù),在 maven 中使用以下方式添加依賴。
<dependency>
<groupId>com.selectdb.spark</groupId>
<artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
<version>1.0</version>
</dependency>
Spark Standalone & Cluster 方式
如果使用 Spark Standalone 或者 Spark Cluster 的方式運(yùn)行我們的 Spark 程序,只需要將下載的jar 放到 Spark 安裝目錄的 jars 目錄下即可。
注意:
如果多節(jié)點(diǎn)Spark,需要在每個(gè)Spark節(jié)點(diǎn)的jars目錄下放一份Spark SelectDB Connector的jar包。
Spark On Yarn
Yarn集群模式運(yùn)行的Spark,則將此文件放入預(yù)部署包中。例如將 spark-selectdb-connector-2.3.4-2.11-1.0.-SNAPSHOT.jar 上傳到 hdfs并在spark.yarn.jars參數(shù)上添加 hdfs上的Jar包路徑
- 上傳
spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar到hdfs。
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar /spark-jars/
- 在集群中添加
spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar依賴。
spark.yarn.jars=hdfs:///spark-jars/doris-spark-connector-3.1.2-2.12-1.0.0.jar
使用場(chǎng)景
通過(guò) sparksql 的方式寫(xiě)入
val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"
CREATE TEMPORARY VIEW test_order
USING selectdb
OPTIONS(
"table.identifier"="test.test_order",
"jdbc.url"="${selectdbJdbc}",
"http.port"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.file.type"="json"
);
insert into test_order select order_id,order_amount,order_status from tmp_tb ;
通過(guò) DataFrame 方式寫(xiě)入
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, null),
("3", 300, "已收貨")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("selectdb")
.option("selectdb.http.port", selectdbHttpPort)
.option("selectdb.table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 4)
.option("sink.max-retries", 2)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()
具體案例
本章節(jié)我們以一個(gè)例子來(lái)演示如何使用 Spark SelectDB Connector,演示的環(huán)境各版本如下:
| Java | Spark | Scala | SelectDB Cloud |
| 1.8 | 3.1.2 | 2.12 | 2.2.1 |
在開(kāi)始導(dǎo)入之前,我們需要做幾項(xiàng)準(zhǔn)備工作:
- Spark 環(huán)境構(gòu)建,從官網(wǎng)下載 Spark 安裝包,本次演示所用 Spark 安裝包 spark-3.1.2-bin-hadoop3.2.tgz;
- 構(gòu)造導(dǎo)入的數(shù)據(jù),此次我們只是測(cè)試,構(gòu)造4條數(shù)據(jù)來(lái)完成導(dǎo)入;
- Selectdb Cloud 創(chuàng)建倉(cāng)庫(kù)以及集群,設(shè)置admin 的密碼,開(kāi)通公網(wǎng)連接,將我們 Spark 環(huán)境的公網(wǎng)ip配置到ip白名單中;
我們先來(lái)看構(gòu)建我們的 Spark 環(huán)境,下載spark-3.1.2-bin-hadoop3.2.tgz安裝包,解壓安裝包;
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
將spark-selectdb-connector-3.1.2_2.12-1.0-SNAPSHOT.jar放到/opt/selectdb/spark-3.1.2-bin-hadoop3.2/jars目錄下

導(dǎo)入的原始數(shù)據(jù)如下:
1,100,已下單
2,200,已付款
3,300,已發(fā)貨
4,400,已收貨
SelectDB Cloud 中創(chuàng)建數(shù)據(jù)表:
CREATE TABLE `spark_selectdb_connector` (
`order_id` varchar(30) NULL,
`order_amount` int(11) NULL,
`order_status` varchar(30) NULL
) ENGINE=OLAP
DUPLICATE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 10
PROPERTIES (
"persistent" = "false"
);

我們以 spark-shell 的方式將我們的測(cè)試數(shù)據(jù)導(dǎo)入到 SelectDB Cloud 的數(shù)據(jù)表中:
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val session = SparkSession.builder().master("local[*]").getOrCreate()
val scam = StructType(StructField("order_id",StringType)::StructField("order_amount",IntegerType)::StructField("order_status",StringType)::Nil)
val df = spark.read.schema(scam).csv("file:///opt/selectdb/data/test.txt")
df.write.format("selectdb")
.option("selectdb.http.port", "81.70.4.52:36511")
.option("selectdb.table.identifier", "test.spark_selectdb_connector")
.option("user", "admin")
.option("password", "Admin12345")
.option("sink.batch.size", 4)
.option("sink.max-retries", 2)
.save()

Spark 任務(wù)執(zhí)行完成后,我們可以通過(guò) mysql-client 連接 Selectdb Cloud,查看我們通過(guò)導(dǎo)入的數(shù)據(jù)。

至此,我們通過(guò) Spark SelectDB Connector 導(dǎo)入數(shù)據(jù)的案例就結(jié)束了。
總結(jié)
本篇我們從Spark SelectDB Connector的原理以及實(shí)踐等各方面做了詳細(xì)介紹,大家有以下幾種場(chǎng)景需求的情況可以使用這種連接器:
- 以 Spark 為計(jì)算引擎構(gòu)建的技術(shù)架構(gòu)體系,減少其他組件引入的成本;
- 大規(guī)模數(shù)據(jù) ETL 離線寫(xiě)入SelectDB Cloud,利用 Spark 分布式計(jì)算的特性,降低doris集群資源消耗成本;
Spark SelectDB Connector 以 Spark 這個(gè)大數(shù)據(jù)計(jì)算的優(yōu)秀組件作為核心,實(shí)現(xiàn)了利用 Spark 將外部數(shù)據(jù)源的大數(shù)據(jù)量同步到 SelectDB Cloud,便于我們實(shí)現(xiàn)大批量數(shù)據(jù)的快速同步,繼而利用 SelectDB Cloud 為基石構(gòu)建新一代的云原生數(shù)據(jù)倉(cāng)庫(kù),結(jié)合 SelectDB Cloud 強(qiáng)大的分析計(jì)算性能,能夠?yàn)槠髽I(yè)帶來(lái)業(yè)務(wù)便捷性以及增效將本的目標(biāo)。