otter相關(guān)

otter安裝筆記

一、環(huán)境準(zhǔn)備:

1、下載初始化otter庫(kù)sql,地址:https://raw.github.com/alibaba/otter/master/manager/deployer/src/main/resources/sql/otter-manager-schema.sql,執(zhí)行即可

2、下載manage和node,地址:https://github.com/alibaba/otter/releases/

3、manger和node可以分不同機(jī)器安裝,可以放一臺(tái)機(jī)器安裝,所在機(jī)器必須安裝jdk,配置好環(huán)境變量path,classpath都必須配置好

4、依賴(lài)zk,需要zk環(huán)境,單臺(tái),集群都可以

二、manger配置

1、mkdir ~/manager

2、tar zxvf manager.deployer-4.2.17.tar.gz

3、 vi ~/manager/conf/otter.properties

         ##修改為正確訪問(wèn)ip,生成URL使用,node的配置需要用到
         otter.domainName= 內(nèi)網(wǎng)ip
        ##為node連接manager的端口, node的配置需要用到
         otter.communication.manager.port= 1099
         ##manage頁(yè)面的訪問(wèn)端口
         otter.port =8080
         ##修改為正確數(shù)據(jù)庫(kù)信息
         otter.database.driver.class.name = com.mysql.jdbc.Driver
         otter.database.driver.url = jdbc:mysql://127.0.01:3306/ottermanager
         otter.database.driver.username = root
         otter.database.driver.password = hello
       
         ##配置zookeeper集群機(jī)器,配置一個(gè)即可
         otter.zookeeper.cluster.default= 127.0.0.1:2181

其他配置默認(rèn)即可

4、啟動(dòng) ~/manager/bin/startup.sh,看日志tail -f ~/manager/logs/manager.log

5、訪問(wèn)otter.domainName:otter.port看看控制臺(tái),默認(rèn)登錄是admin/admin,sql中寫(xiě)死的,可以執(zhí)行sql時(shí)修改

6、登錄進(jìn)去先配置zk集群

image.png

集群名字隨便取
ZooKeeper集群:如果是單臺(tái),就配置一個(gè)也可以,多個(gè)以;號(hào)結(jié)束,單個(gè)也要分號(hào)結(jié)束

7、再配置node


image.png

機(jī)器名稱(chēng):可以隨意定義,方便自己記憶即可
機(jī)器ip:對(duì)應(yīng)node節(jié)點(diǎn)將要部署的機(jī)器ip,如果有多ip時(shí),可選擇其中一個(gè)ip進(jìn)行暴露. (此ip是整個(gè)集群通訊的入口,實(shí)際情況千萬(wàn)別使用127.0.0.1,否則多個(gè)機(jī)器的node節(jié)點(diǎn)會(huì)無(wú)法識(shí)別)
機(jī)器端口:對(duì)應(yīng)node節(jié)點(diǎn)將要部署時(shí)啟動(dòng)的數(shù)據(jù)通訊端口,建議值:2088
下載端口:對(duì)應(yīng)node節(jié)點(diǎn)將要部署時(shí)啟動(dòng)的數(shù)據(jù)下載端口,建議值:9090
外部ip :對(duì)應(yīng)node節(jié)點(diǎn)將要部署的機(jī)器ip,存在的一個(gè)外部ip,允許通訊的時(shí)候走公網(wǎng)處理。
zookeeper集群:為提升通訊效率,不同機(jī)房的機(jī)器可選擇就近的zookeeper集群.
node這種設(shè)計(jì),是為解決單機(jī)部署多實(shí)例而設(shè)計(jì)的,允許單機(jī)多node指定不同的端口

注意點(diǎn):
otter.domainName配置的地址,頁(yè)面訪問(wèn)都會(huì)使用這個(gè)地址,如果配置了內(nèi)網(wǎng),或者ip外網(wǎng)不能訪問(wèn),可以在manager.deployer-4.2.15\webapp\WEB-INF\common\uris.xml 配置文件中,修改一行 <serverURI>http://192.168.99.1:${otter.port}/</serverURI>,即可解決

三、node配置
1、 mkdir ~ /node
2、tar zxvf node.deployer-4.2.15.tar.gz
3、vi ~/otter/conf/otter.properties
# node的安裝目錄
otter.nodeHome = ${user.dir}/node
#manager的服務(wù)地址,manage中otter.properties配置的otter.domainName:otter.port
otter.manager.address = 127.0.0.1:1099
4、配置nid


