終篇:理解并使用RabbitMQ

說(shuō)明

RabbitMQ中MQ是MessageQueue的簡(jiǎn)寫(xiě),整體的意思應(yīng)該是像兔子一樣快的高效消息中間件,組件本質(zhì)上就是生產(chǎn)者消費(fèi)者模式,由一邊接受消息,另一邊轉(zhuǎn)發(fā)消息,其中可以對(duì)消息進(jìn)行緩存,轉(zhuǎn)發(fā)或清除。

一般實(shí)際中使用情況是推送和IM(及時(shí)通訊)

如何使用

首先要含有封裝底層的jar包,下載RabbitMQ-java-Client,下載之后放在AndroidStudio項(xiàng)目中l(wèi)ibs包中編譯,或者在module中build.gradle中添加依賴(lài):

dependencies {
    compile 'com.rabbitmq:amqp-client:4.2.0'
}

RabbitMQ是由pivotal公司開(kāi)源的一個(gè)項(xiàng)目,在Github上可以查看的源碼。

需要訪問(wèn)網(wǎng)絡(luò),不要忘記在清單文件中添加網(wǎng)絡(luò)權(quán)限:

<uses-permission android:name="android.permission.INTERNET"/>

一張簡(jiǎn)易的圖表示工作原理:

RabbitMQ.png

連接

首先配置連接服務(wù)端基本信息:

    ConnectionFactory factory = new ConnectionFactory();  //創(chuàng)建連接工廠
    
    //設(shè)置服務(wù)端連接認(rèn)證:ip號(hào),端口號(hào),登錄名,密碼
    factory.setHost(“HOST_ID");
    factory.setPort(PORT);
    factory.setUsername("USERNAME");
    factory.setPassword("PASSWORD")
    
    factory.setAutomaticRecoveryEnabled(true); //設(shè)置自動(dòng)恢復(fù)連接
    factory.setNetworkRecoveryInterval(5000);  //設(shè)置自動(dòng)連接間隔(毫秒)

以上都是一些基本配置,設(shè)置服務(wù)端上認(rèn)證信息,設(shè)置恢復(fù)連接機(jī)制,在已經(jīng)連接過(guò)之后斷開(kāi)連接,會(huì)主動(dòng)嘗試連接服務(wù),直至連接上服務(wù)或者關(guān)閉連接工廠。

如果第一沒(méi)有連接上,則會(huì)報(bào)錯(cuò)。需要循環(huán)創(chuàng)建連接,直至第一次連接很成功之后才會(huì)自動(dòng)連接,包括所有的通道(Channel)。

現(xiàn)在有了連接工廠,創(chuàng)建一個(gè)連接就夠多個(gè)通道使用,也是高效率的方式:

Connection connection = factory.newConnection();

注意這個(gè)需要在子線程中執(zhí)行,否則會(huì)報(bào)錯(cuò)誤: Android.os.NetworkOnMainThreadException。

發(fā)送消息

  • 建立通道及配置
  • 發(fā)送消息
Channel channelSend = connection.createChannel();
//第一種方式,直接指定消費(fèi)隊(duì)列
1.channelSend.queueDeclare(queueName, false, false, false, null);
  channelSend.basicPublish("", queueName, null, message.getBytes());
    
//第二種方式,指定轉(zhuǎn)換器和綁定密鑰(routingkey)
2.channelSend.exchangeDeclare(EXCHANGENAME, "topic"); //類(lèi)型四種:fanout,direct,topic,handers
  channelSend.basicPublish(EXCHANGENAME, routingkey, null, message.getBytes());

就是這么簡(jiǎn)單,但還是需要簡(jiǎn)單說(shuō)明。

完整的流程:發(fā)送者->轉(zhuǎn)換器->隊(duì)列->接受者

第一種情況,雖然沒(méi)有聲明轉(zhuǎn)換器,但是會(huì)使用匿名轉(zhuǎn)換器,發(fā)送到routingKey為隊(duì)列名的隊(duì)列中。

第二種情況,只說(shuō)明轉(zhuǎn)換器的四種類(lèi)型特點(diǎn):

  • fanout類(lèi)型:忽略routingkey的值發(fā)送給所有綁定該類(lèi)型的隊(duì)列中,
  • direct類(lèi)型:根據(jù)routingkey的值發(fā)送給匹配該值的隊(duì)列中,
  • topic類(lèi)型:它的routingKey可以使用*和#來(lái)表示,前者表示一個(gè)連著的單詞,如work;后者表示零個(gè)或多個(gè)連著的單詞,如work.first。只要發(fā)送消息的routingKey匹配接受者設(shè)置綁定exchange的routingKey的值,接受者隊(duì)列就可以收到到由exchange發(fā)送的消息。
  • headers類(lèi)型:以匹配鍵值對(duì)的形式發(fā)送和接受消息。匹配有兩種方式all和any。用的比較少,所以就沒(méi)有深入了解。

接受消息

  • 建立通道及配置
  • 設(shè)置監(jiān)聽(tīng)通道消息
Channel channel = connection.createChannel();

//不定義裝換器(默認(rèn)匿名裝換器),命名隊(duì)列,獲取實(shí)時(shí)和緩存在隊(duì)列中的消息
1. final Channel channel = connection.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);

    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            Log.d(TAG, queueName + "接受消息->" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);  //消息應(yīng)答
        }
    });

