利用RabbitMQ同步es和mysql數(shù)據(jù)

一、前言:

??? 最近學(xué)習(xí)了消息隊(duì)列,剛好最近有一個(gè)需求:mysql數(shù)據(jù)更新或刪除時(shí),需要同步更新到Elasticsearch。所以寫一篇簡書來記錄一下解決過程。如果還沒有了解過消息隊(duì)列,可以先翻看我前幾篇簡書。
???RabbitMQ入門之安裝???????????????????????????? RabbitMQ入門之六種模式介紹(一)
???RabbitMQ入門之六種模式介紹(二) ???SpringBoot中使用RabbitMQ
源碼地址:https://github.com/rjqamdaflf/synchronize

二、思路:

  1. 對于RabbitMQ,我選用Work queues模型,因?yàn)檫@個(gè)模型中隊(duì)列的消息一旦消費(fèi),就會消失,消息是不會被重復(fù)消費(fèi)的。
  2. 生產(chǎn)與消費(fèi)消息在兩個(gè)小項(xiàng)目中,因此發(fā)送消息類型選擇JSONObject,接收到消息后再利用反射機(jī)制創(chuàng)建對應(yīng)實(shí)體類對象。

三、代碼實(shí)現(xiàn):

生產(chǎn)者:

消息內(nèi)容:包含增刪改需要調(diào)用的DAO接口名稱,實(shí)體類名,以及實(shí)體類對象具體內(nèi)容轉(zhuǎn)成json字符串

//獲取消息內(nèi)容
public static JSONObject getEsMessage(String JpaBeanName, Object object) {
        JSONObject json = new JSONObject();
        //傳入DAO接口名稱
        json.put("JpaBeanName", JpaBeanName);
        //獲取類名,這個(gè)類名是給消費(fèi)者通過反射去創(chuàng)建實(shí)例化對象
        String[] split = object.getClass().toString().split("\\.|\\s+");
        json.put("ClassName", split[split.length - 1]);
        //將對象轉(zhuǎn)成json字符串
        json.put("Object", JSONObject.toJSONString(object));
        return json;
    }

發(fā)送消息:在需要對mysql進(jìn)行增刪改操作的函數(shù)中,增加發(fā)送消息操作,例如下面這個(gè)例子:convertAndSend()函數(shù)中,第一個(gè)參數(shù)為交換機(jī)名稱,這里不需要,第二個(gè)參數(shù)為key,第三個(gè)為發(fā)送內(nèi)容。

    @Override
    public int saveCourse(Node courseNode) {
        //存入數(shù)據(jù)庫,返回存入的數(shù)據(jù)
        Node newNode = nodeJpaRepository.save(courseNode);
        //發(fā)送消息到消息隊(duì)列 
        rabbitTemplate.convertAndSend("", "save", JsonUtils.getEsMessage(NameUtils.NODESERVICE_ES_BEAN_NAME, Node.toEsNode(newNode)));
        return newNode.getId();
    }

將DAO接口名字封裝成一個(gè)類

public class NameUtils {
    //接口名需要和消費(fèi)者的接口名一致
    public final static String CHAPTERSERVICE_ES_BEAN_NAME = "esChapterJpa";
    public final static String COMMENTSERVICE_ES_BEAN_NAME = "esCommentJpa";
    public final static String COURSESERVICE_ES_BEAN_NAME = "esCourseJpa";
    public final static String KGSERVICE_ES_BEAN_NAME = "esKgJpa";
    public final static String NODESERVICE_ES_BEAN_NAME = "esNodeJpa";
    public final static String SCHOOLSERVICE_ES_BEAN_NAME = "esSchoolJpa";
    public final static String SOURCESERVICE_ES_BEAN_NAME = "esSourceJpa";
}

消費(fèi)者:

消息消費(fèi)函數(shù)

@Component
public class EsCustomer {
    //隊(duì)列名
    public final static String SAVE = "Save";
    public final static String DELETE = "Delete";
    //類基礎(chǔ)路徑    
    public final static String BASE_ENTITY_PACKAGE = "com.demo.synchronize.es.esentity.";
    @Resource
    private BeanUtils beanUtils;

//當(dāng)此隊(duì)列有消息時(shí),說明mysql表有進(jìn)行增改操作,讀取隊(duì)列里面的json數(shù)據(jù),通過beanName獲取對應(yīng)bean實(shí)例
    @RabbitListener(queuesToDeclare = @Queue(EsCustomer.SAVE))
    public void NodeSave(JSONObject message) {
        System.out.println("接收到 save消息:" + message);
        String className = BASE_ENTITY_PACKAGE + message.getString("ClassName");
        //通過反射機(jī)制,得到實(shí)例化對象
        Object object = ClassUtils.getEsClass(className, message.getString("Object"));
        System.out.println("反射得到的object:" + object);
        //獲取對應(yīng)的bean
        ElasticsearchRepository elasticsearchRepository = (ElasticsearchRepository) beanUtils.getBeanByBeanName(message.getString("JpaBeanName"));
        //System.out.println(elasticsearchRepository);
        if (elasticsearchRepository != null && object != null) {
            //進(jìn)行保存或修改操作
            elasticsearchRepository.save(object);
        }
    }

    @RabbitListener(queuesToDeclare = @Queue(EsCustomer.DELETE))
    public void NodeDelete(JSONObject message) {
        System.out.println("接收到 delete消息:" + message);
        //獲取對應(yīng)的bean
        ElasticsearchRepository elasticsearchRepository = (ElasticsearchRepository) beanUtils.getBeanByBeanName(message.getString("JpaBeanName"));
        Object object = message.get("Object");
        if (elasticsearchRepository != null && object != null) {
            //刪除操作
            elasticsearchRepository.deleteById(object);
        }
    }
}

利用反射機(jī)制創(chuàng)建實(shí)體類對象

public static Object getEsClass(String className, String object) {
        Class beanClass = null;
        Object bean;
        try {
            // 加載 beanClass
            beanClass = Class.forName(className);
            //創(chuàng)建實(shí)體類對象
            bean = beanClass.newInstance();
        } catch (Exception e) {
            e.printStackTrace();
            //如果出錯(cuò)直接返回null,消費(fèi)者判斷null直接將消費(fèi)扔掉
            return null;
        }
        JSONObject jsonObject = JSONObject.parseObject(object);
        jsonObject.keySet().forEach(s -> {
            try {
                // 利用反射將 bean 相關(guān)字段訪問權(quán)限設(shè)為可訪問
                Field declaredField = bean.getClass().getDeclaredField(s);
                declaredField.setAccessible(true);
                //獲取對應(yīng)屬性值
                Object value = jsonObject.get(s);
                // 將屬性值填充到相關(guān)字段中
                declaredField.set(bean, value);
            } catch (Exception e) {
                //可能會有部分屬性名不匹配
                e.printStackTrace();
            }
        });
        return bean;
    }

獲取bean的方法,如果找不到這個(gè)bean或者有其他異常,直接返回null,消費(fèi)者丟棄這條消息

@Component
public class BeanUtils {
    @Resource
    private ApplicationContext applicationContext;

    public Object getBeanByBeanName(String beanName) {
        try {
            return applicationContext.getBean(beanName);
        } catch (Exception e) {
            return null;
        }
    }
}

四、效果:

1. 操作Node表:

控制臺輸出

es可視化工具查看

控制臺輸出

es可視化工具查看

2.操作chapter表:

控制臺輸出

es可視化工具查看

3.搞定!!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容