image.png

通過(guò)manage配置node操作后,獲取到了node節(jié)點(diǎn)對(duì)應(yīng)的唯一標(biāo)示,稱(chēng)之為node id,簡(jiǎn)稱(chēng)nid,比如我添加的機(jī)器對(duì)應(yīng)序號(hào)為1
執(zhí)行echo 1 >~/node/conf/nid , 保存到conf目錄下的nid文件;一個(gè)node一個(gè)序號(hào)一個(gè)文件
執(zhí)行好如圖:


image.png

5、啟動(dòng)node,/node/bin/startup.sh
6、查看node是否啟動(dòng)
image.png

注意:因?yàn)閚ode是注冊(cè)到zk可能會(huì)慢,需要等待一會(huì)才能變成已啟動(dòng)

四、配置同步任務(wù)
1、環(huán)境準(zhǔn)備
搭建一個(gè)數(shù)據(jù)庫(kù)同步任務(wù),源數(shù)據(jù)庫(kù)必須開(kāi)啟binlog,并且binlog_format為ROW,設(shè)置server_id,即在mysql的配置文件新增/修改以下配置
log-bin=mysql-bin
binlog-format=ROW
server_id=12314123 //保證同步的源庫(kù)/目標(biāo)庫(kù) id不一樣即可
2、添加canal


image.png

注意:如果zk修改了名稱(chēng),這里需要把zookeeper集群下拉框重新選擇編輯下,否則會(huì)出現(xiàn)同步異常相關(guān)的錯(cuò)誤
3、添加數(shù)據(jù)源


image.png

一般我按庫(kù)名新增,也可以從實(shí)例級(jí)別配置即可
4、配置表

image.png

table示例說(shuō)明
單表配置:alibaba.product
分表配置:alibaba[1-64].product , alibaba.product[01-32]
正則配置:(.).(.)
schema name和table name都設(shè)置成.*表示全庫(kù)同步
5、配置channel
image.png

6、配置pipeline
image.png

image.png

7、添加映射關(guān)系
image.png

如果不是自定義,就不需要填寫(xiě)EventProcessor
8、啟動(dòng)channel即可
啟動(dòng)后,點(diǎn)擊channel看pipeline是否工作中,并且最后位點(diǎn)時(shí)間有時(shí)間,無(wú)時(shí)間顯示代表未成功


image.png

擴(kuò)展功能-自定義EventProcessor

因?yàn)楣臼嵌囫R甲,而且主鍵id沒(méi)有設(shè)置步長(zhǎng)和起始id,導(dǎo)致無(wú)法用自帶的otter字段映射進(jìn)行主鍵同步,所以用了otter的擴(kuò)展功能,根據(jù)主鍵id和馬甲字段projectName進(jìn)行聯(lián)合主鍵同步,此功能不支持新增唯一判斷,需要在目標(biāo)表手動(dòng)建立聯(lián)合唯一索引

1、新建maven project
2、添加pom
    <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>shared.etl</artifactId>
            <version>4.2.15</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>node.extend</artifactId>
            <version>4.2.15</version>
        </dependency>

3、自己抽象一個(gè)父類(lèi)

import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;

/**
 * 通用otter同步類(lèi),必須存在馬甲字段,否則同步失敗
 * 
 * @author weihui
 *
 */
public class OtterEventProcessor extends AbstractEventProcessor {

    public OtterEventProcessor() {

    }

    public OtterEventProcessor(String[] replaceColumnArr) {
        if (replaceColumnArr != null) {
            Set<String> set = new HashSet<String>();
            for (String columnName : replaceColumnArr) {
                set.add(columnName);
            }
            this.replaceColumnList = set;
        }
    }

    private EventColumn key;// 源表主鍵ID

    private Map<String, EventColumn> map = new HashMap<String, EventColumn>();// 所有列

    private String typeName = "project_name";// 馬甲名,默認(rèn)project_name

    private Set<String> replaceColumnList;// 自定義同步字段

    private void init(EventData eventData) {
        key = eventData.getKeys().get(0);

        for (EventColumn column : eventData.getColumns()) {
            map.put(column.getColumnName(), column);
        }
    }

