FlinkSql讀取Map類型數(shù)據(jù)

家樓頂

需求

源kafka消息數(shù)據(jù)中有個(gè)字段是Map類型, 我希望讀取該字段并且寫入clickhouse中, 這是個(gè)Map<String, Object>類型的數(shù)據(jù), 還有可能是嵌套結(jié)構(gòu)。就像這樣

{
    "name":"hello",
    "info": {
        "age": 18,
        "gender": "male",
        "other": {
            "car": "川A8888888",
            "what": 100
        }
    }
}

問題

flinksql原生支持Map類型, 但是必須制定key和value的類型, 無法滿足需求。所以打算以字符串的方式寫入clickhouse, 查詢的時(shí)候再解析, 于是我在sql定義中將map類型的數(shù)據(jù)類型寫成String。就像這樣

CREATE TABLE long_long_ago (
    name STRING,
    info STRING
) WITH (…)

可事情沒有向預(yù)計(jì)的方向發(fā)展, 程序不報(bào)錯(cuò), 可是寫入到clickhouse中的info字段為空, 一臉懵逼, 開始進(jìn)入正題。

方案

為什么將Map類型的數(shù)據(jù)定義成String后解析出來的數(shù)據(jù)是空呢?直接看代碼, 不一會(huì)兒就定位到所在代碼的位置了。

所在包:flink-json
所在類:org.apache.flink.formats.json.JsonRowDataDeserializationSchema
所在方法:convertToString
原因: 
  因?yàn)閒link解析出來的info本質(zhì)還是jsonNode, 即使我們?cè)趕ql中定義其為String。     
  而jsonNode的asText方法是沒實(shí)現(xiàn)的(也就是空)。
  所以適當(dāng)?shù)男薷囊幌麓a就可以了, jsonNode實(shí)現(xiàn)了toString類。

修改如下

private StringData convertToString(JsonNode jsonNode) {
   if (jsonNode.asText() == "") {
      return StringData.fromString(jsonNode.toString());
   } else {
      return StringData.fromString(jsonNode.asText());
   }
}

修改完上面的方法后, 打包flink-json包, 替換jar包。

好了, 現(xiàn)在flink程序就可以將Map對(duì)象轉(zhuǎn)成String, 然后落地到clickhouse了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容