說(shuō)明
RabbitMQ中MQ是MessageQueue的簡(jiǎn)寫(xiě),整體的意思應(yīng)該是像兔子一樣快的高效消息中間件,組件本質(zhì)上就是生產(chǎn)者消費(fèi)者模式,由一邊接受消息,另一邊轉(zhuǎn)發(fā)消息,其中可以對(duì)消息進(jìn)行緩存,轉(zhuǎn)發(fā)或清除。
一般實(shí)際中使用情況是推送和IM(及時(shí)通訊)
如何使用
首先要含有封裝底層的jar包,下載RabbitMQ-java-Client,下載之后放在AndroidStudio項(xiàng)目中l(wèi)ibs包中編譯,或者在module中build.gradle中添加依賴(lài):
dependencies {
compile 'com.rabbitmq:amqp-client:4.2.0'
}
RabbitMQ是由pivotal公司開(kāi)源的一個(gè)項(xiàng)目,在Github上可以查看的源碼。
需要訪問(wèn)網(wǎng)絡(luò),不要忘記在清單文件中添加網(wǎng)絡(luò)權(quán)限:
<uses-permission android:name="android.permission.INTERNET"/>
一張簡(jiǎn)易的圖表示工作原理:

連接
首先配置連接服務(wù)端基本信息:
ConnectionFactory factory = new ConnectionFactory(); //創(chuàng)建連接工廠
//設(shè)置服務(wù)端連接認(rèn)證:ip號(hào),端口號(hào),登錄名,密碼
factory.setHost(“HOST_ID");
factory.setPort(PORT);
factory.setUsername("USERNAME");
factory.setPassword("PASSWORD")
factory.setAutomaticRecoveryEnabled(true); //設(shè)置自動(dòng)恢復(fù)連接
factory.setNetworkRecoveryInterval(5000); //設(shè)置自動(dòng)連接間隔(毫秒)
以上都是一些基本配置,設(shè)置服務(wù)端上認(rèn)證信息,設(shè)置恢復(fù)連接機(jī)制,在已經(jīng)連接過(guò)之后斷開(kāi)連接,會(huì)主動(dòng)嘗試連接服務(wù),直至連接上服務(wù)或者關(guān)閉連接工廠。
如果第一沒(méi)有連接上,則會(huì)報(bào)錯(cuò)。需要循環(huán)創(chuàng)建連接,直至第一次連接很成功之后才會(huì)自動(dòng)連接,包括所有的通道(Channel)。
現(xiàn)在有了連接工廠,創(chuàng)建一個(gè)連接就夠多個(gè)通道使用,也是高效率的方式:
Connection connection = factory.newConnection();
注意這個(gè)需要在子線程中執(zhí)行,否則會(huì)報(bào)錯(cuò)誤: Android.os.NetworkOnMainThreadException。
發(fā)送消息
- 建立通道及配置
- 發(fā)送消息
Channel channelSend = connection.createChannel();
//第一種方式,直接指定消費(fèi)隊(duì)列
1.channelSend.queueDeclare(queueName, false, false, false, null);
channelSend.basicPublish("", queueName, null, message.getBytes());
//第二種方式,指定轉(zhuǎn)換器和綁定密鑰(routingkey)
2.channelSend.exchangeDeclare(EXCHANGENAME, "topic"); //類(lèi)型四種:fanout,direct,topic,handers
channelSend.basicPublish(EXCHANGENAME, routingkey, null, message.getBytes());
就是這么簡(jiǎn)單,但還是需要簡(jiǎn)單說(shuō)明。
完整的流程:發(fā)送者->轉(zhuǎn)換器->隊(duì)列->接受者
第一種情況,雖然沒(méi)有聲明轉(zhuǎn)換器,但是會(huì)使用匿名轉(zhuǎn)換器,發(fā)送到routingKey為隊(duì)列名的隊(duì)列中。
第二種情況,只說(shuō)明轉(zhuǎn)換器的四種類(lèi)型特點(diǎn):
- fanout類(lèi)型:忽略routingkey的值發(fā)送給所有綁定該類(lèi)型的隊(duì)列中,
- direct類(lèi)型:根據(jù)routingkey的值發(fā)送給匹配該值的隊(duì)列中,
- topic類(lèi)型:它的routingKey可以使用*和#來(lái)表示,前者表示一個(gè)連著的單詞,如work;后者表示零個(gè)或多個(gè)連著的單詞,如work.first。只要發(fā)送消息的routingKey匹配接受者設(shè)置綁定exchange的routingKey的值,接受者隊(duì)列就可以收到到由exchange發(fā)送的消息。
- headers類(lèi)型:以匹配鍵值對(duì)的形式發(fā)送和接受消息。匹配有兩種方式all和any。用的比較少,所以就沒(méi)有深入了解。
接受消息
- 建立通道及配置
- 設(shè)置監(jiān)聽(tīng)通道消息
Channel channel = connection.createChannel();
//不定義裝換器(默認(rèn)匿名裝換器),命名隊(duì)列,獲取實(shí)時(shí)和緩存在隊(duì)列中的消息
1. final Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, queueName + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應(yīng)答
}
});
//自定義裝換器名稱(chēng)和類(lèi)型,匿名隊(duì)列,監(jiān)聽(tīng)符合routingKey的消息,只能獲取實(shí)時(shí)消息
2. final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, routingKey + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應(yīng)答
}
});
//自定義裝換器名稱(chēng)和類(lèi)型,命名隊(duì)列,監(jiān)聽(tīng)符合routingKey的消息,獲取實(shí)時(shí)和緩存在隊(duì)列中的消息
3. final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey); //設(shè)置綁定
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
Log.d(TAG, routingKey + queueName + "接受消息->" + message);
channel.basicAck(envelope.getDeliveryTag(), false); //消息應(yīng)答
}
});
以上是主要的三種監(jiān)聽(tīng)消息方式:
第一種,匿名轉(zhuǎn)換器,命名隊(duì)列,監(jiān)聽(tīng)隊(duì)列中的消息。只能一對(duì)一發(fā)送并且接受,局限比較大。
第二種,指定轉(zhuǎn)換器類(lèi)型,匿名隊(duì)列,監(jiān)聽(tīng)實(shí)時(shí)routingKey的消息,篩選出匹配routingKey消息進(jìn)行發(fā)送,所以有可能發(fā)送到多個(gè)隊(duì)列中。每次連接都是獲取新的空的隊(duì)列,并且失去連接后隊(duì)列將被刪除,所以只能獲取實(shí)時(shí)的消息。
第三種,指定裝換器類(lèi)型,命名隊(duì)列,監(jiān)聽(tīng)隊(duì)列中routingKey的消息,篩選出匹配routingKey消息進(jìn)行發(fā)送,所以有可能發(fā)送到多個(gè)隊(duì)列中。獲取實(shí)時(shí)和緩存在隊(duì)列中的消息
匿名隊(duì)列的特點(diǎn):
- 一旦被創(chuàng)建就是一個(gè)新的空的隊(duì)列
- 一旦失去消費(fèi)者連接該隊(duì)列就會(huì)被服務(wù)端刪除
命名隊(duì)列特點(diǎn):
- 默認(rèn)循環(huán)均勻發(fā)送消息給多個(gè)消費(fèi)者,但是可以設(shè)置channel.basicQos(1)高效的發(fā)送消息給多個(gè)消費(fèi)者。
- 消息應(yīng)答機(jī)制,服務(wù)端確認(rèn)消息被銷(xiāo)毀才會(huì)移除該消息。
- 隊(duì)列和消息持久化,在服務(wù)端奔潰或者重啟是可以保存隊(duì)列和消息。
- 一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi)一次。
關(guān)閉
if (channelSend != null && channelSend.isOpen()) {
try {
channelSend.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (channelReceive != null && channelReceive.isOpen()) {
try {
channelReceive.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
封裝RabbitMQ-Android的簡(jiǎn)單使用,歡迎下載使用,并且反饋。
注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章,歡迎大家批評(píng)指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊(duì)列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類(lèi)型
第四篇Routing介紹direct類(lèi)型轉(zhuǎn)換器
第五篇Topics介紹topic類(lèi)型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用