實現過程
通過canal解析binlog,canal主要模擬了mysql的Slave向Master發送請求,當mysql有增刪改操作時則會觸發請求將數據發送到canal服務中,canal將數據存放到內存,直到客戶端程序(canal服務端和客戶端程序都是由java編寫,且客戶端邏輯由我們借助com.alibaba.otter.canal工具包下的類完成開發)通過發布-訂閱這種模式消費canal服務中的數據。
配置mysql
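1.在my.ini文件里的[mysqld]下添加如下配置(開啟binlog并使用ROW模式,server_id不要和canal的slaveId重復):
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
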
重啟mysql,注意如果文件名是my-default.ini,要改名成my.ini
2.創建mysql用戶,并配置canal權限:進入到mysql程序中,輸入下列命令創建用戶(用戶名和密碼都是:canal):
CREATE USER canal IDENTIFIED BY 'canal';
-- REPLICATION SLAVE / REPLICATION CLIENT 是全局權限,只能授予在 *.* 上
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON 數據庫名.表名 TO 'canal'@'%' ;
FLUSH PRIVILEGES;
下載并配置啟動canal:
1.官網地址:https://github.com/alibaba/canal/releases ,下載后解壓
2.配置canal:主要配置的文件有兩處,canal/conf/example/instance.properties和 canal/conf/canal.properties .下面對這兩處做修改:
cd canal/conf/example/目錄下對 instance.properties進行修改:

cd canal/conf/目錄下對 canal.properties 進行修改:

3.啟動canal:雙擊/bin/startup.bat文件啟動
Java連接canal執行同步操作:
1.在maven項目中加載canal和redis依賴包:
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
2.建立canal客戶端,從canal中獲取數據,并將數據更新至Redis:
package canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
/**
 * Sample canal client: pulls binlog row changes from a canal server and mirrors them into Redis
 * through {@code RedisUtil}.
 *
 * <p>Every inserted or updated row is serialized to a JSON object (column name -> column value)
 * and stored under the key {@code "user:" + <value of the row's first column>}; deleted rows
 * remove that key. The first column is therefore assumed to be the table's key column.
 *
 * @author daijunchen
 * @since 2019/3/12 17:30
 */
public class ClientSample {

    /** Prefix of every Redis key written or deleted by this sample. */
    private static final String KEY_PREFIX = "user:";

    /**
     * Connects to the canal server, subscribes to every schema and table, and then polls for
     * batches until the thread is interrupted or an exception escapes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Host/port are the canal server address; "example" is the destination. The user name and
        // password may be left empty.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        int batchSize = 1000;
        boolean interrupted = false;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Stop polling instead of swallowing the interrupt and looping forever.
                        interrupted = true;
                        break;
                    }
                } else {
                    printEntry(message.getEntries());
                }
                // Confirm the batch. If processing above throws, the batch stays unacknowledged
                // and connector.rollback(batchId) would be the explicit way to return it.
                connector.ack(batchId);
            }
        } finally {
            try {
                connector.disconnect();
            } finally {
                // Restore the flag only after disconnecting so the pending interrupt cannot
                // disturb the cleanup I/O.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Logs each row-change entry and applies it to Redis.
     *
     * <p>Transaction begin/end markers are skipped. DELETE removes the row's key, INSERT stores the
     * row, and every other event type is treated as an update (before/after images are printed
     * and the after image is stored).
     *
     * @param entrys the entries of one canal batch
     * @throws RuntimeException if an entry's payload cannot be parsed as a {@code RowChange}
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            EntryType entryType = entry.getEntryType();
            if (entryType == EntryType.TRANSACTIONBEGIN || entryType == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    List<Column> before = rowData.getBeforeColumnsList();
                    List<Column> after = rowData.getAfterColumnsList();
                    System.out.println("-------> before");
                    printColumn(before);
                    System.out.println("-------> after");
                    deleteStaleKey(before, after);
                    redisUpdate(after);
                    printColumn(after);
                }
            }
        }
    }

    /**
     * Prints every column as {@code name : value update=<flag>}.
     *
     * @param columns the columns of one row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /**
     * Mirrors an inserted row into Redis.
     *
     * @param columns the row's after image; nothing is written when it is empty
     */
    private static void redisInsert(List<Column> columns) {
        storeRow(columns);
    }

    /**
     * Mirrors an updated row into Redis, overwriting any previous value for the same key.
     *
     * @param columns the row's after image; nothing is written when it is empty
     */
    private static void redisUpdate(List<Column> columns) {
        storeRow(columns);
    }

    /**
     * Removes a deleted row's key from Redis.
     *
     * @param columns the row's before image; nothing is deleted when it is empty
     */
    private static void redisDelete(List<Column> columns) {
        if (!columns.isEmpty()) {
            RedisUtil.delKey(redisKey(columns));
        }
    }

    /** Serializes the row to JSON and stores it under the row's Redis key. */
    private static void storeRow(List<Column> columns) {
        if (columns.isEmpty()) {
            return;
        }
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        RedisUtil.stringSet(redisKey(columns), json.toJSONString());
    }

    /**
     * Deletes the old Redis key when an update changed the value of the key (first) column, so the
     * previous entry does not linger. Does nothing unless both images start with the same column.
     */
    private static void deleteStaleKey(List<Column> before, List<Column> after) {
        if (before.isEmpty() || after.isEmpty()) {
            return;
        }
        Column oldKey = before.get(0);
        Column newKey = after.get(0);
        boolean sameColumn = oldKey.getName().equals(newKey.getName());
        if (sameColumn && !oldKey.getValue().equals(newKey.getValue())) {
            RedisUtil.delKey(KEY_PREFIX + oldKey.getValue());
        }
    }

    /** Builds the Redis key for a non-empty row image from the value of its first column. */
    private static String redisKey(List<Column> columns) {
        return KEY_PREFIX + columns.get(0).getValue();
    }
}
3.RedisUtil工具類,用來連接redis:
package canal;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Static helpers around a shared {@link JedisPool}.
 *
 * <p>Every public operation borrows a connection, selects database 0, runs its command(s) and
 * returns the connection to the pool. Failures are reported on stderr and turned into a neutral
 * result ({@code false}, {@code null} or no effect) instead of being thrown.
 *
 * @author daijunchen
 * @since 2019/3/12 17:29
 */
public class RedisUtil {
    // Redis server host.
    private static final String ADDR = "127.0.0.1";
    // Redis server port.
    private static final int PORT = 6379;
    // Access password; an empty string means the server is used without AUTH.
    private static final String AUTH = "";
    // Maximum number of connections the pool may hand out at once.
    private static final int MAX_ACTIVE = 1024;
    // Maximum number of idle connections kept in the pool.
    private static final int MAX_IDLE = 200;
    // Maximum time in milliseconds to wait for a free connection before borrowing fails.
    private static final int MAX_WAIT = 10000;
    // Timeout in milliseconds passed to the pool's connections.
    private static final int TIMEOUT = 1000;
    // Expiry in seconds applied to keys touched by the helpers below.
    // NOTE(review): 660 * 660 * 24 = 10,454,400 s (about 121 days); this looks like a typo for
    // 60 * 60 * 24 (one day) -- confirm the intended TTL before changing it.
    protected static int expireTime = 660 * 660 * 24;
    // Shared connection pool.
    protected static JedisPool pool;

    /*
     * Builds the pool once, when the class is initialized.
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(MAX_ACTIVE);
        config.setMaxIdle(MAX_IDLE);
        config.setMaxWaitMillis(MAX_WAIT);
        config.setTestOnBorrow(false);
        // A null password makes the pool skip AUTH, which matches the password-less constructor.
        String password = AUTH.isEmpty() ? null : AUTH;
        pool = new JedisPool(config, ADDR, PORT, TIMEOUT, password);
    }

    /**
     * Borrows a connection from the pool.
     *
     * @return a connection, or {@code null} when none could be obtained (the cause is printed)
     */
    protected static Jedis getJedis() {
        try {
            return pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Hands a connection back to the pool.
     *
     * @param jedis the connection to release; {@code null} is ignored
     * @param isBroken {@code true} to have the pool discard the connection instead of reusing it
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        if (jedis == null) {
            return;
        }
        try {
            if (isBroken) {
                pool.returnBrokenResource(jedis);
            } else {
                pool.returnResource(jedis);
            }
        } catch (Exception e) {
            // Releasing is best-effort: report the problem but never fail the caller over it.
            reportFailure("closeResource", null, e);
        }
    }

    /**
     * Checks whether a key exists.
     *
     * @param key the key to look up
     * @return {@code true} if the key exists; {@code false} if it does not or the lookup failed
     */
    public static boolean existKey(String key) {
        Jedis jedis = getJedis();
        if (jedis == null) {
            return false;
        }
        boolean isBroken = false;
        try {
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            reportFailure("existKey", key, e);
            isBroken = true;
            return false;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Deletes a key. Failures are reported and otherwise ignored.
     *
     * @param key the key to delete
     */
    public static void delKey(String key) {
        Jedis jedis = getJedis();
        if (jedis == null) {
            return;
        }
        boolean isBroken = false;
        try {
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            reportFailure("delKey", key, e);
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Reads a string value and refreshes the key's expiry to {@code expireTime} seconds.
     *
     * @param key the key to read
     * @return the stored value, or {@code null} if the key is absent or the read failed
     */
    public static String stringGet(String key) {
        Jedis jedis = getJedis();
        if (jedis == null) {
            return null;
        }
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            reportFailure("stringGet", key, e);
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Stores a string value with an expiry of {@code expireTime} seconds.
     *
     * @param key the key to write
     * @param value the value to store
     * @return the server's status reply, or {@code null} if the write failed
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = getJedis();
        if (jedis == null) {
            return null;
        }
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis.select(0);
            // SETEX writes the value and its TTL in one command, so a failure between the two
            // steps can no longer leave a key without an expiry.
            lastVal = jedis.setex(key, expireTime, value);
        } catch (Exception e) {
            reportFailure("stringSet", key, e);
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Sets one field of a hash and refreshes the hash key's expiry to {@code expireTime} seconds.
     * Failures are reported and otherwise ignored.
     *
     * @param key the hash key
     * @param field the field inside the hash
     * @param value the value to store in the field
     */
    public static void hashSet(String key, String field, String value) {
        Jedis jedis = getJedis();
        if (jedis == null) {
            return;
        }
        boolean isBroken = false;
        try {
            jedis.select(0);
            jedis.hset(key, field, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            reportFailure("hashSet", key, e);
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /** Prints which operation failed (and for which key, if any) followed by the stack trace. */
    private static void reportFailure(String operation, String key, Exception e) {
        String target = (key == null) ? "" : " for key '" + key + "'";
        System.err.println("RedisUtil." + operation + " failed" + target);
        e.printStackTrace();
    }
}
驗證:修改mysql數據庫內容后,發現redis也實時更新了:
