MQ應(yīng)用

消息發(fā)送方式

同步發(fā)送消息

同步發(fā)送消息是指,Producer發(fā)送一條消息后,會在收到MQ返回的ack后才發(fā)送下一條消息,該方式的消息可靠性最高,但是消息發(fā)送效率太低


同步發(fā)送消息.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 設(shè)置當(dāng)發(fā)送失敗時重試發(fā)送的次數(shù),默認(rèn)兩次
        producer.setRetryTimesWhenSendFailed(3);

        // 設(shè)置發(fā)送超時時間
        producer.setSendMsgTimeout(5000);
        // 開啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

異步發(fā)送消息

異步發(fā)送消息是指,Producer發(fā)出消息后無需等待MQ返回ack,直接發(fā)送下一條消息,該方式的消息可靠性可以得到保障,消息發(fā)送效率也可以


異步發(fā)送消息.png
public class AsyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 設(shè)置發(fā)送超時時間
        producer.setSendMsgTimeout(5000);
        // 開啟生產(chǎn)者
        producer.start();

        for(int i=0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            try{
                Message msg = new Message("async-topic","async-tag",body);
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        // 因為是異步的,所以需要主線程休眠一會等待異步任務(wù)
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}

單向發(fā)送消息

單向發(fā)送消息是指,Producer僅發(fā)負(fù)責(zé)發(fā)送消息,不等待,不處理MQ的ack,該發(fā)送方式MQ也不返回ack,該方式消息發(fā)送效率最高,但是消息可靠性差。


單向發(fā)送.png
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 設(shè)置發(fā)送超時時間
        producer.setSendMsgTimeout(5000);
        // 開啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("oneway-topic","oneway-tag",body);
            // 方法沒有返回值
            producer.sendOneway(msg);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

消息消費

public class SomeConsumer {
    public static void main(String[] args) throws Exception{
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("someTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // 一但broker中有了其訂閱的消息就會觸發(fā)該方法的執(zhí)行
            // 方法返回值為當(dāng)前consumer消費的狀態(tài)
            // 這里雖然為一個列表,但是每次默認(rèn)只能消費一條消息,通過 consumer.getConsumeMessageBatchMaxSize();可以得到默認(rèn)值,也可以改成批量消費
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 逐條消費消息
                for(MessageExt msg:list){
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        consumer.start();
    }
}
Consumer的pullBatchSize屬性與consumeMessageBatchMaxSize屬性是否設(shè)置越大越好,當(dāng)然不是
- pullBatchSize值設(shè)置的越大,Consumer每拉取一次需要的時間就會越長,且在網(wǎng)絡(luò)上傳輸問題的可能性就越高,若在拉取過程中出現(xiàn)問題,那么本批次所有的消息都需要全部重新拉取。
- consumerMessageBatchMaxSize值設(shè)置的越大,Consumer的消息并發(fā)消費能力越低,且這批被消費的消息具有相同的消費結(jié)果,因為consumerMessageBatchSize指定的一批消息只會使用一個線程進行處理,且在處理過程中只要有一個消息處理異常,則這批消息需要全部重新再次消費處理。

有序性分類

根據(jù)有序范圍的不同,Rocketmq可以嚴(yán)格的保證消息的有序性:分區(qū)有序性與全局有序性。

  • 當(dāng)發(fā)送和消費參與的queue只有一個時所保證的有序性是整個Topic中的消息順序,稱為全局有序。


    image.png
  • 如果有多個queue參與,其僅可保證在該queue分區(qū)隊列上的消息順序,則稱為分區(qū)有序。


    image.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //創(chuàng)建一個producer, 參數(shù)為producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 開啟生產(chǎn)者
        producer.start();

        // 發(fā)送消息
        for(int i =0;i<10;i++){
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    Integer id = (Integer) o;
                    int index = id % list.size();
                    return list.get(index);
                }
            },orderId);
            System.out.println(sendResult);
        }
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

延遲消息

當(dāng)消息寫入到Broker后,在指定的時長后才可以被消費處理,稱為延遲消息
采用rocketmq的延遲消息可以實現(xiàn)定時任務(wù)的功能,而不用使用定時器,典型的應(yīng)用場景是,電商交易中超時未支付關(guān)閉訂單的場景


image.png

事務(wù)消息

代碼舉例

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();


    /**
     * 回調(diào)方法
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查方法:
     * 1、回調(diào)操作返回UNKNOW
     * 2、TC沒有收到TM的最終全局事務(wù)確認(rèn)指令
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事務(wù)消息場景舉例

工行用戶A向建行用戶B轉(zhuǎn)賬1萬元


image.png

問題點:這里 1,2,3 沒有實現(xiàn)原子性,那么A賬號沒有扣款成功,但是消息已經(jīng)發(fā)送成功了,這時候就會導(dǎo)致B的賬號增加了1萬元,就會出現(xiàn)問題,這時候就需要事務(wù)消息來解決這個問題。


image.png

該分布式事務(wù)的解決方案是依賴于XA模式的,上圖中的第三步與TC向Broker發(fā)送預(yù)提交消息,這里的預(yù)提交消息(半事務(wù)消息)就是消費者還不能消費的消息。當(dāng)執(zhí)行到圖中第9步驟的時候,才會真正的寫入消息到Broker中,簡單理解TC就是管理各個分支事務(wù)的狀態(tài),這里可以看到工行系統(tǒng),Broker系統(tǒng)是兩個分支事務(wù)。TM是事務(wù)管理者,一般由Producer擔(dān)任。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一. 認(rèn)識消息隊列 1. 隊列 隊列(queue)是只允許在一端進行插入操作,而在另一端進行刪除操作的線性表(數(shù)據(jù)...
    Serializable_dx閱讀 2,259評論 0 1
  • 說明 主要內(nèi)容是在網(wǎng)上的一些文章中整理出來; 加粗的字體是比較重要的內(nèi)容,部分是自己的經(jīng)驗和理解; 整理的目的主要...
    猴子頂呱呱閱讀 1,715評論 0 52
  • 消息隊列有什么優(yōu)點和缺點? 為什么使用消息隊列?假設(shè)你的業(yè)務(wù)場景遇到個技術(shù)挑戰(zhàn),如果不用 MQ 可能會很麻煩,但是...
    java高并發(fā)閱讀 735評論 0 0
  • 一、 關(guān)鍵特性 1 消息發(fā)送和消費 1)消息發(fā)送者步驟分析: 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名 指...
    TiaNa_na閱讀 2,289評論 0 2
  • 各種MQ的比較 MQ有rabbitMq,rocketMq,kafka,activeMq MQ有什么優(yōu)缺點 導(dǎo)致系統(tǒng)...
    80fd1d54878f閱讀 169評論 0 0

友情鏈接更多精彩內(nèi)容