一、前言:
??? 最近學(xué)習(xí)了消息隊(duì)列,剛好最近有一個(gè)需求:mysql數(shù)據(jù)更新或刪除時(shí),需要同步更新到Elasticsearch。所以寫一篇簡書來記錄一下解決過程。如果還沒有了解過消息隊(duì)列,可以先翻看我前幾篇簡書。
???RabbitMQ入門之安裝???????????????????????????? RabbitMQ入門之六種模式介紹(一)
???RabbitMQ入門之六種模式介紹(二) ???SpringBoot中使用RabbitMQ
源碼地址:https://github.com/rjqamdaflf/synchronize
二、思路:
- 對于RabbitMQ,我選用Work queues模型,因?yàn)檫@個(gè)模型中隊(duì)列的消息一旦消費(fèi),就會消失,消息是不會被重復(fù)消費(fèi)的。
- 生產(chǎn)與消費(fèi)消息在兩個(gè)小項(xiàng)目中,因此發(fā)送消息類型選擇JSONObject,接收到消息后再利用反射機(jī)制創(chuàng)建對應(yīng)實(shí)體類對象。
三、代碼實(shí)現(xiàn):
生產(chǎn)者:
消息內(nèi)容:包含增刪改需要調(diào)用的DAO接口名稱,實(shí)體類名,以及實(shí)體類對象具體內(nèi)容轉(zhuǎn)成json字符串
//獲取消息內(nèi)容
public static JSONObject getEsMessage(String JpaBeanName, Object object) {
JSONObject json = new JSONObject();
//傳入DAO接口名稱
json.put("JpaBeanName", JpaBeanName);
//獲取類名,這個(gè)類名是給消費(fèi)者通過反射去創(chuàng)建實(shí)例化對象
String[] split = object.getClass().toString().split("\\.|\\s+");
json.put("ClassName", split[split.length - 1]);
//將對象轉(zhuǎn)成json字符串
json.put("Object", JSONObject.toJSONString(object));
return json;
}
發(fā)送消息:在需要對mysql進(jìn)行增刪改操作的函數(shù)中,增加發(fā)送消息操作,例如下面這個(gè)例子:convertAndSend()函數(shù)中,第一個(gè)參數(shù)為交換機(jī)名稱,這里不需要,第二個(gè)參數(shù)為key,第三個(gè)為發(fā)送內(nèi)容。
@Override
public int saveCourse(Node courseNode) {
//存入數(shù)據(jù)庫,返回存入的數(shù)據(jù)
Node newNode = nodeJpaRepository.save(courseNode);
//發(fā)送消息到消息隊(duì)列
rabbitTemplate.convertAndSend("", "save", JsonUtils.getEsMessage(NameUtils.NODESERVICE_ES_BEAN_NAME, Node.toEsNode(newNode)));
return newNode.getId();
}
將DAO接口名字封裝成一個(gè)類
public class NameUtils {
//接口名需要和消費(fèi)者的接口名一致
public final static String CHAPTERSERVICE_ES_BEAN_NAME = "esChapterJpa";
public final static String COMMENTSERVICE_ES_BEAN_NAME = "esCommentJpa";
public final static String COURSESERVICE_ES_BEAN_NAME = "esCourseJpa";
public final static String KGSERVICE_ES_BEAN_NAME = "esKgJpa";
public final static String NODESERVICE_ES_BEAN_NAME = "esNodeJpa";
public final static String SCHOOLSERVICE_ES_BEAN_NAME = "esSchoolJpa";
public final static String SOURCESERVICE_ES_BEAN_NAME = "esSourceJpa";
}
消費(fèi)者:
消息消費(fèi)函數(shù)
@Component
public class EsCustomer {
//隊(duì)列名
public final static String SAVE = "Save";
public final static String DELETE = "Delete";
//類基礎(chǔ)路徑
public final static String BASE_ENTITY_PACKAGE = "com.demo.synchronize.es.esentity.";
@Resource
private BeanUtils beanUtils;
//當(dāng)此隊(duì)列有消息時(shí),說明mysql表有進(jìn)行增改操作,讀取隊(duì)列里面的json數(shù)據(jù),通過beanName獲取對應(yīng)bean實(shí)例
@RabbitListener(queuesToDeclare = @Queue(EsCustomer.SAVE))
public void NodeSave(JSONObject message) {
System.out.println("接收到 save消息:" + message);
String className = BASE_ENTITY_PACKAGE + message.getString("ClassName");
//通過反射機(jī)制,得到實(shí)例化對象
Object object = ClassUtils.getEsClass(className, message.getString("Object"));
System.out.println("反射得到的object:" + object);
//獲取對應(yīng)的bean
ElasticsearchRepository elasticsearchRepository = (ElasticsearchRepository) beanUtils.getBeanByBeanName(message.getString("JpaBeanName"));
//System.out.println(elasticsearchRepository);
if (elasticsearchRepository != null && object != null) {
//進(jìn)行保存或修改操作
elasticsearchRepository.save(object);
}
}
@RabbitListener(queuesToDeclare = @Queue(EsCustomer.DELETE))
public void NodeDelete(JSONObject message) {
System.out.println("接收到 delete消息:" + message);
//獲取對應(yīng)的bean
ElasticsearchRepository elasticsearchRepository = (ElasticsearchRepository) beanUtils.getBeanByBeanName(message.getString("JpaBeanName"));
Object object = message.get("Object");
if (elasticsearchRepository != null && object != null) {
//刪除操作
elasticsearchRepository.deleteById(object);
}
}
}
利用反射機(jī)制創(chuàng)建實(shí)體類對象
public static Object getEsClass(String className, String object) {
Class beanClass = null;
Object bean;
try {
// 加載 beanClass
beanClass = Class.forName(className);
//創(chuàng)建實(shí)體類對象
bean = beanClass.newInstance();
} catch (Exception e) {
e.printStackTrace();
//如果出錯(cuò)直接返回null,消費(fèi)者判斷null直接將消費(fèi)扔掉
return null;
}
JSONObject jsonObject = JSONObject.parseObject(object);
jsonObject.keySet().forEach(s -> {
try {
// 利用反射將 bean 相關(guān)字段訪問權(quán)限設(shè)為可訪問
Field declaredField = bean.getClass().getDeclaredField(s);
declaredField.setAccessible(true);
//獲取對應(yīng)屬性值
Object value = jsonObject.get(s);
// 將屬性值填充到相關(guān)字段中
declaredField.set(bean, value);
} catch (Exception e) {
//可能會有部分屬性名不匹配
e.printStackTrace();
}
});
return bean;
}
獲取bean的方法,如果找不到這個(gè)bean或者有其他異常,直接返回null,消費(fèi)者丟棄這條消息
@Component
public class BeanUtils {
@Resource
private ApplicationContext applicationContext;
public Object getBeanByBeanName(String beanName) {
try {
return applicationContext.getBean(beanName);
} catch (Exception e) {
return null;
}
}
}
四、效果:
1. 操作Node表:

控制臺輸出

es可視化工具查看

控制臺輸出

es可視化工具查看
2.操作chapter表:

控制臺輸出

es可視化工具查看