Flink 自定義UDTF函數(shù) 同步數(shù)組類型到ES中

將Mysql中 test表同步到ES中,并且將tags(逗號(hào)分隔的字符串)轉(zhuǎn)化數(shù)組同步到ES中的數(shù)組。
Mysql中test表結(jié)構(gòu)


CREATE TABLE `test` (
    `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
    `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    `tags` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci

數(shù)據(jù)如下:


image.png

ES中數(shù)據(jù)結(jié)構(gòu)


PUT info-flow-test3
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "tags": {
        "type": "keyword"
      }
    }
  }
}

Flink 中

CREATE TABLE es_info_flow_test3 (
    id string,
    tags ARRAY < string >,
    PRIMARY KEY (id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機(jī)值。
  )
WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = '127.0.0.1:9200',
    'index' = 'info-flow-test3',
    'username' = 'elastic',
    'password' = '123456'
  );

CREATE TABLE mysqlcdc_test (
    id INT, tags string, PRIMARY KEY (id) NOT ENFORCED
  )
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'admin',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'test'
  );

運(yùn)行Flink任務(wù)腳本如下:

insert into es_info_flow_test3 (id, tags)
select CAST(t.id as STRING) as id, t
from
  mysqlcdc_test t, lateral table (ASI_UDTF (`tags`)) as T (t)

自定義UDTF函數(shù)參考阿里云鏈接,注意需要使用java8
https://help.aliyun.com/document_detail/188055.html
上傳jar包后,如果返回如下表明包可以上傳。

image.png

查看tags的類型

POST info-flow-test3/_search
返回值:
{
  "took" : 0,
  "timed_out" : false,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "info-flow-test3",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : "2",
          "tags" : [
            "3",
            "4",
            "5"
          ]
        }
      },
      {
        "_index" : "info-flow-test3",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "id" : "3",
          "tags" : [
            "6"
          ]
        }
      }
    ]
  }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容