最簡(jiǎn)單的demo,編寫一個(gè)消息監(jiān)聽和消息發(fā)送。namesrv,broker,producer,consuer都是一個(gè)。本文的意思在于初學(xué)者可以根據(jù)文章的代碼,操作復(fù)制出一個(gè)入門例子出來。
- producer。 該節(jié)點(diǎn)是用于發(fā)送消息。
- consumer。該節(jié)點(diǎn)用于接受發(fā)送的消息。
- namesrv。 rocketmq的生產(chǎn)者和消費(fèi)者都不會(huì)記錄broker的實(shí)際地址,所以broker的地址會(huì)放在namesrv節(jié)點(diǎn)。broker啟動(dòng)的時(shí)候,把自己的地址寫進(jìn)namesrv,producer和consumer啟動(dòng)的時(shí)候會(huì)從namesrv中讀取broker的地址。
- broker。 該節(jié)點(diǎn)主要是接受生產(chǎn)者的消息,然后發(fā)送給消費(fèi)者,并且還會(huì)存儲(chǔ)記錄消息。
一,下載
http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
二,啟動(dòng)rocketmq服務(wù)
- 2.1 先啟動(dòng)nameserver
>>bin/nameserver
- 2.2 啟動(dòng)broker
>>bin/mqbroker -n localhost:9876
>>nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true > broker_log.log 2>&1 &
三,編寫簡(jiǎn)單java代碼
- 3.1 maven 依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- 3.2 java代碼生產(chǎn)者和消費(fèi)者
public class ConsumerMain {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topicName","*");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);
System.out.println(consumeConcurrentlyContext);
return null;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
public class ProducerMain {
public static void main( String[] args ) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.setSendMsgTimeout(30000);
for (int i = 0; i < 50000000; i++) {
Message msg = new Message("topicName" ,("Hello_RocketMQ " + i).getBytes("UTF-8"));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
Thread.sleep(3000);
}
System.out.println("生產(chǎn)者發(fā)送了");
producer.shutdown();
}
}