
家樓頂
需求
源kafka消息數(shù)據(jù)中有個(gè)字段是Map類型, 我希望讀取該字段并且寫入clickhouse中, 這是個(gè)Map<String, Object>類型的數(shù)據(jù), 還有可能是嵌套結(jié)構(gòu)。就像這樣
{
"name":"hello",
"info": {
"age": 18,
"gender": "male",
"other": {
"car": "川A8888888",
"what": 100
}
}
}
問題
flinksql原生支持Map類型, 但是必須制定key和value的類型, 無法滿足需求。所以打算以字符串的方式寫入clickhouse, 查詢的時(shí)候再解析, 于是我在sql定義中將map類型的數(shù)據(jù)類型寫成String。就像這樣
CREATE TABLE long_long_ago (
name STRING,
info STRING
) WITH (…)
可事情沒有向預(yù)計(jì)的方向發(fā)展, 程序不報(bào)錯(cuò), 可是寫入到clickhouse中的info字段為空, 一臉懵逼, 開始進(jìn)入正題。
方案
為什么將Map類型的數(shù)據(jù)定義成String后解析出來的數(shù)據(jù)是空呢?直接看代碼, 不一會(huì)兒就定位到所在代碼的位置了。
所在包:flink-json
所在類:org.apache.flink.formats.json.JsonRowDataDeserializationSchema
所在方法:convertToString
原因:
因?yàn)閒link解析出來的info本質(zhì)還是jsonNode, 即使我們?cè)趕ql中定義其為String。
而jsonNode的asText方法是沒實(shí)現(xiàn)的(也就是空)。
所以適當(dāng)?shù)男薷囊幌麓a就可以了, jsonNode實(shí)現(xiàn)了toString類。
修改如下
private StringData convertToString(JsonNode jsonNode) {
if (jsonNode.asText() == "") {
return StringData.fromString(jsonNode.toString());
} else {
return StringData.fromString(jsonNode.asText());
}
}
修改完上面的方法后, 打包flink-json包, 替換jar包。
好了, 現(xiàn)在flink程序就可以將Map對(duì)象轉(zhuǎn)成String, 然后落地到clickhouse了