    @Override
    public boolean process(EventData eventData) {
        System.out.println("db=" + eventData.getSchemaName() + ",table="
                + eventData.getTableName() + ",event="
                + eventData.getEventType());
        if (eventData.getEventType() == EventType.INSERT
                || eventData.getEventType() == EventType.UPDATE
                || eventData.getEventType() == EventType.DELETE) {

            init(eventData);

            replaceKey(eventData);

            replaceCols(eventData);
            return true;
        } else {
            return false;
        }
    }

    private void replaceKey(EventData eventData) {
        EventColumn projectNameColumn = new EventColumn();
        String typeValue = map.get(typeName).getColumnValue();
        System.out.println(key.getColumnName() + "=" + key.getColumnValue()
                + "," + typeName + "=" + typeValue);
        projectNameColumn.setColumnValue(typeValue);
        projectNameColumn.setColumnType(Types.VARCHAR);
        projectNameColumn.setColumnName(typeName);
        projectNameColumn.setKey(true);

        List<EventColumn> keys = new ArrayList<EventColumn>();
        keys.add(key);
        keys.add(projectNameColumn);
        eventData.setKeys(keys);
        eventData.setOldKeys(keys);//必須設(shè)置oldkeys,否則修改不了
    }

    private void replaceCols(EventData eventData) {
        if (replaceColumnList == null || replaceColumnList.isEmpty()) {// 沒(méi)有自定義同步字段,默認(rèn)同步所有字段
            replaceColumnList = map.keySet();
        }
        List<EventColumn> cols = new ArrayList<EventColumn>();
        for (String columnName : replaceColumnList) {
            EventColumn column = map.get(columnName);
            if (typeName.equals(column.getColumnName())) {// 排除馬甲字段,因?yàn)樽兂陕?lián)合主鍵已經(jīng)會(huì)替換此字段
                continue;
            }
            if (column != null) {
                cols.add(column);
            }
        }
        eventData.setColumns(cols);
    }
}

4、具體使用

  • 如果是同步所有源表列,在自定義的Event Processor選擇SOURCE寫(xiě)上如下代碼即可
public class ChannelInfoProcessor extends OtterEventProcessor {
}

  • 如果是同步源表指定列,在自定義的Event Processor選擇SOURCE寫(xiě)上如下代碼即可

public class AssetRepaymentProcessor extends OtterEventProcessor {
    public AssetRepaymentProcessor() {
        super(new String[] { "user_id", "asset_order_id", "repayment_amount",
                "repaymented_amount", "repayment_principal",
                "repayment_interest", "plan_late_fee", "true_late_fee",
                "late_fee_apr", "credit_repayment_time", "period",
                "repayment_time", "repayment_real_time", "late_fee_start_time",
                "interest_update_time", "late_day", "created_at", "updated_at",
                "auto_debit_fail_times", "renewal_count", "status",
                "collection", "repayment_no", "grant_time",
                "first_repayment_time" });

    }
}

必須先把OtterEventProcessor 這個(gè)類(lèi)可以打包一個(gè)jar,放入node的lib目錄下,如果是相同的表同步可以用同一個(gè)類(lèi)名,如果是不同表一定要新的類(lèi)名,否則會(huì)同步出問(wèn)題,因?yàn)樽侄尾灰粯樱?當(dāng)然也可以直接寫(xiě)源碼,不需要打jar包,這是封裝的寫(xiě)法

自帶監(jiān)控報(bào)警功能

目前支持郵件發(fā)送
1、添加監(jiān)控,發(fā)送人key在系統(tǒng)管理-系統(tǒng)參數(shù)中配置


image.png

2、配置接收郵箱


image.png

3、manage中的otter.properties配置發(fā)送郵箱,郵箱必須要開(kāi)啟smtp服務(wù),然后配置授權(quán)碼,有的郵箱是直接登錄密碼(比如QQ企業(yè)郵箱)
image.png

manage配置完,要重啟,然后看看日志
image.png

注意:可以會(huì)報(bào)錯(cuò),郵件發(fā)布出去504等報(bào)錯(cuò)

日常積累

1.node的logs目錄下1,2,3目錄,代表pipeline的序號(hào),哪里pipeline報(bào)錯(cuò),就在哪個(gè)目錄下查看日志


image.png

image.png

2.一臺(tái)機(jī)器可以配置多個(gè)node,直接啟動(dòng)即可,前提是同一臺(tái)機(jī)器的端口配置不一樣,從manage的node配置不同端口即可,不同機(jī)器可以配置同一個(gè)端口


