背景
隨著公司業(yè)務(wù)數(shù)據(jù)增長和對數(shù)據(jù)分析能力的增強。以往T+1/H+1的離線數(shù)據(jù)提供已經(jīng)不能滿足公司的業(yè)務(wù)需求。比如客戶分配數(shù)據(jù)和線索數(shù)據(jù),誰會希望分配的客戶或線索幾個小時后才會被跟進呢。不僅如此,千萬級、億級的大表同步問題也非常頭疼。以往的小表(同步方案:業(yè)務(wù)低峰期度業(yè)務(wù)庫)隨著業(yè)務(wù)發(fā)展變成了千萬級大變帶來的慢SQL一直被DBA詬病。針對這類表目前還是只能申請只讀實例進行同步,成本比較高。
下圖所示是公司內(nèi)部實時鏈路架構(gòu)圖

這個架構(gòu)因為復雜度高、連路長出現(xiàn)了不少穩(wěn)定性問題,導致數(shù)據(jù)會漏掉。特別是集群資源使用高峰期sparkstreaming會因為hbase寫入壓力觸發(fā)反壓機制,導致數(shù)據(jù)延遲甚至sparkstreaming程序宕機重啟。另外一點是如果需要其它hive內(nèi)部表和這類外部表做聯(lián)合查詢,還需要把外部表經(jīng)過跑批任務(wù),轉(zhuǎn)換成內(nèi)部表。性能差不說還繁瑣。
問題
- 大表同步難成本高
- 實時性要求更高
調(diào)研
基于上述背景我們希望有一套技術(shù)體系可以解決至少其中的一個問題。如果能同時解決這兩個問題那么就更好了。作為大數(shù)據(jù)工程師站在技術(shù)人的角度,首先還是要關(guān)注數(shù)據(jù)同步的問題,畢竟數(shù)據(jù)都不準實時性的價值也就無從談起。經(jīng)過一段時間的調(diào)研我們發(fā)現(xiàn)業(yè)界比較新穎的數(shù)據(jù)同步手段是通過Flink CDC進行。我們經(jīng)過了一兩周的時間在測試環(huán)境搭建了flink環(huán)境,在flink sql client中對flink+flinkcdc進行了功能測試。發(fā)現(xiàn)它完全符合我們的功能需求。這套方案的好處是:
-
簡化了數(shù)據(jù)同步鏈路
簡化后
- 容錯和故障恢復。完善的checkpoint和savepoint機制
- 支持 Exactly Once
- 增量同步效率更高
與此同時我們發(fā)現(xiàn)了這套方案帶來的新問題,難道我要基于flink cdc再去整合?把它集成到公司數(shù)據(jù)中臺上來嗎?整合要考慮到監(jiān)控、web可視化操作、學習flink java doc、任務(wù)管理等等。這些絕對是不小的工作量而且是摸著石頭過河。希望借助一個平臺能對flink的操作進行可視化操作并且對常用操作提供腳手架,能開箱即用。這時候我們發(fā)現(xiàn)Streamspark,并基于它快速完成了flink cdc任務(wù)的編輯、提交和日常管理。
技術(shù)方案
最終為了解決大表同步難和實時性我們的方案是

