otter安裝筆記
一、環(huán)境準(zhǔn)備:
1、下載初始化otter庫(kù)sql,地址:https://raw.github.com/alibaba/otter/master/manager/deployer/src/main/resources/sql/otter-manager-schema.sql,執(zhí)行即可
2、下載manage和node,地址:https://github.com/alibaba/otter/releases/
3、manger和node可以分不同機(jī)器安裝,可以放一臺(tái)機(jī)器安裝,所在機(jī)器必須安裝jdk,配置好環(huán)境變量path,classpath都必須配置好
4、依賴(lài)zk,需要zk環(huán)境,單臺(tái),集群都可以
二、manger配置
1、mkdir ~/manager
2、tar zxvf manager.deployer-4.2.17.tar.gz
3、 vi ~/manager/conf/otter.properties
##修改為正確訪問(wèn)ip,生成URL使用,node的配置需要用到
otter.domainName= 內(nèi)網(wǎng)ip
##為node連接manager的端口, node的配置需要用到
otter.communication.manager.port= 1099
##manage頁(yè)面的訪問(wèn)端口
otter.port =8080
##修改為正確數(shù)據(jù)庫(kù)信息
otter.database.driver.class.name = com.mysql.jdbc.Driver
otter.database.driver.url = jdbc:mysql://127.0.01:3306/ottermanager
otter.database.driver.username = root
otter.database.driver.password = hello
##配置zookeeper集群機(jī)器,配置一個(gè)即可
otter.zookeeper.cluster.default= 127.0.0.1:2181
其他配置默認(rèn)即可
4、啟動(dòng) ~/manager/bin/startup.sh,看日志tail -f ~/manager/logs/manager.log
5、訪問(wèn)otter.domainName:otter.port看看控制臺(tái),默認(rèn)登錄是admin/admin,sql中寫(xiě)死的,可以執(zhí)行sql時(shí)修改
6、登錄進(jìn)去先配置zk集群

集群名字隨便取
ZooKeeper集群:如果是單臺(tái),就配置一個(gè)也可以,多個(gè)以;號(hào)結(jié)束,單個(gè)也要分號(hào)結(jié)束
7、再配置node

機(jī)器名稱(chēng):可以隨意定義,方便自己記憶即可
機(jī)器ip:對(duì)應(yīng)node節(jié)點(diǎn)將要部署的機(jī)器ip,如果有多ip時(shí),可選擇其中一個(gè)ip進(jìn)行暴露. (此ip是整個(gè)集群通訊的入口,實(shí)際情況千萬(wàn)別使用127.0.0.1,否則多個(gè)機(jī)器的node節(jié)點(diǎn)會(huì)無(wú)法識(shí)別)
機(jī)器端口:對(duì)應(yīng)node節(jié)點(diǎn)將要部署時(shí)啟動(dòng)的數(shù)據(jù)通訊端口,建議值:2088
下載端口:對(duì)應(yīng)node節(jié)點(diǎn)將要部署時(shí)啟動(dòng)的數(shù)據(jù)下載端口,建議值:9090
外部ip :對(duì)應(yīng)node節(jié)點(diǎn)將要部署的機(jī)器ip,存在的一個(gè)外部ip,允許通訊的時(shí)候走公網(wǎng)處理。
zookeeper集群:為提升通訊效率,不同機(jī)房的機(jī)器可選擇就近的zookeeper集群.
node這種設(shè)計(jì),是為解決單機(jī)部署多實(shí)例而設(shè)計(jì)的,允許單機(jī)多node指定不同的端口
注意點(diǎn):
otter.domainName配置的地址,頁(yè)面訪問(wèn)都會(huì)使用這個(gè)地址,如果配置了內(nèi)網(wǎng),或者ip外網(wǎng)不能訪問(wèn),可以在manager.deployer-4.2.15\webapp\WEB-INF\common\uris.xml 配置文件中,修改一行 <serverURI>http://192.168.99.1:${otter.port}/</serverURI>,即可解決
三、node配置
1、 mkdir ~ /node
2、tar zxvf node.deployer-4.2.15.tar.gz
3、vi ~/otter/conf/otter.properties
# node的安裝目錄
otter.nodeHome = ${user.dir}/node
#manager的服務(wù)地址,manage中otter.properties配置的otter.domainName:otter.port
otter.manager.address = 127.0.0.1:1099
4、配置nid

