Hive SQL 元數(shù)據(jù)血緣管理

模板概述

基于 Antlr4 編譯 hive 相關(guān) xxx.g 文件生成對(duì)應(yīng)的模板,如 hive 源碼中:

image.png

編譯完成生成對(duì)應(yīng) *.java 文件,Antlr4 詳見(jiàn):Antlr4

解析流程

Parser

image.png

重點(diǎn):獲取SELECT操作中的表和列的相關(guān)操作。其他操作這判斷到字段級(jí)別。

  • 實(shí)現(xiàn)思路:對(duì)AST深度優(yōu)先遍歷,遇到操作的token則判斷當(dāng)前的操作,遇到子句則壓棧當(dāng)前處理,處理子句。子句處理完,棧彈出。
  • 處理字句的過(guò)程中,遇到子查詢就保存當(dāng)前子查詢的信息,判斷與其父查詢的關(guān)系,最終形成樹(shù)形結(jié)構(gòu);
  • 遇到字段或者條件處理則記錄當(dāng)前的字段和條件信息、組成Block,嵌套調(diào)用。

TableBlood

重點(diǎn):TableBlood 主要包含 HiveTableNode 與 HiveTableEdge

  • 獲取節(jié)點(diǎn)信息:
    Node
  • 獲取邊緣信息:
    Edge

TableFields

重點(diǎn):來(lái)源字段與目標(biāo)字段依賴

核心邏輯處理過(guò)程:
Field

FieldBloodByTable

重點(diǎn):來(lái)源字段與目標(biāo)字段依賴;獲取血緣關(guān)系節(jié)點(diǎn)和邊緣;

  • 字段血緣
    Field
  • 節(jié)點(diǎn)與邊緣血緣
    BloodTree

使用案例

本項(xiàng)目采用 SQL 通過(guò) spark-submit 引入方式

整體架構(gòu)

Parser

SQL 用例

insert overwrite table temp.d1 select t1.id, t1.name, t2.age,t1.age+t2.age as age from temp.c1 t1 join temp.c2 t2 on t1.id = t2.id;

表依賴獲取

TableBlood tableBlood = bloodEngine.getTableBlood(Arrays.asList(hqls));
        printJsonString(tableBlood);
        SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory(PROPERTIES_FILE).openSession();
        TableDependencyMapper mapper = sqlSession.getMapper(TableDependencyMapper.class);
        List<TableDependencyDO> tableDependencyList = new ArrayList<>();
        tableBlood.getEdges().forEach(edge -> {
            TableDependencyDO tableDependencyDO = new TableDependencyDO();
            tableDependencyDO.setDb(edge.getTarget().getDbName());
            tableDependencyDO.setTname(edge.getTarget().getTableName());
            tableDependencyDO.setParentDb(edge.getSource().getDbName());
            tableDependencyDO.setParentTname(edge.getSource().getTableName());
            tableDependencyDO.setExpr("");
            tableDependencyDO.setPtype("");
            tableDependencyDO.setCreateTime(new Date());
            tableDependencyList.add(tableDependencyDO);
        });
        try {
                mapper.batchInsert(tableDependencyList);
                sqlSession.commit();
            } catch (Throwable e) {
                //回滾事務(wù)
                sqlSession.rollback();
                System.out.println("MysqlSinkFunction cause Exception,sqlSession transaction rollback..." + e.getStackTrace());
            } finally {
                sqlSession.close();
            }

結(jié)果

result

獲取字段依賴

FieldBlood fieldBlood = bloodEngine.getFieldBloodByTable(Arrays.asList(hqls), new HiveTable("temp", "d1"));
        printJsonString(fieldBlood);
        SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory(PROPERTIES_FILE).openSession();
        TableColumnDependencyMapper mapper = sqlSession.getMapper(TableColumnDependencyMapper.class);
        List<TableColumnDependencyDO> tableColumnDependencyList = new ArrayList<>();
        Set<FieldBloodTree> bloodTrees = new HashSet<>(fieldBlood.values());
        getFieldBloodGraph(tableColumnDependencyList, bloodTrees, null);
        List<TableColumnDependencyDO> suitableTableColumnDependencyList = tableColumnDependencyList.stream().filter(item -> item.getDb() != null).collect(Collectors.toList());
        try {
            mapper.batchInsert(suitableTableColumnDependencyList);
            sqlSession.commit();
        } catch (Throwable e) {
            //回滾事務(wù)
            sqlSession.rollback();
            e.getStackTrace();
        } finally {
            sqlSession.close();
        }

結(jié)果

result

下載地址

鏈接: https://pan.baidu.com/s/1vrQ1vEEI4RxST6Wqlcju0w 提取碼: txkd 復(fù)制這段內(nèi)容后打開(kāi)百度網(wǎng)盤手機(jī)App,操作更方便哦

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容