1. Create a Maven project
Add the RocketMQ client dependency to pom.xml:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
2. Create a Const class and a Producer class, then send messages
package com.young.rocketmq.constants;

public class Const {
    // Address (host:port) of the RocketMQ name server
    public static final String NAMESRV_ADDR = "192.168.80.188:9876";
}
package com.young.rocketmq.quickstart;

import com.young.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // The constructor argument is the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr(Const.NAMESRV_ADDR);
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 1. Create the message
            Message message = new Message(
                    "test_quick_topic", // topic
                    "TagA", // tag
                    "key" + i, // user-defined key, a unique identifier
                    ("Hello RocketMQ" + i).getBytes(StandardCharsets.UTF_8) // message body (byte[]), encoded explicitly as UTF-8
            );
            // 2. Send the message synchronously
            SendResult sr = producer.send(message);
            System.out.println("Message sent: " + sr);
        }
        producer.shutdown();
    }
}
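Because producer.send(message) is called without a queue selector, the client chooses a queue for each message itself. As far as I understand the 4.x client, it starts from a random index and then walks the topic's queues in round-robin order, which matches the queue IDs 2, 3, 0, 1, 2 in the console output of step 3. The following is a minimal sketch of that idea in plain Java. The class name RoundRobinQueueSketch, the start index 2, and the queue count 4 are assumptions chosen for illustration; this is not the RocketMQ client code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: imitates round-robin queue selection.
// It does not use or reproduce any RocketMQ class.
class RoundRobinQueueSketch {
    private final AtomicInteger index;
    private final int queueCount;

    // startIndex stands in for the random starting point (assumption).
    RoundRobinQueueSketch(int startIndex, int queueCount) {
        this.index = new AtomicInteger(startIndex);
        this.queueCount = queueCount;
    }

    // Returns the next queue id and wraps around after the last queue.
    int nextQueueId() {
        return Math.floorMod(index.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        // Assumed values: 4 queues, first message lands on queue 2.
        RoundRobinQueueSketch selector = new RoundRobinQueueSketch(2, 4);
        for (int i = 0; i < 5; i++) {
            System.out.println("message " + i + " -> queueId " + selector.nextQueueId());
        }
    }
}
```

With these assumed values the sketch yields queue IDs 2, 3, 0, 1, 2 for five messages. Spreading messages over all queues this way balances load, but it also means that ordering across the five messages is not guaranteed on the consumer side.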
3. Check the console output
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0643570000, offsetMsgId=C0A850BC00002A9F0000000000000000, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=2], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0643F10001, offsetMsgId=C0A850BC00002A9F00000000000000C2, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644000002, offsetMsgId=C0A850BC00002A9F0000000000000184, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=0], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644060003, offsetMsgId=C0A850BC00002A9F0000000000000246, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=1], queueOffset=0]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A801655E0418B4AAC20F0644120004, offsetMsgId=C0A850BC00002A9F0000000000000308, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=2], queueOffset=1]
22:01:11.899 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
22:01:11.903 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.80.188:9876] result: true
22:01:11.903 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.80.188:10909] result: true
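The offsetMsgId in each SendResult can be read by hand. Assuming the layout used by the 4.x broker for IPv4 addresses (4 bytes of broker IP, 4 bytes of broker port, 8 bytes of commit log offset, all hex-encoded), the sketch below splits the 32-character string into those three parts. The class name OffsetMsgIdSketch and its fields are hypothetical helpers written for this post, not part of the RocketMQ API.

```java
// Illustrative sketch only: decodes a 32-hex-character offsetMsgId under the
// assumed layout [4-byte IPv4][4-byte port][8-byte commit log offset].
class OffsetMsgIdSketch {
    final String brokerIp;
    final int brokerPort;
    final long commitLogOffset;

    private OffsetMsgIdSketch(String brokerIp, int brokerPort, long commitLogOffset) {
        this.brokerIp = brokerIp;
        this.brokerPort = brokerPort;
        this.commitLogOffset = commitLogOffset;
    }

    static OffsetMsgIdSketch decode(String hex) {
        if (hex == null || hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        // Bytes 0-3: one hex pair per IPv4 octet.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        // Bytes 4-7: broker port as a big-endian int.
        int port = Integer.parseInt(hex.substring(8, 16), 16);
        // Bytes 8-15: physical offset of the message in the commit log.
        long offset = Long.parseUnsignedLong(hex.substring(16, 32), 16);
        return new OffsetMsgIdSketch(ip.toString(), port, offset);
    }

    @Override
    public String toString() {
        return brokerIp + ":" + brokerPort + " commitLogOffset=" + commitLogOffset;
    }

    public static void main(String[] args) {
        // First and last offsetMsgId values from the console output above.
        System.out.println(decode("C0A850BC00002A9F0000000000000000"));
        System.out.println(decode("C0A850BC00002A9F0000000000000308"));
    }
}
```

For the first and last messages above, this gives 192.168.80.188:10911 with commit log offsets 0 and 776. The host matches the broker address in the closeChannel log lines, and the offsets of the five messages (0, 194, 388, 582, 776) are evenly spaced, which is what one would expect if each stored message takes the same number of bytes.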