rocketmq入門的demo

最簡(jiǎn)單的demo,編寫一個(gè)消息監(jiān)聽和消息發(fā)送。namesrv,broker,producer,consuer都是一個(gè)。本文的意思在于初學(xué)者可以根據(jù)文章的代碼,操作復(fù)制出一個(gè)入門例子出來。

  • producer。 該節(jié)點(diǎn)是用于發(fā)送消息。
  • consumer。該節(jié)點(diǎn)用于接受發(fā)送的消息。
  • namesrv。 rocketmq的生產(chǎn)者和消費(fèi)者都不會(huì)記錄broker的實(shí)際地址,所以broker的地址會(huì)放在namesrv節(jié)點(diǎn)。broker啟動(dòng)的時(shí)候,把自己的地址寫進(jìn)namesrv,producer和consumer啟動(dòng)的時(shí)候會(huì)從namesrv中讀取broker的地址。
  • broker。 該節(jié)點(diǎn)主要是接受生產(chǎn)者的消息,然后發(fā)送給消費(fèi)者,并且還會(huì)存儲(chǔ)記錄消息。

一,下載

http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

二,啟動(dòng)rocketmq服務(wù)

  • 2.1 先啟動(dòng)nameserver
>>bin/nameserver
  • 2.2 啟動(dòng)broker
>>bin/mqbroker -n localhost:9876 
>>nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true > broker_log.log 2>&1 &

三,編寫簡(jiǎn)單java代碼

  • 3.1 maven 依賴
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
  • 3.2 java代碼生產(chǎn)者和消費(fèi)者
public class ConsumerMain {
    public static void main(String[] args) throws  Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("topicName","*");
        consumer.registerMessageListener(new MessageListenerConcurrently(){
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                System.out.println(consumeConcurrentlyContext);
                return null;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
public class ProducerMain {
    public static void main( String[] args ) throws     Exception {
         DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
         producer.setNamesrvAddr("127.0.0.1:9876");
         producer.start();
         producer.setSendMsgTimeout(30000);
         for (int i = 0; i < 50000000; i++) {
             Message msg = new Message("topicName" ,("Hello_RocketMQ " + i).getBytes("UTF-8"));
             SendResult sendResult = producer.send(msg);
             System.out.printf("%s%n", sendResult);
             Thread.sleep(3000);
         }
         System.out.println("生產(chǎn)者發(fā)送了");
         producer.shutdown();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • rocketMq的部署架構(gòu)模型 RocketMQ是一個(gè)分布式開放消息中間件,底層基于隊(duì)列模型來實(shí)現(xiàn)消息收發(fā)功能。R...
    CgySHFF閱讀 1,238評(píng)論 0 1
  • 1 架構(gòu)原理 1.1 應(yīng)用場(chǎng)景 只支持發(fā)布訂閱模式。 大數(shù)據(jù)量的消息堆積能力,最終數(shù)據(jù)是持久化到磁盤上,理論上無限...
    可笑可樂閱讀 9,630評(píng)論 0 2
  • 核心組件(4個(gè)組件+消息存儲(chǔ)結(jié)構(gòu)) 客戶端消費(fèi)模式 1. MQ的使用場(chǎng)景 昨天在寫完之后,有些讀者在評(píng)論中提出:到...
    樓亭樵客閱讀 1,142評(píng)論 0 3
  • RocketMq的部署方式 NameServer集群 提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由。每個(gè)NameServer記錄完整...
    Patrick_e604閱讀 8,810評(píng)論 0 1
  • 短瀏海在2018強(qiáng)勢(shì)回歸,不只顯得五官立體,而且超減齡,復(fù)古又可愛,不管是燙卷的QQ瀏海還是各種短瀏海,都讓名人也...
    老金博客閱讀 449評(píng)論 0 1

友情鏈接更多精彩內(nèi)容