RocketMQ 基本使用

目錄

[TOC]


介紹

RocketMQ是阿里巴巴自研的第三代分布式消息中間件。2016年11月,阿里將RocketMQ捐獻給Apache軟件基金會,正式成為孵化項目。阿里稱會將其打造成頂級項目。

2017年2月20日,RocketMQ正式發(fā)布4.0版本,專家稱新版本適用于電商領(lǐng)域,金融領(lǐng)域,大數(shù)據(jù)領(lǐng)域,兼有物聯(lián)網(wǎng)領(lǐng)域的編程模型。


相關(guān)地址


小試牛刀

可通過自己下載源碼編譯或下載編譯好的文件,地址見上。
假設(shè)是自己下載源碼進行編譯

下載源碼并進行編譯

> git clone https://github.com/apache/incubator-rocketmq.git
> cd incubator-rocketmq
> mvn clean package install -Prelease-all assembly:assembly -U
> cd target/apache-rocketmq-all/

Start Name Server

> nohup sh bin/mqnamesrv &
> tailf nohup.out

Start Broker

> nohup sh bin/mqbroker -n localhost:9876 &
> tailf nohup.out

注意如果這里啟動失敗,看一下內(nèi)存是否足夠,可以看一下“runbroker.sh”這個文件,對應(yīng)的修改參數(shù),如下

JAVA_OPT="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"

測試發(fā)送與接收

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

關(guān)閉服務(wù)

> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv

在Java項目中的使用

pom.xml

<properties>
    <rocketmq_ver>4.0.0-incubating</rocketmq_ver>
</properties>
<dependencies>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq_ver}</version>
</dependency>
</dependencies>

生產(chǎn)者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr(Config.ADDR);
        try {
            producer.start();

            Message msg = new Message("PushTopic", "push", "1", "Just for push1.".getBytes());

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PushTopic", "push", "2", "Just for push2.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PushTopic", "pull", "1", "Just for pull.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

消費者

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr(Config.ADDR);
        try {
            //訂閱PushTopic下Tag為push的消息
            consumer.subscribe("PushTopic", "push");
           /**
            * 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
            * 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
            */
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> msgs,
                                ConsumeConcurrentlyContext Context) {
                            for (Message msg : msgs) {
                                System.out.println(new String(msg.getBody()) + ":" + msg.toString());
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

結(jié)果

id:C0A801663174723279CF77AF3C6E0000 result:SEND_OK
id:C0A801663174723279CF77AF3C7B0001 result:SEND_OK
id:C0A801663174723279CF77AF3C7D0002 result:SEND_OK
Just for push1.:MessageExt [queueId=2, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772974, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775615, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EDE8, commitLogOffset=191976, bodyCRC=1396413800, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=1, CONSUME_START_TIME=1490348782880, UNIQ_KEY=C0A801663174723279CF77AF3C6E0000, WAIT=true, TAGS=push}, body=15]]
Just for push2.:MessageExt [queueId=3, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772987, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775620, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EEA0, commitLogOffset=192160, bodyCRC=2014758571, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=2, CONSUME_START_TIME=1490348782882, UNIQ_KEY=C0A801663174723279CF77AF3C7B0001, WAIT=true, TAGS=push}, body=15]]

參考

我的CSDN博客:http://blog.csdn.net/zhongxianyao/article/details/65634985

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容