Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關(guān)注加星
本文使用的GO是1.17.2
Doris 0.15.0 release版
Doris的數(shù)據(jù)導(dǎo)入有各種語言的版本,但是GO語言版本的基本見不到,簡(jiǎn)單學(xué)了一下,寫了一個(gè)簡(jiǎn)單的Stream Load入庫的示例,僅供參考
示例中使用的表結(jié)構(gòu):
CREATE TABLE IF NOT EXISTS user_info
(
user_id LARGEINT NOT NULL COMMENT "用戶id",
username varchar(50) NOT NULL COMMENT "用戶名",
city VARCHAR(20) COMMENT "用戶所在城市",
age SMALLINT COMMENT "用戶年齡",
sex TINYINT COMMENT "用戶性別",
phone LARGEINT COMMENT "電話",
address VARCHAR(500) COMMENT "地址",
register_time datetime COMMENT "用戶注冊(cè)時(shí)間"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);
下面是GO的示例代碼,其中支持從文件導(dǎo)入,從內(nèi)存數(shù)據(jù)導(dǎo)入,同時(shí)提供了獲取BE節(jié)點(diǎn)列表的方法,你在導(dǎo)入的時(shí)候可以從這里隨機(jī)獲取一個(gè)BE節(jié)點(diǎn)IP及端口,直連BE進(jìn)行導(dǎo)入
package main
import (
"container/list"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gofrs/uuid"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
)
type StreamLoad struct {
url string
dbName string
tableName string
data string
userName string
password string
}
//實(shí)現(xiàn)Doris用戶認(rèn)證信息
func auth(load StreamLoad) string {
s := load.userName + ":" + load.password
b := []byte(s)
sEnc := base64.StdEncoding.EncodeToString(b)
fmt.Printf("enc=[%s]\n", sEnc)
sDec, err := base64.StdEncoding.DecodeString(sEnc)
if err != nil {
fmt.Printf("base64 decode failure, error=[%v]\n", err)
} else {
fmt.Printf("dec=[%s]\n", sDec)
}
return sEnc
}
//使用Stream load將文件數(shù)據(jù)導(dǎo)入到Doris對(duì)應(yīng)的數(shù)據(jù)表中
func batch_load_file(load StreamLoad, file string) {
client := &http.Client{}
//生成要訪問的url
url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
//fmt.Formatter(.Format(url,load.dbName,l))
fileContext, err := ioutil.ReadFile(file)
if err != nil {
log.Println("Failed to Read the File", file, err)
}
record := strings.NewReader(string(fileContext))
//提交請(qǐng)求
reqest, err := http.NewRequest(http.MethodPut, url, record)
//增加header選項(xiàng)
reqest.Header.Add("Authorization", "basic "+auth(load))
reqest.Header.Add("EXPECT", "100-continue")
var u1 = uuid.Must(uuid.NewV4())
reqest.Header.Add("label", u1.String())
reqest.Header.Add("column_separator", ",")
if err != nil {
panic(err)
}
//處理返回結(jié)果
response, _ := client.Do(reqest)
if response.StatusCode == 200 {
body, _ := ioutil.ReadAll(response.Body)
responseBody := ResponseBody{}
jsonStr := string(body)
err := json.Unmarshal([]byte(jsonStr), &responseBody)
if err != nil {
fmt.Println(err.Error())
}
if responseBody.Status == "Success" {
//如果有被過濾的數(shù)據(jù),打印錯(cuò)誤的URL
if responseBody.NumberFilteredRows > 0 {
fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
} else {
fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
}
}
fmt.Println(string(body))
}
defer response.Body.Close()
}
//內(nèi)存流數(shù)據(jù),通過Stream Load導(dǎo)入Doris表中
func batch_load_data(load StreamLoad, data string) {
client := &http.Client{}
//生成要訪問的url
url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
//fmt.Formatter(.Format(url,load.dbName,l))
record := strings.NewReader(data)
//提交請(qǐng)求
reqest, err := http.NewRequest(http.MethodPut, url, record)
//增加header選項(xiàng)
reqest.Header.Add("Authorization", "basic "+auth(load))
reqest.Header.Add("EXPECT", "100-continue")
var u1 = uuid.Must(uuid.NewV4())
reqest.Header.Add("label", u1.String())
reqest.Header.Add("column_separator", ",")
if err != nil {
panic(err)
}
//處理返回結(jié)果
response, _ := client.Do(reqest)
if response.StatusCode == 200 {
body, _ := ioutil.ReadAll(response.Body)
responseBody := ResponseBody{}
jsonStr := string(body)
err := json.Unmarshal([]byte(jsonStr), &responseBody)
if err != nil {
fmt.Println(err.Error())
}
if responseBody.Status == "Success" {
//如果有被過濾的數(shù)據(jù),打印錯(cuò)誤的URL
if responseBody.NumberFilteredRows > 0 {
fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
} else {
fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
}
} else {
fmt.Printf("Error Message : %s \n", responseBody.Message)
fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
}
//fmt.Println(jsonStr)
}
defer response.Body.Close()
}
//獲取BE列表
func get_doris_be_list() *list.List {
var load StreamLoad
load.userName = "root"
load.password = ""
client := &http.Client{}
//生成要訪問的url
url := "http://10.220.146.10:8030/api/backends?is_alive=true"
//提交請(qǐng)求
reqest, err := http.NewRequest("GET", url, nil)
//增加header選項(xiàng)
reqest.Header.Add("Authorization", "basic "+auth(load))
if err != nil {
panic(err)
}
//處理返回結(jié)果
response, _ := client.Do(reqest)
bes := list.New()
if response.StatusCode == 200 {
body, _ := ioutil.ReadAll(response.Body)
backends := Backend{}
jsonStr := string(body)
err := json.Unmarshal([]byte(jsonStr), &backends)
if err != nil {
fmt.Println(err.Error())
}
for _, beinfo := range backends.Data.Backends {
be := beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort)
bes.PushBack(be)
}
}
defer response.Body.Close()
return bes
}
//Stream load返回消息結(jié)構(gòu)體
type ResponseBody struct {
TxnID int `json:"TxnId"`
Label string `json:"Label"`
Status string `json:"Status"`
Message string `json:"Message"`
NumberTotalRows int `json:"NumberTotalRows"`
NumberLoadedRows int `json:"NumberLoadedRows"`
NumberFilteredRows int `json:"NumberFilteredRows"`
NumberUnselectedRows int `json:"NumberUnselectedRows"`
LoadBytes int `json:"LoadBytes"`
LoadTimeMs int `json:"LoadTimeMs"`
BeginTxnTimeMs int `json:"BeginTxnTimeMs"`
StreamLoadPutTimeMs int `json:"StreamLoadPutTimeMs"`
ReadDataTimeMs int `json:"ReadDataTimeMs"`
WriteDataTimeMs int `json:"WriteDataTimeMs"`
CommitAndPublishTimeMs int `json:"CommitAndPublishTimeMs"`
ErrorURL string `json:"ErrorURL"`
}
//獲取BE列表返回結(jié)構(gòu)體
type Backend struct {
Msg string `json:"msg"`
Code int `json:"code"`
Data struct {
Backends []struct {
IP string `json:"ip"`
HTTPPort int `json:"http_port"`
IsAlive bool `json:"is_alive"`
} `json:"backends"`
} `json:"data"`
Count int `json:"count"`
}
func main() {
var load StreamLoad
load.userName = "root"
load.password = ""
//auth_info := auth(load)
//fmt.Println(auth_info)
//backends := get_doris_be_list()
//for e := backends.Front(); e != nil; e = e.Next() {
// fmt.Println(e.Value)
//}
data := "10001,張***,西安,30,1,133****760,陜西省**********,2021-03-12 12:34:12"
batch_load_data(load, data)
//batch_load_file(/load, "/Users/zhangfeng/Downloads/test.csv")
}