image.png

假設(shè)同一臺(tái)多node啟動(dòng)報(bào)錯(cuò),可能是端口占用報(bào)錯(cuò),需要修改上面的端口,找個(gè)不存在的端口使用

3.老版本可以直接升級(jí)新版本,注意manage的端口,node會(huì)配置這個(gè)端口,假設(shè)manage掛了,node沒(méi)掛,同步的時(shí)候,頁(yè)面統(tǒng)計(jì)的數(shù)字沒(méi)法增長(zhǎng),可以看node.log有報(bào)錯(cuò)信息,dubbo連manage失敗,但是不影響數(shù)據(jù)同步,啟動(dòng)manage后就好了

4.pipeline優(yōu)化


image.png

image.png

5.升級(jí)node,manage可以直接升級(jí)

6.pipeline切換node,有可能會(huì)出現(xiàn)同步進(jìn)度中的node還是老node,假設(shè)出現(xiàn)這種問(wèn)題可以先停掉老node,在啟動(dòng)channel

7.新增node保存后可能會(huì)沒(méi)有數(shù)據(jù),可以看看是否有其他node有問(wèn)題,目前我這邊是把老node都停掉,再新增node就沒(méi)問(wèn)題

  1. node的同步進(jìn)度,存在zk中
    [zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/crm_canal/2/cursor
    {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"
    10.1.1.17","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000006","position":391302782,"serverId":160623,"timestamp":1545880850000}}

相關(guān)報(bào)錯(cuò)積累

image.png

配置了jdk,但沒(méi)有配置classpath

2018-07-18 09:58:13.392 [New I/O server worker #1-4] WARN  c.a.d.common.threadpool.support.AbortPolicyWithReport -  [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-127.0.0.1:2088, Pool Size: 50 (active: 50, core: 50, max: 50, largest: 50), Task: 113 (completed: 63), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://127.0.0.1:2088!, dubbo version: 2.5.3, current host: 127.0.0.1

線程池報(bào)錯(cuò),聽(tīng)說(shuō)是15版本的bug,14版本沒(méi)問(wèn)題

pid:2 nid:2 exception:setl:com.google.common.collect.ComputationException: java.lang.ArrayIndexOutOfBoundsException: 0
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889)
at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:34)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.getZkclientx(CanalInstanceWithManager.java:401)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.initMetaManager(CanalInstanceWithManager.java:121)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.(CanalInstanceWithManager.java:76)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1$1.(CanalEmbedSelector.java:139)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1.generate(CanalEmbedSelector.java:139)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:68)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:65)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151)
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67)
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.start(CanalServerWithEmbedded.java:98)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.start(CanalEmbedSelector.java:206)
at com.alibaba.otter.node.etl.select.SelectTask.startup(SelectTask.java:170)
at com.alibaba.otter.node.etl.select.SelectTask.run(SelectTask.java:126)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.util.Arrays$ArrayList.get(Arrays.java:3841)
at com.alibaba.otter.canal.common.zookeeper.ZooKeeperx.connect(ZooKeeperx.java:68)

zk名稱(chēng)被修改,需要在canal和node等重新選擇一遍(即使一模一樣),在保存下即可

pid:5 nid:1 exception:canal:有零花(ulinghua_cs_online):java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:94)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:137)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)

可能是之前同步任務(wù)沒(méi)成功,我們用的rds,默認(rèn)只保留18個(gè)小時(shí),然后后面在啟動(dòng)的時(shí)候,同步進(jìn)度已經(jīng)過(guò)了18個(gè)小時(shí),導(dǎo)致一直報(bào)這個(gè)錯(cuò)誤,解放方案:
停掉同步任務(wù)
進(jìn)入對(duì)應(yīng) Pipeline ,刪除同步進(jìn)度,重新啟動(dòng)即可,但是會(huì)丟失之前的數(shù)據(jù),需要人工同步丟失的數(shù)據(jù)


image.png
pid:8 nid:1 exception:canal:微現(xiàn)金(vxianjin_online):com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4832)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:160)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:759)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:428)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:114)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:66)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:337)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:184)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:145)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)

排查下canel的用戶(hù)名密碼是否正確、數(shù)據(jù)源是否連接失敗、白名單配置等等,實(shí)在都沒(méi)問(wèn)題就重啟node解決

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容