canal源碼解析 es adapter sql 字段解惑

當(dāng)使用 cannal adapter 時候 ,esMapping sql 使我感到疑惑 ,binlog 消息 通過這個sql 做了聚合 。 但是 下面這個sql 不是做了查詢所有數(shù)據(jù)? 然后在同步到es中去的?

dataSourceKey: defaultDS        # 源數(shù)據(jù)源的key, 對應(yīng)上面配置的srcDataSources中的值
destination: example            # cannal的instance或者M(jìn)Q的topic
esMapping:
  _index: mytest_user           # es 的索引名稱
  _type: _doc                   # es 的doc名稱
  _id: _id                      # es 的_id, 如果不配置該項(xiàng)必須配置下面的pk項(xiàng)_id則會由es自動分配
#  pk: id                       # 如果不需要_id, 則需要指定一個屬性為主鍵屬性
  # sql映射
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # 數(shù)組或者對象屬性, array:; 代表以;字段里面是以;分隔的
#    _obj: obj:{"test":"123"}
  etlCondition: "where a.c_time>='{0}'"     # etl 的條件參數(shù)
  commitBatch: 3000                         # 提交批大小

一路 從 CanalAdapterLoader , CanalAdapterWorker ,AbstractCanalAdapterWorker,ESAdapter ,最終在 ESSyncService sync 方法找到了答案 , 阿里同學(xué)的代碼 寫的還是很容易看懂的 。 注意下面方法 注釋

   // dml ,binlog msg 轉(zhuǎn)化而成  , config 即是你配置的Es config 文件的配置類
   // binlog msg 最終轉(zhuǎn)化為 對應(yīng)的es insert , update , delete 操作
   public void sync(ESSyncConfig config, Dml dml) {
        try {
            // 如果是按時間戳定時更新則返回
            if (config.getEsMapping().isSyncByTimestamp()) {
                return;
            }

            long begin = System.currentTimeMillis();

            String type = dml.getType();
            if (type != null && type.equalsIgnoreCase("INSERT")) {
                insert(config, dml);
            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                update(config, dml);
            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                delete(config, dml);
            } else {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
                    (System.currentTimeMillis() - begin),
                    dml.getDestination(),
                    config.getEsMapping().get_index());
            }
        } catch (Throwable e) {
            logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
            throw new RuntimeException(e);
        }
    }

insert 方法 , 如果 esMapping sql 是單表 所有字段都為簡單字段且 binlog msg 對應(yīng) sql 主表 ,會調(diào)用 singleTableSimpleFiledInsert ,直接插入 es 對應(yīng)索引中去 ,如果binlog msg 對應(yīng)的是 sql 主表且 非簡單字段 ,則 會從esMapping 拿到主鍵(列表) 然后 和sql 中 , msg匹配后 拼接后 ,查詢 sql , 插入es ,如果 binlog msg 對應(yīng)的是sql 的 從表 ,有3種更新方法 ,有點(diǎn)小看阿里同學(xué)了 ,這塊設(shè)計,我之前沒想到 。


    /**
     * 插入操作dml
     *
     * @param config es配置
     * @param dml dml數(shù)據(jù)
     */
    private void insert(ESSyncConfig config, Dml dml) {
        List<Map<String, Object>> dataList = dml.getData();
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
        for (Map<String, Object> data : dataList) {
            if (data == null || data.isEmpty()) {
                continue;
            }

            if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
                // ------單表 & 所有字段都為簡單字段------
                singleTableSimpleFiledInsert(config, dml, data);
            } else {
                // ------是主表 查詢sql來插入------
                if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                    mainTableInsert(config, dml, data);
                }

                // 從表的操作
                for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
                    if (tableItem.isMain()) {
                        continue;
                    }
                    if (!tableItem.getTableName().equals(dml.getTable())) {
                        continue;
                    }
                    // 關(guān)聯(lián)條件出現(xiàn)在主表查詢條件是否為簡單字段
                    boolean allFieldsSimple = true;
                    for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                        if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
                            allFieldsSimple = false;
                            break;
                        }
                    }
                    // 所有查詢字段均為簡單字段
                    if (allFieldsSimple) {
                        // 不是子查詢
                        if (!tableItem.isSubQuery()) {
                            // ------關(guān)聯(lián)表簡單字段插入------
                            Map<String, Object> esFieldData = new LinkedHashMap<>();
                            for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                                Object value = esTemplate.getValFromData(config.getEsMapping(),
                                    data,
                                    fieldItem.getFieldName(),
                                    fieldItem.getColumn().getColumnName());
                                esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
                            }

                            joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                        } else {
                            // ------關(guān)聯(lián)子表簡單字段插入------
                            subTableSimpleFieldOperation(config, dml, data, null, tableItem);
                        }
                    } else {
                        // ------關(guān)聯(lián)子表復(fù)雜字段插入 執(zhí)行全sql更新es------
                        wholeSqlOperation(config, dml, data, null, tableItem);
                    }
                }
            }
        }
    }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容