RocketMQ: Using the Producer

1. Create a Maven project

Add the dependency:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>

2. Create a Const class and a Producer class, then send messages

package com.young.rocketmq.constants;

public class Const {

    public static final String NAMESRV_ADDR = "192.168.80.188:9876";
    
}
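
Hard-coding the name server address works for a quick test, but the value usually differs between machines. A minimal sketch of a variant that looks the address up at runtime, assuming a lookup order of system property, then environment variable, then the default from Const. The NamesrvConfig class and its resolve helper are hypothetical, and the names `rocketmq.namesrv.addr` and `NAMESRV_ADDR` are assumptions used here for illustration:

```java
// Hypothetical variant of Const; the class name, helper, and lookup order are assumptions.
final class NamesrvConfig {

    // Default taken from the Const class above.
    static final String DEFAULT_NAMESRV_ADDR = "192.168.80.188:9876";

    // Returns the first non-blank value among property, environment, fallback.
    static String resolve(String fromProperty, String fromEnv, String fallback) {
        if (fromProperty != null && !fromProperty.trim().isEmpty()) {
            return fromProperty.trim();
        }
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return fallback;
    }

    // Reads the (assumed) property and environment variable names.
    static String namesrvAddr() {
        return resolve(System.getProperty("rocketmq.namesrv.addr"),
                       System.getenv("NAMESRV_ADDR"),
                       DEFAULT_NAMESRV_ADDR);
    }

    public static void main(String[] args) {
        System.out.println(namesrvAddr());
    }
}
```

With this in place, the producer could call producer.setNamesrvAddr(NamesrvConfig.namesrvAddr()) instead of referencing the constant directly.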

package com.young.rocketmq.quickstart;

import com.young.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

        producer.setNamesrvAddr(Const.NAMESRV_ADDR);

        producer.start();

        for (int i = 0; i < 5; i++) {

            // 1. Create the message
            Message message = new Message(
                    "test_quick_topic", // topic
                    "TagA", // tag
                    "key" + i, // user-defined key that uniquely identifies the message
                    ("Hello RocketMQ" + i).getBytes() // message body (byte[])
            );

            // 2. Send the message
            SendResult sr = producer.send(message);
            System.out.println("Message sent: " + sr);
        }

        producer.shutdown();

    }

}
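
One detail in the Producer above: getBytes() with no argument encodes with the JVM's default charset, which on Java versions before 18 depends on the platform. A minimal sketch of encoding the body explicitly, assuming UTF-8 is the encoding that producer and consumer agree on. The BodyEncoding class and its method names are hypothetical helpers, not part of RocketMQ:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of the RocketMQ client.
final class BodyEncoding {

    // Encode the message text with an explicit charset (UTF-8 is an assumption).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a message body with the same charset on the consumer side.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            byte[] body = encode("Hello RocketMQ" + i);
            System.out.println(body.length + " bytes -> " + decode(body));
        }
    }
}
```

In the Producer, the last constructor argument would then become BodyEncoding.encode("Hello RocketMQ" + i), or simply a getBytes call with StandardCharsets.UTF_8.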

3. Check the console output

Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0643570000, offsetMsgId=C0A850BC00002A9F0000000000000000, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=2], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0643F10001, offsetMsgId=C0A850BC00002A9F00000000000000C2, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644000002, offsetMsgId=C0A850BC00002A9F0000000000000184, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=0], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644060003, offsetMsgId=C0A850BC00002A9F0000000000000246, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=1], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644120004, offsetMsgId=C0A850BC00002A9F0000000000000308, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=2], queueOffset=1]
22:01:11.899 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
22:01:11.903 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.80.188:9876] result: true
22:01:11.903 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.80.188:10909] result: true
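
The offsetMsgId values in the output above can be read by hand. A minimal sketch, assuming the 4.x layout of a hex-encoded 16-byte value made of a 4-byte broker IPv4 address, a 4-byte port, and an 8-byte commit log offset. That layout is my understanding of the client rather than something this post states, and the OffsetMsgIdDecoder class is a hypothetical helper, not a RocketMQ API:

```java
// Hypothetical helper; the field layout below is an assumption about RocketMQ 4.x.
final class OffsetMsgIdDecoder {

    // Simple holder for the three decoded fields.
    static final class Decoded {
        final String brokerIp;
        final int brokerPort;
        final long commitLogOffset;

        Decoded(String brokerIp, int brokerPort, long commitLogOffset) {
            this.brokerIp = brokerIp;
            this.brokerPort = brokerPort;
            this.commitLogOffset = commitLogOffset;
        }
    }

    // Splits a 32-character hex string into IPv4 address, port, and offset.
    static Decoded decode(String hex) {
        if (hex == null || hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters (IPv4 layout)");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            // Each address byte is two hex characters.
            ip.append(Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16));
        }
        int port = (int) Long.parseLong(hex.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(hex.substring(16, 32), 16);
        return new Decoded(ip.toString(), port, offset);
    }

    public static void main(String[] args) {
        // Second offsetMsgId from the console output above.
        Decoded d = decode("C0A850BC00002A9F00000000000000C2");
        System.out.println(d.brokerIp + ":" + d.brokerPort + " offset=" + d.commitLogOffset);
        // prints "192.168.80.188:10911 offset=194"
    }
}
```

Under that assumption, the five ids decode to offsets 0, 194, 388, 582, and 776, which suggests each stored message takes 194 bytes in the commit log, and the decoded IP matches the broker address seen in the log lines.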