RocketMQ-發(fā)送消息

RocketMQ-消息發(fā)送.jpeg

運行NameServer和Broker后,我們可以嘗試用代碼示例寫一下:消息發(fā)送的三種方式

  • 同步發(fā)送,適用場景:對可靠性高的業(yè)務(wù)場景
  • 異步發(fā)送,適用場景:對響應(yīng)時間比較敏感的業(yè)務(wù)場景,就是不允許長時間等待結(jié)果的場景
  • 發(fā)送一次,適用場景:不關(guān)心發(fā)送結(jié)果場景。比如日志發(fā)送

依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

同步發(fā)送

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        //設(shè)置生產(chǎn)者組
        defaultMQProducer.setProducerGroup("syncProducer");

        //設(shè)置nameserver
        defaultMQProducer.setNamesrvAddr("localhost:9876");

        //啟動生產(chǎn)者
        defaultMQProducer.start();

        for (int i = 0; i < 10; i++) {
            //構(gòu)建消息 topic tag 內(nèi)容
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //同步發(fā)送,且返回結(jié)果
            SendResult sendResult = defaultMQProducer.send(msg);
            System.out.println("發(fā)送結(jié)果"+sendResult);
        }
        //關(guān)閉生產(chǎn)者
        defaultMQProducer.shutdown();
    }
}
//運行結(jié)果
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
發(fā)送結(jié)果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

異步發(fā)送

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //設(shè)置生產(chǎn)者名字
        DefaultMQProducer producer = new DefaultMQProducer("asyncproducer");
        // 設(shè)置nameserver的地址
        producer.setNamesrvAddr("localhost:9876");
        //生產(chǎn)者啟動
        producer.start();
        //發(fā)送失敗重試次數(shù)
        producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 10;
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("async_topic",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //異步發(fā)送,這里多了一個SendCallBack回調(diào)函數(shù)
                producer.send(msg, new SendCallback() {
                    //發(fā)送成功
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("發(fā)送成功:"+sendResult);
                    }
                    //發(fā)送失敗
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("發(fā)送失敗:"+e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}
//運行結(jié)果
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF40001, offsetMsgId=C0A81FF100002A9F0000000000046438, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=2]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1A0004, offsetMsgId=C0A81FF100002A9F0000000000046790, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=1]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BFA0002, offsetMsgId=C0A81FF100002A9F000000000004650E, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=3]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C0B0003, offsetMsgId=C0A81FF100002A9F00000000000466BA, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=2], queueOffset=5]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF10000, offsetMsgId=C0A81FF100002A9F00000000000465E4, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=2]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0005, offsetMsgId=C0A81FF100002A9F0000000000046866, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=2]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0006, offsetMsgId=C0A81FF100002A9F000000000004693C, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=3]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C280007, offsetMsgId=C0A81FF100002A9F0000000000046A12, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=3]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C2D0008, offsetMsgId=C0A81FF100002A9F0000000000046AE8, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=4]
發(fā)送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C330009, offsetMsgId=C0A81FF100002A9F0000000000046BBE, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=4]
14:49:17.262 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

發(fā)送一次

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //初始化生產(chǎn)者
        DefaultMQProducer producer = new DefaultMQProducer("onewayproducer");
        // 置頂nameserver名字
        producer.setNamesrvAddr("localhost:9876");
        //啟動生產(chǎn)者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //構(gòu)建消息體
            Message msg = new Message("oneway_topic" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //發(fā)送一次性消息
            producer.sendOneway(msg);
        }

        Thread.sleep(5000);        
        producer.shutdown();
    }
}

同步發(fā)送,異步發(fā)送,一次性發(fā)送是RocketMQ主要的三種消息發(fā)送方式。

來一個拓展:類似的Kafka和RabbitMQ有哪些消息發(fā)送方式,歡迎留言回答。

工作中,具體用什么發(fā)送方式,得取決你的業(yè)務(wù)場景。需要可靠性的,就用同步發(fā)送。需要低延時的,就用異步發(fā)送,如果壓根不care發(fā)送結(jié)果,直接用一次性發(fā)送。

后續(xù)文章

  • RocketMQ-入門(已更新)
  • RocketMQ-消息發(fā)送(已更新)
  • RocketMQ-消費信息
  • RocketMQ-消費者的廣播模式和集群模式
  • RocketMQ-順序消息
  • RocketMQ-延遲消息
  • RocketMQ-批量消息
  • RocketMQ-過濾消息
  • RocketMQ-事務(wù)消息
  • RocketMQ-消息存儲
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主從復(fù)制
  • RocketMQ-刷盤機制
  • RocketMQ-冪等性
  • RocketMQ-消息重試
  • RocketMQ-死信隊列
    ...

歡迎各位入(guan)股(zhu),后續(xù)文章干貨多多。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容