快速開始
本快速入門指南是在本地計(jì)算機(jī)上設(shè)置RocketMQ消息傳遞系統(tǒng)以發(fā)送和接收消息的詳細(xì)說明。
環(huán)境要求(版本一定要正確)
- 建議使用64位操作系統(tǒng),Linux / Unix / Mac;
- 64位JDK 1.8+;
- Maven 3.2.x;
- Git的;
- 4g +免費(fèi)磁盤用于Broker服務(wù)器
從發(fā)布下載和構(gòu)建
單擊此處下載4.4.0源代碼版本。您也可以從這里下載二進(jìn)制版本。
現(xiàn)在執(zhí)行以下命令來解壓縮4.4.0源代碼版本并構(gòu)建二進(jìn)制工件。
> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
啟動(dòng)Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動(dòng)Broker
> nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
發(fā)送和接收消息
在發(fā)送/接收消息之前,我們需要告訴客戶端名稱服務(wù)器的位置。RocketMQ提供了多種方法來實(shí)現(xiàn)這一目標(biāo)。為簡(jiǎn)單起見,我們使用環(huán)境變量NAMESRV_ADDR
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
關(guān)機(jī)服務(wù)器
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
簡(jiǎn)單消息示例
- 使用RocketMQ以三種方式發(fā)送消息:可靠的同步,可靠的異步和單向傳輸。
- 使用RocketMQ來使用消息
1.添加依賴關(guān)系
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
注意:rocketMQ-client的版本號(hào)必須和 rocketMQ的服務(wù)端的版本號(hào)一致,不然可能會(huì)導(dǎo)致消息發(fā)送不出去
gradle這個(gè):
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
2.1同步發(fā)送消息
可靠的同步傳輸用于廣泛的場(chǎng)景,如重要的通知消息,短信通知,短信營(yíng)銷系統(tǒng)等。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.247.132:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
2.2異步發(fā)送消息
異步傳輸通常用于響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.247.132:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
2.3以單向模式發(fā)送消息
單向傳輸用于需要中等可靠性的情況,例如日志收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.247.132:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
3.消費(fèi)消息
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.247.132:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}