通過(guò)manage配置node操作后,獲取到了node節(jié)點(diǎn)對(duì)應(yīng)的唯一標(biāo)示,稱(chēng)之為node id,簡(jiǎn)稱(chēng)nid,比如我添加的機(jī)器對(duì)應(yīng)序號(hào)為1
執(zhí)行echo 1 >~/node/conf/nid , 保存到conf目錄下的nid文件;一個(gè)node一個(gè)序號(hào)一個(gè)文件
執(zhí)行好如圖:

5、啟動(dòng)node,/node/bin/startup.sh
6、查看node是否啟動(dòng)

注意:因?yàn)閚ode是注冊(cè)到zk可能會(huì)慢,需要等待一會(huì)才能變成已啟動(dòng)
四、配置同步任務(wù)
1、環(huán)境準(zhǔn)備
搭建一個(gè)數(shù)據(jù)庫(kù)同步任務(wù),源數(shù)據(jù)庫(kù)必須開(kāi)啟binlog,并且binlog_format為ROW,設(shè)置server_id,即在mysql的配置文件新增/修改以下配置
log-bin=mysql-bin
binlog-format=ROW
server_id=12314123 //保證同步的源庫(kù)/目標(biāo)庫(kù) id不一樣即可
2、添加canal

注意:如果zk修改了名稱(chēng),這里需要把zookeeper集群下拉框重新選擇編輯下,否則會(huì)出現(xiàn)同步異常相關(guān)的錯(cuò)誤
3、添加數(shù)據(jù)源

一般我按庫(kù)名新增,也可以從實(shí)例級(jí)別配置即可
4、配置表

table示例說(shuō)明
單表配置:alibaba.product
分表配置:alibaba[1-64].product , alibaba.product[01-32]
正則配置:(.).(.)
schema name和table name都設(shè)置成.*表示全庫(kù)同步
5、配置channel

6、配置pipeline


7、添加映射關(guān)系

如果不是自定義,就不需要填寫(xiě)EventProcessor
8、啟動(dòng)channel即可
啟動(dòng)后,點(diǎn)擊channel看pipeline是否工作中,并且最后位點(diǎn)時(shí)間有時(shí)間,無(wú)時(shí)間顯示代表未成功

