一、otter介紹
????阿里巴巴B2B公司,因為業(yè)務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了杭州和美國異地機房的需求,同時為了提升用戶體驗,整個機房的架構為雙A,兩邊均可寫,由此誕生了otter這樣一個產(chǎn)品。
????otter第一版本可追溯到04~05年,此次外部開源的版本為第4版,開發(fā)時間從2011年7月份一直持續(xù)到現(xiàn)在,目前阿里巴巴B2B內部的本地/異地機房的同步需求基本全上了otte4。
目前同步規(guī)模:
- 同步數(shù)據(jù)量6億
- 文件同步1.5TB(2000w張圖片)
- 涉及200+個數(shù)據(jù)庫實例之間的同步
- 80+臺機器的集群規(guī)模
Otter項目地址:https://github.com/alibaba/otter
Otter文檔地址:https://github.com/alibaba/otter/wiki
二、基礎概念
Pipeline:從源端到目標端的整個過程描述,主要由一些同步映射過程組成;
Channel:同步通道,單向同步中一個Pipeline組成,在雙向同步中有兩個Pipeline組成;
DataMediaPair:根據(jù)業(yè)務表定義映射關系,比如源表和目標表,字段映射,字段組等;
DataMedia: 抽象的數(shù)據(jù)介質概念,可以理解為數(shù)據(jù)表/mq隊列定義;
DataMediaSource: 抽象的數(shù)據(jù)介質源信息,補充描述DateMedia;
ColumnPair: 定義字段映射關系;
ColumnGroup: 定義字段映射組;
Node: 處理同步過程的工作節(jié)點,對應一個jvm;
基礎概念直接的關系如圖:

三、架構設計
下圖是關于Otter運行原理圖:

根據(jù)上圖里面關鍵幾個元素進行介紹
- db : 數(shù)據(jù)源以及需要同步到的庫
- Canal : 用戶獲取數(shù)據(jù)庫增量日志
- manager : 配置同步規(guī)則設置數(shù)據(jù)源同步源等
- zookeeper : 協(xié)調node進行協(xié)調工作
- node : 負責任務處理,即根據(jù)任務配置對數(shù)據(jù)源進行解析并同步到目標數(shù)據(jù)庫的操作。
原理描述:
基于Canal開源產(chǎn)品,獲取數(shù)據(jù)庫增量日志數(shù)據(jù)。
a. 開源鏈接地址:http://github.com/alibaba/canal
b. 推薦一個講Canal源碼的博客:http://www.tianshouzhi.com/api/tutorials/canal/380-
典型管理系統(tǒng)架構,manager(web管理)+node(工作節(jié)點)
a. manager運行時推送同步配置到node節(jié)點
b. node節(jié)點將同步狀態(tài)反饋到manager上
基于zookeeper,解決分布式狀態(tài)調度的,允許多node節(jié)點之間協(xié)同工作.
流程:
- 定義數(shù)據(jù)源
- 定義數(shù)據(jù)介質
- 建立映射規(guī)則
- 建立字段映射
- 建立字段組
四、代碼結構
工程結構:

包含三部分:Share | Node | Manager。 其中Share是Node和Manager共享的子系統(tǒng),并不是獨立部署的節(jié)點。Node和Manager是獨立部署的。
Node:一個獨立部署的節(jié)點,比如兩個機房需要做通訊,則每個機房至少要部署一個Node節(jié)點(不考慮HA的話),數(shù)據(jù)同步的過程實際上都發(fā)生在Node之間
Manager:管理的節(jié)點,邏輯上只有一個(一個Manager管理多個Node節(jié)點),如果不考慮HA的話。負責管理同步的數(shù)據(jù)定義,包括數(shù)據(jù)源、Channel、PipeLine、數(shù)據(jù)映射等,各個Node節(jié)點從Manager處獲取并執(zhí)行這些信息。另外還有監(jiān)控等信息。
Manger各個子系統(tǒng)說明:
- biz 對系統(tǒng)數(shù)據(jù)加載(即初始化時候執(zhí)行SQL初始化的系統(tǒng)表數(shù)據(jù))
- deployer 本地啟動
- web 配置管理的webUI
Node各個子系統(tǒng)的說明:
- Common:公共內容定義
- Canal: Canal的封裝,Otter采用的是Embed的方式引入Canal(Canal有Embed和獨立運行兩種模式)
- Deployer:內置Jetty的啟動
- etl: S.E.T.L 調度、處理的實現(xiàn),是Otter最復雜、也是最核心的部分。(Select、Extract、Transform、Load)
SETL過程功能說明如圖:

Share各個子系統(tǒng)的說明:
- Common: 公共內容定義
- Arbitrate: 用于Manager與Node之間、Node與Node之間的調度、S.E.T.L幾個過程的調度等;
- Communication 數(shù)據(jù)傳輸?shù)牡讓樱蠈拥腜ipe、一些調度等都是依賴于Communication的, 簡單點說它負責點對點的Event發(fā)送和接收
- etl:實際上并不負責ETL的具體實現(xiàn),只是一些接口&數(shù)據(jù)結構的定義而已,具體的實現(xiàn)在Node里面。
五、基本操作
這部分操作參考官網(wǎng):https://github.com/alibaba/otter/wiki
1. Manger和Node的配置和部署
2. 配置任務
六、本地調試環(huán)境搭建
1. 代碼下載和導入idea
環(huán)境搭建:
- 進入$otter_home目錄
- 執(zhí)行:mvn clean install
- 導入maven項目。如果eclipse下報"Missing artifact com.oracle:ojdbc14:jar:10.2.0.3.0",修改
{user.dir}/lib/ojdbc14-10.2.0.3.0.jar"為絕對路徑,比如"d:/lib/ojdbc14-10.2.0.3.0.jar"
導入idea之后的結構如圖:

本地調試環(huán)境搭建:
-
- 配置Manager
- 1.1 打開Manger下的deployer模塊下的otter.properties文件
-
1.2 修改如下屬性:
圖片.png
圖片.png- 1.3 運行OtterManagerLauncher.java 啟動Manager
- 1.4 訪問http://localhost:8080,配置集群
- 1.4 訪問http://localhost:8080,配置Node,ip為本機ip
-
配置Node
2.1 打開node下的deployer模塊下的otter.properties文件
-
2.2 修改屬性,這個node要連接去那個mananger,我們是本機調試寫本機Ip,或者127.0.0.1
圖片.png -
2.3 運行OtterLauncher.java時候設置運行參數(shù)-Dnid=id,這個id為manager,web界面中配置node對應的id。
如圖:
圖片.png 2.4 運行OtterLauncher.java
2.5 啟動完成之后可以在manager的web界面看到node已經(jīng)啟動
-
啟動
- 3.1 配置同步任務
- 3.2 啟動同步任務,修改數(shù)據(jù)源的表,可以查看輸出端的表是否有數(shù)據(jù)同步過去了。
七、改造支持kafka的代碼介紹
修改分為三大部分:
1. manager端的改造
- 修改頁面 addDataSource.vm 添加 Kafka 選項;
- dbCheck.js 引用了 Hello.js,這里應用了dwr技術,通過js去調用后端java代碼。
配置文件manager中的biz模塊下的otter-manager-service.xml文件:
<bean id="dataSourceChecker" class="com.alibaba.otter.manager.biz.utils.DataSourceChecker">
<property name="dataMediaSourceService">
<ref bean="dataMediaSourceService" />
</property>
<property name="dataSourceCreator">
<ref bean="dataSourceCreator" />
</property>
<dwr:remote javascript="Hello">
<dwr:include method="check" />
<dwr:include method="checkMap" />
<dwr:include method="checkNamespaceTables" />
</dwr:remote>
</bean>
dbCheck.js調用DataSourceChecker.java中的方法去校驗數(shù)據(jù)庫是否可用,增加kafka的校驗方法。
dwr簡單教程
editDataSource.vm頁面類型選項添加 Kafka 類型
DataSourceList.java中增加kafka數(shù)據(jù)源處理
addDataMedia.vm頁面中的配置kafka的主題時如何驗證主題是否存在?
addDataMedia.vm頁面中的kafka的主題無法驗證是否存在
-
addColumnPair.vm中如果存在kafka的數(shù)據(jù)源時候,列直接復制mysql表的列。
數(shù)據(jù)來源:AddColumnPairGroup.java中的execute方法調用buildColumnPairFromDataMedia方法。
在kafka中沒有列的概念,要把數(shù)據(jù)源的列直接賦值給kafka,這一步操作需要在幾個類中進行:
com.alibaba.otter.manager.web.home.module.screen.AddColumnPair
com.alibaba.otter.manager.web.home.module.screen.AddColumnPairGroup
- ColumnPairAction.java 中的doSave方法中kafka沒有所謂的列名,這里要處理一下進行保存。
- 添加 Kafka 數(shù)據(jù)源類型
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaDataMedia.java
com.alibaba.otter.shared.common.model.config.data.kafka.KafkaMediaSource.java
到這里,前端的改造基本完成??梢酝ㄟ^manager的web界面新建出kafka的數(shù)據(jù)源。
2. node端的改造 selector部分
- com.alibaba.otter.node.etl.select.selector.MessageParser類負責解析數(shù)據(jù)對象解析,將對應canal送出來的Entry對象解析為otter使用的內部對象。這里對配置的數(shù)據(jù)源做了一個檢查:源和目標的庫名表名是否是一致。這里需要排除kafka數(shù)據(jù)源做判斷。
3. node端的改造 transform部分
- node中的T部分工作的代碼在com.alibaba.otter.node.etl.transform包下。
- 新增com.alibaba.otter.node.etl.transform.transformer.NoStructTransformer.java類處理RowData轉到Kafka需要的數(shù)據(jù)格式。
- 在com.alibaba.otter.node.etl.transform.OtterTransformerFactory類中的lookup方法中增加針對KafkaDataMedia的處理。
- 在node中的etl模塊中的otter-node-transform.xml增加NoStructTransformer的注入配置。
4. node端的改造 load部分
- 增加Kafka數(shù)據(jù)源的方言處理類:com.alibaba.otter.node.etl.common.db.dialect.kafka.KafkaDialect,構建kafka的producer來發(fā)送消息。
- com.alibaba.otter.node.etl.load.loader.db.DbLoadAction類中的doCall方法增加對kafka方言的判斷和處理。
- kafka分區(qū)有序,所以在發(fā)送binlog數(shù)據(jù)時候需要根據(jù)數(shù)據(jù)情況設置好分區(qū)的方式,可以通過設定分區(qū)字段,然后自定義分區(qū)去完成
至此整體改造完成。
可以通過如下步驟進行驗證:
- manager啟動
- manager中配置mysql=>kafka的任務
- 啟動node
- manager中啟動同步任務
- mysql中修改數(shù)據(jù)
- 在kafka的customer中查看是否有消息發(fā)送過來



