So Canal Can Do HA Too?

Preface

When building a real-time data warehouse, data volumes tend to be large. If Canal is used to monitor MySQL and runs as a single-node service, data is lost when that server goes down. Canal can be configured for HA to remove this single point of failure, although doing so depends on ZooKeeper. Let's configure Canal HA.

1. Canal HA Mode Configuration

1.1 Server-Side HA Mode Configuration

Canal supports HA. The mechanism relies on ZooKeeper and uses two of its features: watchers and EPHEMERAL nodes (which are bound to the session lifecycle). This is similar to HDFS HA.

Canal HA has two parts: canal server and canal client each have their own HA implementation.

  • canal server: to reduce dump requests against MySQL, the same instance deployed on different servers may be running on only one server at any given time; the others stay in standby (standby is a state of the instance).
  • canal client: to guarantee ordering, only one canal client at a time may perform get/ack/rollback on a given instance; otherwise the order in which the client receives data cannot be guaranteed.
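The get/ack/rollback cycle mentioned above can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions: the class `CursorModel` and its fields are hypothetical and are not Canal classes, and `ack()` here confirms everything handed out so far, whereas the real client acknowledges by batch id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of get/ack/rollback; not part of the Canal API.
class CursorModel {
    private final List<String> events; // ordered change events
    private int ackCursor = 0;         // events before this index are confirmed
    private int getCursor = 0;         // events before this index were handed out

    CursorModel(List<String> events) {
        this.events = new ArrayList<>(events);
    }

    // Hand out up to batchSize events without confirming them.
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(getCursor + batchSize, events.size());
        List<String> batch = new ArrayList<>(events.subList(getCursor, end));
        getCursor = end;
        return batch;
    }

    // Confirm everything handed out so far (simplification: no batch id).
    void ack() {
        ackCursor = getCursor;
    }

    // Discard unconfirmed hand-outs; the next get restarts at the last ack.
    void rollback() {
        getCursor = ackCursor;
    }

    public static void main(String[] args) {
        CursorModel model = new CursorModel(Arrays.asList("e1", "e2", "e3", "e4"));
        System.out.println(model.getWithoutAck(2)); // [e1, e2]
        model.ack();
        System.out.println(model.getWithoutAck(2)); // [e3, e4], not yet confirmed
        model.rollback();
        System.out.println(model.getWithoutAck(2)); // [e3, e4] again
    }
}
```

If two clients moved these cursors at the same time, one client's rollback could replay events the other had already processed, which is why a single active client per instance is required.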

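1.2 Environment Preparation

  • Canal: node01, node02
  • ZooKeeper: node01, node02, node03
  • MySQL: node01

1.3 Canal HA Server Configuration

Following the usual deployment and configuration steps, complete the configuration on each machine separately. The instance name used in this demo is example. Modify canal.properties and add the ZooKeeper configuration: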

canal.zkServers=node01:2181,node02:2181,node03:2181
# Comment out file-instance.xml and enable default-instance.xml, which uses the ZooKeeper address configured above
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# File location: canal/conf/example/instance.properties
canal.instance.mysql.slaveId = 1235

Note: the Canal package has to be copied to node02, and canal.instance.mysql.slaveId must be changed there so that it differs from the value on node01; otherwise problems will occur.
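A quick way to catch a duplicated slaveId before starting both servers is to compare the two instance.properties files. The sketch below is only an illustration: the class `SlaveIdCheck` is hypothetical, the file contents are passed in as strings, and the value 1236 for node02 is an assumed example, not a value from this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; compares canal.instance.mysql.slaveId across two nodes.
class SlaveIdCheck {
    // Parse the slaveId out of the text of an instance.properties file.
    static long slaveId(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Long.parseLong(props.getProperty("canal.instance.mysql.slaveId").trim());
    }

    // True when the two nodes use different slaveId values.
    static boolean distinct(String node01Text, String node02Text) {
        return slaveId(node01Text) != slaveId(node02Text);
    }

    public static void main(String[] args) {
        String node01 = "canal.instance.mysql.slaveId = 1235\n";
        String node02 = "canal.instance.mysql.slaveId = 1236\n"; // assumed value for node02
        System.out.println(distinct(node01, node02)); // true
        System.out.println(distinct(node01, node01)); // false
    }
}
```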

1.4 Starting the Canal Environment

  1. Start ZooKeeper
nohup /export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start
  2. Start Canal on node01
./startup.sh
  3. Start Canal on node02
./startup.sh
-------
ssh node01
sh bin/startup.sh
--------
ssh node02
sh bin/startup.sh

After startup, check logs/example/example.log. The startup-success log appears on only one machine. For example, here the instance that started successfully is on node02:

2020-10-17 03:13:24.211 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-17 03:13:24.224 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-17 03:13:24.340 [main] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2020-10-17 03:13:24.486 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-10-17 03:13:24.509 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-10-17 03:13:24.604 [destination = example , address = node02/192.168.100.202:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

The node information in ZooKeeper also shows which node is currently working; here it is node01:11111:

[zk: localhost:2181(CONNECTED) 2] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.100.201:11111","cid":1}
cZxid = 0x6800000013
ctime = Mon Oct 12 05:13:29 CST 2020
mZxid = 0x6800000013
mtime = Mon Oct 12 05:13:29 CST 2020
pZxid = 0x6800000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2751980dfb80000
dataLength = 57
numChildren = 0
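To read the active server out of that running node programmatically, the JSON payload can be parsed for its address field. The sketch below uses a regular expression only to stay dependency-free; the class `RunningNode` and method `activeAddress` are hypothetical names, and a real JSON library would be the more robust choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; extracts "address" from the running node's JSON payload.
class RunningNode {
    private static final Pattern ADDRESS =
            Pattern.compile("\"address\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the host:port of the active canal server, or null if absent.
    static String activeAddress(String json) {
        Matcher matcher = ADDRESS.matcher(json);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        String payload = "{\"active\":true,\"address\":\"192.168.100.201:11111\",\"cid\":1}";
        System.out.println(activeAddress(payload)); // 192.168.100.201:11111
    }
}
```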

2. Client Connection

package com.canal.Test;

/**
 * @author 大数据老哥
 * @version V1.0
 * @Package com.canal.Test
 * @File :CanalHATest.java
 * @date 2021/1/11 21:54
 */

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;

/**
 * Tests whether the Canal HA configuration works.
 */
public class CanalHATest {

    public static void main(String[] args) throws InterruptedException {
        // 1. Create a cluster connector; the active canal server is looked up through ZooKeeper
        CanalConnector connect = CanalConnectors.newClusterConnector("node01:2181,node02:2181,node03:2181", "example", "", "");
        // Maximum number of entries to fetch per batch
        int batchSize = 1000;
        // Loop flag
        boolean running = true;
        while (running) {
            // 2. Open the connection
            connect.connect();
            // Roll back batches that were fetched but never acked, to avoid losing data
            connect.rollback();
            // Subscribe to binlog events
            connect.subscribe();
            while (running) {
                Message message = connect.getWithoutAck(batchSize);
                // Batch id
                long batchId = message.getId();
                // Number of binlog entries in this batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new; wait briefly instead of busy-polling
                    Thread.sleep(1000);
                } else {
                    printSummary(message);
                }
                // Confirm that the batch with this batchId has been consumed
                connect.ack(batchId);
            }
        }
    }

    private static void printSummary(Message message) {
        // Iterate over every binlog entry in the batch
        for (CanalEntry.Entry entry : message.getEntries()) {
            // Skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // Offset within the binlog file
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Timestamp at which the SQL statement was executed
            long executeTime = entry.getHeader().getExecuteTime();
            // Database name
            String schemaName = entry.getHeader().getSchemaName();
            // Table name
            String tableName = entry.getHeader().getTableName();
            // Event type: insert/update/delete
            String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();

            System.out.println("logfileName" + ":" + logfileName);
            System.out.println("logfileOffset" + ":" + logfileOffset);
            System.out.println("executeTime" + ":" + executeTime);
            System.out.println("schemaName" + ":" + schemaName);
            System.out.println("tableName" + ":" + tableName);
            System.out.println("eventTypeName" + ":" + eventTypeName);

            CanalEntry.RowChange rowChange;
            try {
                // Parse the stored binary payload into a RowChange
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Skip this entry; continuing with a null rowChange would throw a NullPointerException
                e.printStackTrace();
                continue;
            }

            // Iterate over every changed row
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // Delete event
                if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                }
                // Update event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                    printColumnList(rowData.getAfterColumnsList());
                }
                // Insert event
                else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("---");
                }
            }
        }
    }

    // Print every column name and value
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}

Run the test

14:24:50.371 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms
14:25:03.704 [main-SendThread(node02:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2751980dfb80001 after 1ms

Modify a row in the database to test:

logfileName:mysql-bin.000002
logfileOffset:761
executeTime:1602452082000
schemaName:zw
tableName:dw_t_product
eventTypeName:update
---update---
goods_id	007
goods_status	待審核
createtime	2019-12-22
modifytime	2019-12-22
cdat	20191222
---
goods_id	007
goods_status	待審核
createtime	33
modifytime	2019-12-22
cdat	20191222

We have now captured the row we modified. Some readers may ask: isn't this supposed to be HA? So let's stop Canal on node01 and see whether the job keeps running.

[root@node01 bin]# ./stop.sh
node01: stopping canal 6345 ...
Oook! cost:1

Modify another row in the database and see whether it is still captured:

logfileName:mysql-bin.000002
logfileOffset:1071
executeTime:1602452401000
schemaName:zw
tableName:dw_t_product
eventTypeName:update
---update---
goods_id	004
goods_status	已刪除
createtime	2019-12-15
modifytime	2019-12-20
cdat	20191222
---
goods_id	004
goods_status	已刪除
createtime	2019-2-15
modifytime	2019-12-20
cdat	20191222

The change is still captured. Now let's look in ZooKeeper to see which node is serving Canal requests:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.100.202:11111","cid":1}
cZxid = 0x680000002a
ctime = Mon Oct 12 05:39:05 CST 2020
mZxid = 0x680000002a
mtime = Mon Oct 12 05:39:05 CST 2020
pZxid = 0x680000002a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17532d2b90c0000
dataLength = 57
numChildren = 0

The address shows that service has switched over to node02.

3. Canal Server HA Flow

  1. Before a canal server starts a canal instance, it first asks ZooKeeper whether it may start (implementation: it tries to create an EPHEMERAL node, and whichever server creates it successfully is allowed to start).
  2. Once the ZooKeeper node has been created, that canal server starts the corresponding canal instance; canal instances on servers that failed to create the node stay in standby.
  3. As soon as ZooKeeper detects that the node created by canal server A has disappeared, it notifies the other canal servers, which repeat step 1 so that a new canal server is chosen to start the instance.
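The three steps above can be sketched with a tiny in-memory stand-in for the EPHEMERAL node. This is a simplified illustration, not Canal's or ZooKeeper's code: the class `RunningElection` and its methods are hypothetical, and the standby that takes over is chosen by registration order here, whereas with a real ZooKeeper the winner is whichever standby creates the node first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the running-node election; no ZooKeeper involved.
class RunningElection {
    private String owner; // holder of the simulated EPHEMERAL node, or null
    private final List<String> candidates = new ArrayList<>();

    // Steps 1 and 2: try to create the node; only one server succeeds,
    // the others remain standby and keep watching.
    boolean tryStart(String server) {
        if (!candidates.contains(server)) {
            candidates.add(server);
        }
        if (owner == null) {
            owner = server;
            return true;
        }
        return false;
    }

    // Step 3: the owner's session ends, its node disappears,
    // and the remaining servers retry step 1.
    void sessionClosed(String server) {
        candidates.remove(server);
        if (server.equals(owner)) {
            owner = null;
            for (String standby : new ArrayList<>(candidates)) {
                if (tryStart(standby)) {
                    break;
                }
            }
        }
    }

    // The server whose instance is currently running, or null.
    String running() {
        return owner;
    }

    public static void main(String[] args) {
        RunningElection election = new RunningElection();
        System.out.println(election.tryStart("node01")); // true
        System.out.println(election.tryStart("node02")); // false
        election.sessionClosed("node01");
        System.out.println(election.running());          // node02
    }
}
```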

Summary

At this point we have removed Canal's single point of failure. Most components today offer an HA setup. For a company, data is what matters most; if a service runs on a single node and that node fails, data is lost, and that loss is far from trivial. I have put together some big data materials, and anyone who needs them can download them from the links below. Believe in yourself: hard work and sweat will pay off. I'm 大数据老哥, see you next time.

Resources: Flink, Spark, Hive, Hadoop, and Docker interview questions, essential developer software, resume templates, and more are available for download:

  • GitHub: https://github.com/lhh2002/Framework-Of-BigData
  • Gitee: https://gitee.com/li_hey_hey/dashboard/projects
  • Real-time data warehouse code (GitHub): https://github.com/lhh2002/Real_Time_Data_WareHouse