擴(kuò)展功能-自定義EventProcessor
因?yàn)楣臼嵌囫R甲,而且主鍵id沒(méi)有設(shè)置步長(zhǎng)和起始id,導(dǎo)致無(wú)法用自帶的otter字段映射進(jìn)行主鍵同步,所以用了otter的擴(kuò)展功能,根據(jù)主鍵id和馬甲字段projectName進(jìn)行聯(lián)合主鍵同步,此功能不支持新增唯一判斷,需要在目標(biāo)表手動(dòng)建立聯(lián)合唯一索引
1、新建maven project
2、添加pom
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>shared.etl</artifactId>
<version>4.2.15</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/node.extend -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>node.extend</artifactId>
<version>4.2.15</version>
</dependency>
3、自己抽象一個(gè)父類(lèi)
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
/**
* 通用otter同步類(lèi),必須存在馬甲字段,否則同步失敗
*
* @author weihui
*
*/
public class OtterEventProcessor extends AbstractEventProcessor {
public OtterEventProcessor() {
}
public OtterEventProcessor(String[] replaceColumnArr) {
if (replaceColumnArr != null) {
Set<String> set = new HashSet<String>();
for (String columnName : replaceColumnArr) {
set.add(columnName);
}
this.replaceColumnList = set;
}
}
private EventColumn key;// 源表主鍵ID
private Map<String, EventColumn> map = new HashMap<String, EventColumn>();// 所有列
private String typeName = "project_name";// 馬甲名,默認(rèn)project_name
private Set<String> replaceColumnList;// 自定義同步字段
private void init(EventData eventData) {
key = eventData.getKeys().get(0);
for (EventColumn column : eventData.getColumns()) {
map.put(column.getColumnName(), column);
}
}
@Override
public boolean process(EventData eventData) {
System.out.println("db=" + eventData.getSchemaName() + ",table="
+ eventData.getTableName() + ",event="
+ eventData.getEventType());
if (eventData.getEventType() == EventType.INSERT
|| eventData.getEventType() == EventType.UPDATE
|| eventData.getEventType() == EventType.DELETE) {
init(eventData);
replaceKey(eventData);
replaceCols(eventData);
return true;
} else {
return false;
}
}
private void replaceKey(EventData eventData) {
EventColumn projectNameColumn = new EventColumn();
String typeValue = map.get(typeName).getColumnValue();
System.out.println(key.getColumnName() + "=" + key.getColumnValue()
+ "," + typeName + "=" + typeValue);
projectNameColumn.setColumnValue(typeValue);
projectNameColumn.setColumnType(Types.VARCHAR);
projectNameColumn.setColumnName(typeName);
projectNameColumn.setKey(true);
List<EventColumn> keys = new ArrayList<EventColumn>();
keys.add(key);
keys.add(projectNameColumn);
eventData.setKeys(keys);
eventData.setOldKeys(keys);//必須設(shè)置oldkeys,否則修改不了
}
private void replaceCols(EventData eventData) {
if (replaceColumnList == null || replaceColumnList.isEmpty()) {// 沒(méi)有自定義同步字段,默認(rèn)同步所有字段
replaceColumnList = map.keySet();
}
List<EventColumn> cols = new ArrayList<EventColumn>();
for (String columnName : replaceColumnList) {
EventColumn column = map.get(columnName);
if (typeName.equals(column.getColumnName())) {// 排除馬甲字段,因?yàn)樽兂陕?lián)合主鍵已經(jīng)會(huì)替換此字段
continue;
}
if (column != null) {
cols.add(column);
}
}
eventData.setColumns(cols);
}
}
4、具體使用
- 如果是同步所有源表列,在自定義的Event Processor選擇SOURCE寫(xiě)上如下代碼即可
public class ChannelInfoProcessor extends OtterEventProcessor {
}
- 如果是同步源表指定列,在自定義的Event Processor選擇SOURCE寫(xiě)上如下代碼即可
public class AssetRepaymentProcessor extends OtterEventProcessor {
public AssetRepaymentProcessor() {
super(new String[] { "user_id", "asset_order_id", "repayment_amount",
"repaymented_amount", "repayment_principal",
"repayment_interest", "plan_late_fee", "true_late_fee",
"late_fee_apr", "credit_repayment_time", "period",
"repayment_time", "repayment_real_time", "late_fee_start_time",
"interest_update_time", "late_day", "created_at", "updated_at",
"auto_debit_fail_times", "renewal_count", "status",
"collection", "repayment_no", "grant_time",
"first_repayment_time" });
}
}
必須先把OtterEventProcessor 這個(gè)類(lèi)可以打包一個(gè)jar,放入node的lib目錄下,如果是相同的表同步可以用同一個(gè)類(lèi)名,如果是不同表一定要新的類(lèi)名,否則會(huì)同步出問(wèn)題,因?yàn)樽侄尾灰粯樱?當(dāng)然也可以直接寫(xiě)源碼,不需要打jar包,這是封裝的寫(xiě)法
自帶監(jiān)控報(bào)警功能
目前支持郵件發(fā)送
1、添加監(jiān)控,發(fā)送人key在系統(tǒng)管理-系統(tǒng)參數(shù)中配置

2、配置接收郵箱

3、manage中的otter.properties配置發(fā)送郵箱,郵箱必須要開(kāi)啟smtp服務(wù),然后配置授權(quán)碼,有的郵箱是直接登錄密碼(比如QQ企業(yè)郵箱)

manage配置完,要重啟,然后看看日志

注意:可以會(huì)報(bào)錯(cuò),郵件發(fā)布出去504等報(bào)錯(cuò)
日常積累
1.node的logs目錄下1,2,3目錄,代表pipeline的序號(hào),哪里pipeline報(bào)錯(cuò),就在哪個(gè)目錄下查看日志


2.一臺(tái)機(jī)器可以配置多個(gè)node,直接啟動(dòng)即可,前提是同一臺(tái)機(jī)器的端口配置不一樣,從manage的node配置不同端口即可,不同機(jī)器可以配置同一個(gè)端口

假設(shè)同一臺(tái)多node啟動(dòng)報(bào)錯(cuò),可能是端口占用報(bào)錯(cuò),需要修改上面的端口,找個(gè)不存在的端口使用
3.老版本可以直接升級(jí)新版本,注意manage的端口,node會(huì)配置這個(gè)端口,假設(shè)manage掛了,node沒(méi)掛,同步的時(shí)候,頁(yè)面統(tǒng)計(jì)的數(shù)字沒(méi)法增長(zhǎng),可以看node.log有報(bào)錯(cuò)信息,dubbo連manage失敗,但是不影響數(shù)據(jù)同步,啟動(dòng)manage后就好了
4.pipeline優(yōu)化


5.升級(jí)node,manage可以直接升級(jí)
6.pipeline切換node,有可能會(huì)出現(xiàn)同步進(jìn)度中的node還是老node,假設(shè)出現(xiàn)這種問(wèn)題可以先停掉老node,在啟動(dòng)channel
7.新增node保存后可能會(huì)沒(méi)有數(shù)據(jù),可以看看是否有其他node有問(wèn)題,目前我這邊是把老node都停掉,再新增node就沒(méi)問(wèn)題
- node的同步進(jìn)度,存在zk中
[zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/crm_canal/2/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"
10.1.1.17","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000006","position":391302782,"serverId":160623,"timestamp":1545880850000}}
相關(guān)報(bào)錯(cuò)積累

配置了jdk,但沒(méi)有配置classpath
2018-07-18 09:58:13.392 [New I/O server worker #1-4] WARN c.a.d.common.threadpool.support.AbortPolicyWithReport - [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-127.0.0.1:2088, Pool Size: 50 (active: 50, core: 50, max: 50, largest: 50), Task: 113 (completed: 63), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://127.0.0.1:2088!, dubbo version: 2.5.3, current host: 127.0.0.1
線程池報(bào)錯(cuò),聽(tīng)說(shuō)是15版本的bug,14版本沒(méi)問(wèn)題
pid:2 nid:2 exception:setl:com.google.common.collect.ComputationException: java.lang.ArrayIndexOutOfBoundsException: 0
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889)
at com.alibaba.otter.canal.common.zookeeper.ZkClientx.getZkClient(ZkClientx.java:34)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.getZkclientx(CanalInstanceWithManager.java:401)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.initMetaManager(CanalInstanceWithManager.java:121)
at com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager.(CanalInstanceWithManager.java:76)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1$1.(CanalEmbedSelector.java:139)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector$1.generate(CanalEmbedSelector.java:139)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:68)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded$1.apply(CanalServerWithEmbedded.java:65)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182)
at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151)
at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67)
at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885)
at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.start(CanalServerWithEmbedded.java:98)
at com.alibaba.otter.node.etl.select.selector.canal.CanalEmbedSelector.start(CanalEmbedSelector.java:206)
at com.alibaba.otter.node.etl.select.SelectTask.startup(SelectTask.java:170)
at com.alibaba.otter.node.etl.select.SelectTask.run(SelectTask.java:126)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at java.util.Arrays$ArrayList.get(Arrays.java:3841)
at com.alibaba.otter.canal.common.zookeeper.ZooKeeperx.connect(ZooKeeperx.java:68)
zk名稱(chēng)被修改,需要在canal和node等重新選擇一遍(即使一模一樣),在保存下即可
pid:5 nid:1 exception:canal:有零花(ulinghua_cs_online):java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:94)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:137)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)
可能是之前同步任務(wù)沒(méi)成功,我們用的rds,默認(rèn)只保留18個(gè)小時(shí),然后后面在啟動(dòng)的時(shí)候,同步進(jìn)度已經(jīng)過(guò)了18個(gè)小時(shí),導(dǎo)致一直報(bào)這個(gè)錯(cuò)誤,解放方案:
停掉同步任務(wù)
進(jìn)入對(duì)應(yīng) Pipeline ,刪除同步進(jìn)度,重新啟動(dòng)即可,但是會(huì)丟失之前的數(shù)據(jù),需要人工同步丟失的數(shù)據(jù)

pid:8 nid:1 exception:canal:微現(xiàn)金(vxianjin_online):com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: should execute connector.connect() first
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4832)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache.getTableMeta(TableMetaCache.java:160)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.getTableMeta(LogEventConvert.java:759)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parseRowsEvent(LogEventConvert.java:428)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:114)
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert.parse(LogEventConvert.java:66)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser.parseAndProfilingIfNecessary(AbstractEventParser.java:337)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3$1.sink(AbstractEventParser.java:184)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:145)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:220)
at java.lang.Thread.run(Thread.java:745)
排查下canel的用戶(hù)名密碼是否正確、數(shù)據(jù)源是否連接失敗、白名單配置等等,實(shí)在都沒(méi)問(wèn)題就重啟node解決