技術(shù)實施
本地編譯環(huán)境
如果要嘗試RC版本或者其他Bata版本需要自己編譯的話需要在本機具備以下條件
- mysql5.7、jdk1.8、macos13.2.1
-
高版本maven。作者本來使用的3.2.3版本結(jié)果提示版本太低,果斷升級到3.8.7最新版 - 如果是前后端一起編譯需要安裝
npm和nodejs環(huán)境,根據(jù)提示安裝即可 - 編譯請運行項目根目錄下的
bulid.sh文件,編譯后的tar包在/dist下面。別再傻傻的mvn package然后到群里問為啥沒有編譯出來了[手動狗頭] - 最后非常非常重要的一點,首次編譯會下載堆依賴,比較慢。你最好準備一個
梯子
服務(wù)器環(huán)境
Flink : 1.13.6
Streampark : 2.0
Ambari : 2.7
HDP:3.1.4.0
Java:1.8
Linux : CentOS 7
MySQL: 5.7
Step 1 : 安裝Flink
- 去官網(wǎng)下載flink安裝包
flink-1.13.6-bin-scala_2.11.tgz - 解壓到
/opt下
[root@tabr1 flink] pwd
/opt/flink
[root@tabr1 flink] ll
total 484
drwxr-xr-x 2 flink flink 4096 Feb 13 14:57 bin
drwxr-xr-x 2 flink flink 295 Feb 22 18:33 conf
drwxr-xr-x 7 flink flink 76 Feb 4 2022 examples
drwxr-xr-x 2 flink flink 4096 Feb 15 12:53 lib
-rw-r--r-- 1 flink flink 11357 Aug 2 2019 LICENSE
drwxr-xr-x 2 flink flink 4096 Feb 4 2022 licenses
drwxr-xr-x 2 flink flink 4096 Feb 16 14:13 log
-rw-r--r-- 1 flink flink 455193 Feb 4 2022 NOTICE
drwxr-xr-x 3 flink flink 4096 Feb 4 2022 opt
drwxr-xr-x 10 flink flink 210 Feb 4 2022 plugins
-rw-r--r-- 1 flink flink 1309 Sep 9 2020 README.txt
flink環(huán)境還是比較簡單,如果對此有疑問,請閱讀官網(wǎng)幫助文檔。這里著重說streampark
Step 2 : 安裝Streampark
- 去官網(wǎng)下載Streampark安裝包
apache-streampark_2.11-2.0.0-incubating-bin.tar.gz - 解壓到
/opt下
[root@tabr1 apache-streampark_2.11-2.0.0-incubating-bin] pwd
/opt/apache-streampark_2.11-2.0.0-incubating-bin
[root@tabr1 apache-streampark_2.11-2.0.0-incubating-bin] ll
total 108
drwxr-xr-x 3 root root 127 Feb 14 12:19 bin
drwxr-xr-x 2 root root 71 Feb 21 11:12 client
drwxr-xr-x 2 root root 218 Feb 23 14:03 conf
-rw-r--r-- 1 root root 557 Feb 14 12:19 DISCLAIMER
drwxr-xr-x 2 root root 16384 Feb 21 11:42 lib
-rw-r--r-- 1 root root 56822 Feb 14 12:19 LICENSE
drwxr-xr-x 3 root root 4096 Feb 21 11:12 licenses
-rw-r--r-- 1 root root 1393 Feb 14 12:19 LICENSE.tpl
drwxr-xr-x 2 root root 305 Feb 23 00:00 logs
-rw-r--r-- 1 root root 615 Feb 14 12:19 NOTICE
drwxr-xr-x 2 root root 24 Feb 14 12:19 plugins
-rw-r--r-- 1 root root 5714 Feb 14 12:19 README.md
drwxr-xr-x 5 root root 64 Feb 14 12:19 script
drwxr-xr-x 6 root root 4096 Feb 22 19:11 temp
- 創(chuàng)建Streampark數(shù)據(jù)庫到Mysql
分別執(zhí)行/opt/apache-streampark_2.11-2.0.0-incubating-bin/script/下的schema/mysql-schema.sql和data/mysql-data.sql - 添加mysql驅(qū)動(mysql8的用戶可以忽略這一步)
添加mysql5.7的驅(qū)動jar包到/opt/apache-streampark_2.11-2.0.0-incubating-bin/lib下
Streampark2.0默認支持的是mysql8,在application-mysql.yml的配置文件中默認的mysql驅(qū)動地址是com.mysql.cj.jdbc.Driver.故:如果你的數(shù)據(jù)庫和我一樣是5.7版本,那你就要像我一樣,添加5.7的驅(qū)動,并且修改驅(qū)動地址為com.mysql.jdbc.Driver。
這里在刷streampark腳本到mysql的時候可能會出現(xiàn):mysql Index column size too large
遇到這個問題,檢查下你的mysql是不是5.7 。由于mysql5.7索引的長度限制,所以遇到這個問題的時候。請修改報錯表主鍵和索引長度是255的字段,改成128。就這個mysql8的問題,作者可以在github上弄個FAQ,并且簡單提供兼容mysql5.7的辦法和配置文件修改方法。我司89個RDS實例只有5個mysql8的版本,說明mysql8在生產(chǎn)環(huán)境使用的并沒有5.7多。當然,也許是我司個例。作者酌情考慮即可
- 修改StreamparkMysql配置和環(huán)境信息
1、修改/opt/apache-streampark_2.11-2.0.0-incubating-bin/conf/application-mysql.yml填寫對應(yīng)的mysql賬號密碼和環(huán)境信息
mysql5.7的用戶注意修改driver-class-name為com.mysql.jdbc.Driver
2、修改/opt/apache-streampark_2.11-2.0.0-incubating-bin/conf/application.yml
以下幾項著重檢查,其它的配置視情況而定
server:
port: 10000 #檢查端口不要沖突。比如hive server的端口就是10000號端口
spring:
profiles.active: mysql #改成mysql
workspace:
local: /opt/streampark_workspace #看情況修改
remote: hdfs://testclsuter/streampark #修改成自己環(huán)境下的hdfs地址
- 修改
/etc/profile環(huán)境變量
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_HOME=/usr/hdp/3.1.4.0-315/hadoop #根據(jù)實際情況填寫
export HADOOP_CONF_DIR=/etc/hadoop/conf #根據(jù)實際情況填寫
export HIVE_HOME=$HADOOP_HOME/../hive
export HBASE_HOME=$HADOOP_HOME/../hbase
export HADOOP_HDFS_HOME=$HADOOP_HOME/../hadoop-hdfs
export HADOOP_MAPRED_HOME=$HADOOP_HOME/../hadoop-mapreduce
export HADOOP_YARN_HOME=$HADOOP_HOME/../hadoop-yarn
- 啟動Streampark
[root@tabr1 bin] ./startup.sh
默認賬號密碼:admin/streampark
登陸后的效果如下:

作者在使用過程中發(fā)現(xiàn)一個問題,在這個界面終止任務(wù)后。不知道出于什么原因在yarn上并沒有停止掉,新用戶在上線streampark后,一定要特別注意這點。最好是直接在flink管理界面終止,如果要做savepoint,那么在streampark上終止后在yarn上檢查下,確認application結(jié)束了。后期如果要streampark到內(nèi)部系統(tǒng)在這點上也需要多關(guān)注。
-
配置Flink環(huán)境
Flink環(huán)境
flink要和streampark安裝在同一臺機器。這里配置成功后才能在作業(yè)管理里面添加flinksql任務(wù)。
作者因為好奇,在這里添加了不同版本的flink,發(fā)現(xiàn)這里居然沒有刪除功能。不能刪除我不用的flink版本。我想sparkpark的作者一定是忙忘記了
Step 3 : CDC代碼以及依賴
- CDC代碼實例
MySQL表結(jié)構(gòu):
CREATE TABLE `alert_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`biz_type` int(11) NOT NULL COMMENT '業(yè)務(wù)類型,1:定時任務(wù),2:數(shù)據(jù)表',
...
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=860598 DEFAULT CHARSET=utf8;
Hive表結(jié)構(gòu)。提前創(chuàng)建,用戶接收最終的binlog的數(shù)據(jù):
CREATE TABLE ods.kafka_hive_cdc_v3 (
`id` bigint,
`biz_type` string,
...
`create_time` string,
`update_time` string,
`type` string
) PARTITIONED BY (dt string, hr string) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='5 S', #數(shù)據(jù)刷入hdfs的時間間隔
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', #設(shè)置時區(qū)
'sink.partition-commit.policy.kind'='metastore,success-file',
'auto-compaction'='true',
'compaction.file-size'='5MB',
'sink.rolling-policy.file-size'='5MB',
'sink.rolling-policy.rollover-interval'='20s',
'sink.rolling-policy.check-interval'='10s'
);
Flink SQL代碼:
### 讀取binlog。先確認自己要配置的mysql是否開啟了binlog row模式
CREATE TABLE test_mysql_alert_log_binlog_cdc_v3 (
`id` bigint,
`biz_type` int,
...
`create_time` STRING,
`update_time` STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '數(shù)據(jù)地址', # 修改
'port' = '3306',
'username' = '賬號', # 修改
'password' = '密碼', # 修改
'server-time-zone' = 'Asia/Shanghai',
'database-name' = '數(shù)據(jù)庫', # 修改
'table-name' = 'alert_log', # 修改
'scan.startup.mode' = 'latest-offset'
);
# 創(chuàng)建kafka表,用來接受binlog日志
# 請注意如果kafka集群沒有開啟自動創(chuàng)建topic功能,需要手動創(chuàng)建,命令如下:
# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic cdc_json_test_v3
CREATE TABLE test_kafka_alert_log_binlog_cdc_v3 (
`id` bigint,
`biz_type` int,
...
`create_time` STRING,
`update_time` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'cdc_json_test_v3', # 修改。如果topic關(guān)閉的自動創(chuàng)建,需要手動新創(chuàng)建topic
'properties.group.id' = 'flink', # 修改group id
'scan.startup.mode' = 'latest-offset',# 可以不修改,根據(jù)自身情況而定
'properties.bootstrap.servers' = 'Kafka brokerServer的地址,多個用逗號隔開', # 修改
'format' = 'canal-json',
'canal-json.ignore-parse-errors'='true'
);
# binlog寫入kafka表。這一步完成后會在會占用一個Slot,開啟一個類似sparkstreaming一樣常駐任務(wù)在jobs中執(zhí)行。
# 這時候可以使用`./kafka-console-consumer.sh --bootstrap-server brokerServer:port --topic cdc_json_test_v3`消費,看看數(shù)據(jù)有沒有到kafka
# 上面這些操作可以選擇在flink sql client中先測試,測試過程中直接查詢kafka表也可以查閱有沒有數(shù)據(jù)流入
INSERT INTO
test_kafka_alert_log_binlog_cdc_v3
SELECT
*
FROM
test_mysql_alert_log_binlog_cdc_v3;
# 轉(zhuǎn)換kafka binlog格式。不然寫不進hive。
#注意WATERMARK一定要設(shè)置,不然數(shù)據(jù)寫入hdfs后hive metastore無法感知,從而沒辦法查詢到數(shù)據(jù)
CREATE TABLE test_kafka_alert_log_binlog_cdc_convert3 (
`data` ARRAY<ROW<id bigint,biz_type string,...,create_time string,update_time string>>,
`type` string,
`ts` as TO_TIMESTAMP(CONVERT_TZ(RTRIM(REGEXP_REPLACE(data[1]['create_time'],'T|Z',' ')), 'UTC', 'Asia/Shanghai'),'yyyy-MM-dd HH:mm:ss'),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'cdc_json_test_v3',
'properties.group.id' = 'flink_convert',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'kafka地址',
'format' = 'json'
);
# 創(chuàng)建hive catalog。創(chuàng)建后flink就可以使用hive_catalog.庫名.表名的方式訪問hive數(shù)據(jù)了
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'ods',
'hive-conf-dir' = '/usr/hdp/3.1.4.0-315/hive/conf' # 修改
);
# 數(shù)據(jù)從kafka寫入hive
insert into hive_catalog.ods.kafka_hive_cdc_v3
select
id,
biz_type,
...
create_time,
update_time,
type,
DATE_FORMAT(ts, 'yyyy-MM-dd'),
DATE_FORMAT(ts, 'HH')
from default_catalog.default_database.test_kafka_alert_log_binlog_cdc_convert3
cross join UNNEST(`data`) AS t(id, biz_type, ...,`create_time`,`update_time`);
- flinkSQL依賴
這些依賴有一部分在flink lib下面就有,直接拿出來引用即可。由于不是所有的依賴都可以在倉庫里面找到,并且還有一些修改過的jar包。生產(chǎn)環(huán)境建議自己在maven私服上傳這些包,然后通過pom坐標引用會比較方便
-rw-r--r--@ 1 limin staff 164K Nov 30 16:56 antlr-runtime-3.5.2.jar
-rw-r--r--@ 1 limin staff 7.4M Feb 7 16:46 flink-connector-hive_2.11-1.13.6.jar
-rw-r--r--@ 1 limin staff 90K Feb 4 2022 flink-csv-1.13.6.jar
-rw-r--r--@ 1 limin staff 110M Feb 4 2022 flink-dist_2.11-1.13.6.jar
-rw-r--r--@ 1 limin staff 145K Feb 4 2022 flink-json-1.13.6.jar
-rw-r--r--@ 1 limin staff 7.4M May 7 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--@ 1 limin staff 47M Dec 12 16:10 flink-sql-connector-hive-3.1.2_2.11-1.13.6_update.jar
-rw-r--r--@ 1 limin staff 3.5M Dec 14 15:20 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r--@ 1 limin staff 22M Nov 28 10:57 flink-sql-connector-mysql-cdc-2.3.0.jar
-rw-r--r--@ 1 limin staff 39M Feb 4 2022 flink-table-blink_2.11-1.13.6.jar
-rw-r--r--@ 1 limin staff 35M Feb 4 2022 flink-table_2.11-1.13.6.jar
-rw-r--r--@ 1 limin staff 2.6M Dec 12 11:57 guava-28.0-jre.jar
-rw-r--r--@ 1 limin staff 22M Feb 7 18:23 hadoop-client-runtime-3.1.1.3.1.4.0-315.jar
-rw-r--r--@ 1 limin staff 1.6M Dec 12 17:16 hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar
-rw-r--r--@ 1 limin staff 40M Nov 30 16:55 hive-exec-3.1.0.3.1.4.0-315.jar
-rw-r--r--@ 1 limin staff 306K Nov 30 16:55 libfb303-0.9.3.jar
-rw-r--r--@ 1 limin staff 203K Jan 13 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r--@ 1 limin staff 295K Jan 7 2022 log4j-api-2.17.1.jar
-rw-r--r--@ 1 limin staff 1.7M Jan 7 2022 log4j-core-2.17.1.jar
-rw-r--r--@ 1 limin staff 24K Jan 7 2022 log4j-slf4j-impl-2.17.1.jar
注意:由于guava包沖突問題,使用過程中報
com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Ob...
最小改動是僅替換Preconditions類即可。給它手動加入缺少的checkArgument(String,Object)方法 。修改后的flink-sql-connector-hive-3.1.2_2.11-1.13.6.jar增加了_update后綴。
-
Streampark上配置的效果。
flink SQL代碼如下圖的方式填寫即可。作業(yè)依賴可以填寫maven坐標也可以自己上傳jar包
完整效果
這里streampark考慮到每次做類似的作業(yè)時,jar包/pom反復上傳的問題,做了一個復制能功能??梢钥寺∫呀?jīng)存在的任務(wù)。如果能開發(fā)一個資源管理工具,我們把不同類型的作業(yè)需要的資源進行歸類,然后在創(chuàng)建作業(yè)的時候直接選擇資源,可能靈活性便捷性會更高。
完成
到這里就完成了flink sql cdc任務(wù)的創(chuàng)建。保存成功后可以在列表頁面編譯并運行了。在生產(chǎn)運行模式請選擇application模式,資源獨享,比較可靠。關(guān)鍵配置
配置1:在streampark的動態(tài)參數(shù)中設(shè)置checkpoint模式并啟用
-Dexecution.checkpointing.interval=5000 -Dexecution.checkpointing.mode=EXACTLY_ONCE
配置2: WATERMARK一定要設(shè)置
沒注意看的同學請自己往回翻flink sql,仔細查看。WATERMARK如果不設(shè)置,數(shù)據(jù)寫入hdfs后hive metastore無法感知,hive client查詢不到數(shù)據(jù)。查閱源碼發(fā)現(xiàn),原因是因為flink在判斷要不要通知hive metastore的時候,是根據(jù)WATERMARK的時間來判定要不要執(zhí)行commit操作通知hive metastore。這個在flink官網(wǎng)有介紹,不過我想大部分人沒什么耐性把文章仔細看一遍,所以這里特意指出來。
Step 4 : 生產(chǎn)環(huán)境配置注意點
1、Streampark
許多同學在嘗試CDC的時候會直接使用flink sql client進行嘗試,然后把代碼通過streampark進行托管提交。那么client中用的set語法在streampark可能是不管用的,比如checkpoint配置。那么就需要在動態(tài)參數(shù)中通過 -D參數(shù)=值的方式設(shè)置
2、Flink
以下幾個項請根據(jù)建議直接配置在/opt/flink/conf/flink-conf.yaml文件中。比較常用
state.checkpoints.dir:checkpoints寫出的目錄位置,比如:hdfs://ns/flink/flink-checkpoints
state.savepoints.dir:savepoints寫出的目錄位置,比如:hdfs://ns/flink/flink-checkpoints,一般與上面的值配置一致
execution.checkpointing.mode:檢查點模式,默認值EXACTLY_ONCE,可選項AT_LEAST_ONCE
execution.checkpointing.interval:checkpoint的時間間隔,單位毫秒,比如5000
state.checkpoints.num-retained:checkpoint保留的個數(shù),默認值1,不過建議比如設(shè)置為3,防止想要恢復更久前的狀態(tài),或者最近的一個checkpoint被誤刪除等情況。需要設(shè)置在flink-conf.yml文件中
execution.checkpointing.externalized-checkpoint-retention:該配置項定義了在任務(wù)取消(cancel,注意不是job failed是主動的cancel)時如何清理外部化的檢查點。一般我們會配置為RETAIN_ON_CANCELLATION,即cancel時保留檢查點。而DELETE_ON_CANCELLATION則表示cancel任務(wù)時刪除檢查點,只有在任務(wù)失敗時,才會被保留。
需要注意的是,雖然execution.checkpointing.mode等參數(shù)可以在Streampark的動態(tài)參數(shù)中設(shè)置,但state.checkpoints.num-retained和execution.checkpointing.interval在動態(tài)參數(shù)中設(shè)置,是沒有效果的(大坑)。需要直接寫在flink-conf.yaml中。所以如果不是經(jīng)常調(diào)的參數(shù),建議直接配置在flink-conf.yaml中。這個要點對新手來說非常不友好,完全不懂那些在streampark中配置動態(tài)參數(shù)不生效。如果streampark能完全托管,或者明確哪些配置在哪里配,會更加容易上手。
效果與總結(jié)
在生產(chǎn)環(huán)境試跑了1周左右的時間,數(shù)據(jù)幾乎沒有延遲(測試表每天大概1500w左右的更新量,延遲時間在checkpoint時間周期左右),也沒有出現(xiàn)宕機的問題。checkpoin成功率100%。我們打算在季度末結(jié)合幾個業(yè)務(wù)場景進行實戰(zhàn)測試。streampark2.0全程使用下來要比streamx好用很多。streamx存在hive語法支持不全、jar包文件類型識別不全等一系列小問題,這些問題都需要自己編譯修改源碼暫時規(guī)避。但在2.0的版本做了修復,還有一系列新功能和其它問題修復。 強烈推薦使用2.0版本。
后續(xù)打算
初次使用streampark到生產(chǎn)環(huán)境。前期打算就寫一個可以自動生成cdc腳本的代碼配合streampark在生產(chǎn)環(huán)境上先試運行,畢竟cdc的sql也太繁瑣了,自己寫真的很容易出錯。后期結(jié)合streampark的api和公司自建的數(shù)據(jù)中臺做集成。實現(xiàn)一站式開發(fā)的目的。一起期待我的后續(xù)文章吧


