使用GO語言通過Stream Load實現Doris數據導入

Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關注加星


本文使用的GO是1.17.2

Doris 0.15.0 release版

Doris的數據導入有各種語言的版本,但是GO語言版本的基本見不到,簡單學了一下,寫了一個簡單的Stream Load入庫的示例,僅供參考

示例中使用的表結構:

-- Table used by the Stream Load example below. It is created only if it does
-- not already exist, is keyed on (user_id, username), and is hash-distributed
-- on user_id into 3 buckets.
-- NOTE(review): the COMMENT text on register_time appears to contain stray
-- pinyin annotations left by text extraction — confirm the intended wording.
-- NOTE(review): "replication_num" = "3" presumably requires at least three
-- backends in the cluster — confirm before running on a smaller setup.
CREATE TABLE IF NOT EXISTS user_info
(
    user_id LARGEINT NOT NULL COMMENT "用戶id",
    username varchar(50) NOT NULL COMMENT "用戶名",
    city VARCHAR(20) COMMENT "用戶所在城市",
    age SMALLINT COMMENT "用戶年齡",
    sex TINYINT COMMENT "用戶性別",
    phone LARGEINT COMMENT "電話",
    address VARCHAR(500) COMMENT "地址",
    register_time datetime COMMENT "用戶注冊(cè)時(shí)間"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);

下面是GO的示例代碼,其中支持從文件導入,從內存數據導入,同時提供了獲取BE節點列表的方法,你在導入的時候可以從這里隨機獲取一個BE節點IP及端口,直連BE進行導入

package main
import (
   "container/list"
   "encoding/base64"
   "encoding/json"
   "fmt"
   "github.com/gofrs/uuid"
   "io/ioutil"
   "log"
   "net/http"
   "strconv"
   "strings"
)

// StreamLoad bundles the settings passed to the Stream Load helpers in this
// file. All fields are plain strings.
type StreamLoad struct {
	// Description of the load target and payload.
	// NOTE(review): the helpers shown in this file use a hard-coded URL and a
	// separate data argument, so these four fields look unused — confirm.
	url, dbName, tableName, data string

	// Credentials that auth joins as "userName:password" before encoding.
	userName, password string
}

// auth returns the credential part of an HTTP Basic Authorization header for
// load: the standard base64 encoding of "userName:password". The caller is
// responsible for prepending the scheme name.
func auth(load StreamLoad) string {
	// Deliberately no logging here: base64 is trivially reversible, so
	// printing the encoded (or decoded) value would expose the password.
	credentials := load.userName + ":" + load.password
	return base64.StdEncoding.EncodeToString([]byte(credentials))
}

// batch_load_file reads the whole file named by file and submits its contents
// to Doris in a single Stream Load HTTP PUT, then prints the outcome.
//
// load supplies the credentials for the Basic Authorization header. The
// endpoint is hard-coded below; load.url, load.dbName and load.tableName are
// not consulted. The request uses "," as the column separator and a freshly
// generated UUID as the load label.
//
// Nothing is returned. An unreadable file, a transport failure, a non-200
// status or an unreadable response body is logged and ends the call early.
func batch_load_file(load StreamLoad, file string) {
	client := &http.Client{}
	// NOTE(review): host, database and table are fixed here — presumably they
	// should come from load; confirm before reusing against another cluster.
	url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"

	fileContext, err := ioutil.ReadFile(file)
	if err != nil {
		// Stop here: continuing would submit an empty payload.
		log.Println("Failed to Read the File", file, err)
		return
	}

	reqest, err := http.NewRequest(http.MethodPut, url, strings.NewReader(string(fileContext)))
	// The request is nil when err is non-nil, so check before setting headers.
	if err != nil {
		panic(err)
	}
	reqest.Header.Add("Authorization", "basic "+auth(load))
	reqest.Header.Add("EXPECT", "100-continue")
	reqest.Header.Add("label", uuid.Must(uuid.NewV4()).String())
	reqest.Header.Add("column_separator", ",")

	response, err := client.Do(reqest)
	if err != nil {
		// response is nil on a transport error; there is no body to inspect.
		log.Println("Stream Load request failed:", err)
		return
	}
	// Registered right after the error check so every return path closes it.
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Println("Stream Load returned unexpected HTTP status:", response.Status)
		return
	}

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("Failed to read the Stream Load response:", err)
		return
	}

	responseBody := ResponseBody{}
	if err := json.Unmarshal(body, &responseBody); err != nil {
		fmt.Println(err.Error())
	}
	if responseBody.Status == "Success" {
		// When rows were filtered out, print the URL reported for them.
		if responseBody.NumberFilteredRows > 0 {
			fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
		} else {
			fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
		}
	}
	// The raw response is always echoed, whatever the load status.
	fmt.Println(string(body))
}

// batch_load_data submits the in-memory string data to Doris in a single
// Stream Load HTTP PUT, then prints the outcome.
//
// load supplies the credentials for the Basic Authorization header. The
// endpoint is hard-coded below; load.url, load.dbName and load.tableName are
// not consulted. The request uses "," as the column separator and a freshly
// generated UUID as the load label.
//
// Nothing is returned. A transport failure, a non-200 status or an unreadable
// response body is logged and ends the call early.
func batch_load_data(load StreamLoad, data string) {
	client := &http.Client{}
	// NOTE(review): host, database and table are fixed here — presumably they
	// should come from load; confirm before reusing against another cluster.
	url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"

	reqest, err := http.NewRequest(http.MethodPut, url, strings.NewReader(data))
	// The request is nil when err is non-nil, so check before setting headers.
	if err != nil {
		panic(err)
	}
	reqest.Header.Add("Authorization", "basic "+auth(load))
	reqest.Header.Add("EXPECT", "100-continue")
	reqest.Header.Add("label", uuid.Must(uuid.NewV4()).String())
	reqest.Header.Add("column_separator", ",")

	response, err := client.Do(reqest)
	if err != nil {
		// response is nil on a transport error; there is no body to inspect.
		log.Println("Stream Load request failed:", err)
		return
	}
	// Registered right after the error check so every return path closes it.
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Println("Stream Load returned unexpected HTTP status:", response.Status)
		return
	}

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("Failed to read the Stream Load response:", err)
		return
	}

	responseBody := ResponseBody{}
	if err := json.Unmarshal(body, &responseBody); err != nil {
		fmt.Println(err.Error())
	}
	if responseBody.Status == "Success" {
		// When rows were filtered out, print the URL reported for them.
		if responseBody.NumberFilteredRows > 0 {
			fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
		} else {
			fmt.Printf("Success import data : %d", responseBody.NumberLoadedRows)
		}
	} else {
		fmt.Printf("Error Message : %s \n", responseBody.Message)
		fmt.Printf("Error Data : %s ", responseBody.ErrorURL)
	}
}

// get_doris_be_list queries the hard-coded /api/backends?is_alive=true
// endpoint as user "root" with an empty password and returns the reported
// backends as a list of "ip:http_port" strings.
//
// The returned list is never nil. It is empty when the request fails, when
// the status is not 200, or when the response body cannot be read; each of
// those cases is logged.
func get_doris_be_list() *list.List {
	bes := list.New()
	load := StreamLoad{userName: "root", password: ""}
	client := &http.Client{}
	url := "http://10.220.146.10:8030/api/backends?is_alive=true"

	reqest, err := http.NewRequest("GET", url, nil)
	// The request is nil when err is non-nil, so check before setting headers.
	if err != nil {
		panic(err)
	}
	reqest.Header.Add("Authorization", "basic "+auth(load))

	response, err := client.Do(reqest)
	if err != nil {
		// response is nil on a transport error; there is no body to inspect.
		log.Println("backend list request failed:", err)
		return bes
	}
	// Registered right after the error check so every return path closes it.
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Println("backend list request returned unexpected HTTP status:", response.Status)
		return bes
	}

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("failed to read the backend list response:", err)
		return bes
	}

	backends := Backend{}
	if err := json.Unmarshal(body, &backends); err != nil {
		// Reported but not fatal: whatever was decoded is still listed.
		fmt.Println(err.Error())
	}
	for _, beinfo := range backends.Data.Backends {
		bes.PushBack(beinfo.IP + ":" + strconv.Itoa(beinfo.HTTPPort))
	}
	return bes
}

// ResponseBody is the decoded JSON document returned by a Stream Load request.
// Each field is filled from the JSON key named in its tag.
type ResponseBody struct {
	TxnID int    `json:"TxnId"`
	Label string `json:"Label"`

	// Status is compared with "Success" by the loaders; Message is printed
	// by batch_load_data when the status is anything else.
	Status  string `json:"Status"`
	Message string `json:"Message"`

	// Row counters. The loaders print NumberLoadedRows on success and treat
	// NumberFilteredRows > 0 as a reason to print ErrorURL instead.
	NumberTotalRows      int `json:"NumberTotalRows"`
	NumberLoadedRows     int `json:"NumberLoadedRows"`
	NumberFilteredRows   int `json:"NumberFilteredRows"`
	NumberUnselectedRows int `json:"NumberUnselectedRows"`

	// Size and timing figures, presumably bytes and milliseconds as the
	// names suggest; none of them is read elsewhere in this file.
	LoadBytes              int `json:"LoadBytes"`
	LoadTimeMs             int `json:"LoadTimeMs"`
	BeginTxnTimeMs         int `json:"BeginTxnTimeMs"`
	StreamLoadPutTimeMs    int `json:"StreamLoadPutTimeMs"`
	ReadDataTimeMs         int `json:"ReadDataTimeMs"`
	WriteDataTimeMs        int `json:"WriteDataTimeMs"`
	CommitAndPublishTimeMs int `json:"CommitAndPublishTimeMs"`

	// ErrorURL is printed when rows were filtered or the load did not succeed.
	ErrorURL string `json:"ErrorURL"`
}

// Backend is the decoded JSON document returned by the backend-list request
// issued in get_doris_be_list. Each field is filled from the JSON key named
// in its tag.
type Backend struct {
	Msg  string `json:"msg"`
	Code int    `json:"code"`

	// Data wraps the list of backends found under the "data" key.
	Data struct {
		// Backends has one entry per reported backend; IP and HTTPPort are
		// joined as "ip:port" by get_doris_be_list.
		Backends []struct {
			IP       string `json:"ip"`
			HTTPPort int    `json:"http_port"`
			IsAlive  bool   `json:"is_alive"`
		} `json:"backends"`
	} `json:"data"`

	Count int `json:"count"`
}

// main runs the in-memory example: it loads one comma-separated row through
// batch_load_data as user "root" with an empty password.
func main() {
	load := StreamLoad{userName: "root", password: ""}

	// Optional checks, left disabled:
	//   fmt.Println(auth(load))
	//   backends := get_doris_be_list()
	//   for e := backends.Front(); e != nil; e = e.Next() {
	//       fmt.Println(e.Value)
	//   }

	// A single row with eight comma-separated fields.
	batch_load_data(load, "10001,張***,西安,30,1,133****760,陜西省**********,2021-03-12 12:34:12")

	// File-based alternative, left disabled:
	//   batch_load_file(load, "/Users/zhangfeng/Downloads/test.csv")
}

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容