//自定義裝換器名稱(chēng)和類(lèi)型,匿名隊(duì)列,監(jiān)聽(tīng)符合routingKey的消息,只能獲取實(shí)時(shí)消息            
2. final Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            Log.d(TAG, routingKey + "接受消息->" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);  //消息應(yīng)答
        }
    });
        
//自定義裝換器名稱(chēng)和類(lèi)型,命名隊(duì)列,監(jiān)聽(tīng)符合routingKey的消息,獲取實(shí)時(shí)和緩存在隊(duì)列中的消息
3. final Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);  //設(shè)置綁定

    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            Log.d(TAG, routingKey + queueName + "接受消息->" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);  //消息應(yīng)答
        }
    });

以上是主要的三種監(jiān)聽(tīng)消息方式:

第一種,匿名轉(zhuǎn)換器,命名隊(duì)列,監(jiān)聽(tīng)隊(duì)列中的消息。只能一對(duì)一發(fā)送并且接受,局限比較大。

第二種,指定轉(zhuǎn)換器類(lèi)型,匿名隊(duì)列,監(jiān)聽(tīng)實(shí)時(shí)routingKey的消息,篩選出匹配routingKey消息進(jìn)行發(fā)送,所以有可能發(fā)送到多個(gè)隊(duì)列中。每次連接都是獲取新的空的隊(duì)列,并且失去連接后隊(duì)列將被刪除,所以只能獲取實(shí)時(shí)的消息。

第三種,指定裝換器類(lèi)型,命名隊(duì)列,監(jiān)聽(tīng)隊(duì)列中routingKey的消息,篩選出匹配routingKey消息進(jìn)行發(fā)送,所以有可能發(fā)送到多個(gè)隊(duì)列中。獲取實(shí)時(shí)和緩存在隊(duì)列中的消息

匿名隊(duì)列的特點(diǎn):

  1. 一旦被創(chuàng)建就是一個(gè)新的空的隊(duì)列
  2. 一旦失去消費(fèi)者連接該隊(duì)列就會(huì)被服務(wù)端刪除

命名隊(duì)列特點(diǎn):

  1. 默認(rèn)循環(huán)均勻發(fā)送消息給多個(gè)消費(fèi)者,但是可以設(shè)置channel.basicQos(1)高效的發(fā)送消息給多個(gè)消費(fèi)者。
  2. 消息應(yīng)答機(jī)制,服務(wù)端確認(rèn)消息被銷(xiāo)毀才會(huì)移除該消息。
  3. 隊(duì)列和消息持久化,在服務(wù)端奔潰或者重啟是可以保存隊(duì)列和消息。
  4. 一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi)一次。

關(guān)閉

if (channelSend != null && channelSend.isOpen()) {
    try {
        channelSend.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
if (channelReceive != null && channelReceive.isOpen()) {
    try {
        channelReceive.close();
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
if (connection != null && connection.isOpen()) {
    try {
        connection.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

封裝RabbitMQ-Android的簡(jiǎn)單使用,歡迎下載使用,并且反饋。

注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章,歡迎大家批評(píng)指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊(duì)列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類(lèi)型
第四篇Routing介紹direct類(lèi)型轉(zhuǎn)換器
第五篇Topics介紹topic類(lèi)型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,632評(píng)論 19 139
  • 關(guān)于消息隊(duì)列,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了,但一直沒(méi)騰出空,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,638評(píng)論 51 787
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評(píng)論 2 34
  • 注:這份文檔是我和幾個(gè)朋友學(xué)習(xí)后一起完成的。 目錄 RabbitMQ 概念 exchange交換機(jī)機(jī)制什么是交換機(jī)...
    Mooner_guo閱讀 33,626評(píng)論 8 97
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒(méi)有及時(shí)更新。 術(shù)語(yǔ)對(duì)...
    joyenlee閱讀 7,806評(píng)論 0 3

友情鏈接更多精彩內(nèi)容