RabbitMQ學習使用

消息隊列(MQ)

  • 優(yōu)點:

1.解耦:通過 MQ,Pub/Sub 發(fā)布訂閱消息這么一個模型,將系統(tǒng)A就跟其它系統(tǒng)B\C\D解耦了。
2.異步:通過MQ,A調用B,C,D各需要200ms,如果調用為串行執(zhí)行,則需要600ms,如果不是時效性非常強的可以通過MQ來進行異步調用,提高響應速度;
3.削峰:當某個時間訪問量持續(xù)飆升,過了這個時間段后又趨于平緩,可以通過mq來進行消息積壓慢慢消費,降低對系統(tǒng)的壓力;

  • 缺點:

1.系統(tǒng)可靠性降低:引用外部依賴越多,服務掛機的可能性就越大
2.系統(tǒng)復雜度提高:消息重復消費問題,消息丟失問題
3.數據一致性問題:數據是否消費成功

各消息隊列應用對比

圖片.png

RabbitMQ定義

RabbitMQ:是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的(摘抄百度)。

RabbitMQ的常用組件

borker:rabbitMQ服務器;
virtual host:虛擬機,一個rabbitMQ服務器可以創(chuàng)建多個虛擬機,每個虛擬機相當于一個服務器,是相互獨立的;
exchange:交換機,用來將生產者生產的消息發(fā)送到消息隊列,交換機主要分為三類:fanout、direct、topic、headers,已及自定義的交換機;
queue:消息隊列,作為一個容器用來存儲生產者推送給消費者的消息,直至被消費者消費;
routing_key:路由鍵,生產者需要將消息發(fā)送到指定的隊列,需要通過指定路由鍵來完成,是在發(fā)送消息的時候要用到的路由鍵;
binding_key:路由鍵,在綁定的時候使用的到路由鍵,用于消息隊列的綁定;
message:消息主體,為生產者消費者;
publisher:消息生產者;
consumer:消息消費者;
connect:鏈接,一個鏈接可以建立多個信道;
channel:信道,消費者與rabbitMQ服務器之間進行消息交互的通道,由于重復進行TCP的創(chuàng)建和銷毀開銷太大,并發(fā)數會受到系統(tǒng)的限制,容易產生性能瓶頸,信道是在真實TCP鏈接的基礎上建立的虛擬鏈接,連接數量并不會受到限制,且開銷?。?br>

rabbitMQ

Exchange類型

消息路由是從消息生產者生成消息,通過信道傳遞到交換機,此時消息有一個路由鍵,路由鍵會將消息匹配到對應的隊列中,消息消費者再從隊列中獲取消息,在交換機到隊列之間的消息路由由于交換機類型的不同會產生不同的路由模式:
1、fanout:廣播形式,消息傳遞到交換機上,會以廣播的形式綁定到所有該交換機綁定的隊列中;
2、direct:消息在從交換機投遞到隊列是,消息所帶的路由鍵(routing_key)必須與隊列所綁定的路由鍵(binding_key)一致
3、topic:以一定規(guī)則將消息所帶的路由鍵與隊列綁定的路由鍵進行匹配,規(guī)則如下:1、以‘.’對路由鍵的命名根據單詞來進行分割,比如:com.rabbitMQ.queue;2、用‘#’或者‘*’來進行模糊匹配,‘*’用于匹配一個單詞,‘#’用與匹配多個單詞;

topic消息路由表

4、header:通過消息頭部信息(鍵值對形式)與交換機跟隊列構成的鍵值對匹配,比較少用;

(排他性exclusive:排他是基于鏈接可見,同一個鏈接下的不同信道,可以同時訪問統(tǒng)一鏈接創(chuàng)建的排他隊列,但其他鏈接不能對起進行訪問)

消息的傳送和消費

消息傳送

1、生產者建立一個tcp鏈接,并開啟一個信道,用于消息的傳送;
2、生產者定義一個交換機,隊列,并設置相關屬性:如持久化,交換機類型,是否排他等,通過路由鍵將交換機和隊列進行綁定;
3、生產者發(fā)送消息,消息種包含要傳送的交換機,路由鍵信息,通過信道將信息傳遞到交換機中并通過路由鍵匹配投遞到相應對列中,當消息未投遞成功會根據相應的配置的屬性值將消息丟棄或回退;
4、關閉信道,關閉鏈接;

消息消費

1、消費者建立一個tcp鏈接,并開啟一個信道,用于消息的傳送;
2、消費者想rabbitMQ Broder請求相應隊列的消息,可設置響應的回調函數;
3、消費者等到消息投遞,消息投遞主要有兩種模式:1)推模式:通過持續(xù)訂閱的方式來獲取消息;2)拉模式:用于獲取單條消息,消費者確認接收并確認消費(ack),消息從響應對列中刪除;
4、關閉信道,關閉鏈接;

連接RabbitMQ

  public  Channel  createConnectFactory() throws IOException, TimeoutException {
        Connection connection =null;
        Channel channel =null;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //當連接失敗時,嘗試重新連接
        connectionFactory.setAutomaticRecoveryEnabled(true);
        //重試間隔時間(單位:毫秒)
        connectionFactory.setNetworkRecoveryInterval(10000);
        //方法一:
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        try {
            connection = connectionFactory.newConnection();
        }catch(java.net.ConnectException e){
            log.error("連接異常"+e.getMessage());
        }
        //方法二:可以設置多個連接地址,無法連接也不會拋出異常,接著嘗試下個連接
        Address [] addresses=new Address[]{new Address("127.0.0.1",5672),new Address("192.168.1.0",5672)};
        connection =connectionFactory.newConnection("amqp://guest:guest@localhust:5672/");
        connectionFactory.newConnection();
        channel = connection.createChannel();
        return channel;
    }

交換機和隊列的使用

public  void initRabbit(){
    Channel channel =null;
    try {
        //創(chuàng)建連接
        channel=createConnectFactory();
        String exchange="exchange";
        String queue="queue";
        String routingKey="routingKey";
        /*聲明交換機參數:exchange(交換機名), type(交換機類型), durable(是否持久化)*/
        channel.exchangeDeclare(exchange,"direct",true);
        channel.exchangeDeclare("exchange1","fanout",true);
        /*聲明隊列參數:queue(隊列名), durable(是否持久化), exclusive(是否排他), autoDelete(消息傳遞后是否自動刪除),arguments(其他結構化的參數)*/
        channel.queueDeclare(queue,true,true,false,null);
        /*交換機也可以用于綁定交換機,被綁定的交換機相當于一個隊列,對應參數:destination(被綁定的交換), source(攜帶消息的交換機),routingKey(路由鍵)*/
        channel.exchangeBind("exchange1",exchange,"exKey");
        /*隊列綁定參數:queue(隊列名),exchange(交換機名),routingKey(路由鍵), arguments(其他結構化的參數)*/
        channel.queueBind(queue,exchange,routingKey,null);
        /*內部被綁定的交換機與隊列綁定*/
        channel.queueBind("queue1","exchange1","",null);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

針對持久化問題可以通過設置'durable'屬性來對交換機和隊列進行持久化

交換機與交換機綁定

消息發(fā)送

//方法1:
/*發(fā)送消息參數:exchange(交換機名),routingKey(路由鍵),mandatory(設置消息未被成功路由是否將消息返回給生產者),props(消息設置的參數,也可自定義), body(消息體)*/
channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
//方法2:
/*自定義消息屬性*/
channel.basicPublish(exchange,routingKey,new AMQP.BasicProperties().builder()
        .contentType("text/plain")
        //消息持久化
        .deliveryMode(2)
        .priority(1)
        .userId("hidden")
        .build(),message.getBytes());

(new AMQP.BasicProperties().builder().deliveryMode(2).build(),對消息持久化)
1、mandatory:消息發(fā)送時設置該屬性用于針對消息無法根據自身類型以及路由鍵找到對應的消費者時將消息回退給生產者(true)中還是丟棄(false);2、immediate:該屬性設置true表示當消息根據路由匹配隊列上,并未發(fā)現任何消費者時,不將消息加入到隊列中,而是返回給生產者,盡可能不用,會影響鏡像隊列的性能

消息消費

public void consume(){
        Channel channel=null;
        try {
            //推模式:發(fā)布訂閱
            channel=createConnectFactory();
            //表示消息在推送給消費者時,一次性不要給消費者推送超過設置的消息數
            channel.basicQos(1);
            Channel channel1=channel;
            String queue="queue";
            //autoAck=false:表示手動確認消息,默認為true
            Boolean autoAck=false;
//            Channel finalChannel = channel;
            channel.basicConsume(queue,autoAck,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException
                {
                    //消息成功接收后告知隊列
                    channel1.basicAck( envelope.getDeliveryTag(),false);
                }
            });
            //拉模式
            GetResponse getResponse = channel.basicGet(queue, autoAck);
            byte[] body = getResponse.getBody();
            //消息成功接收后告知隊列:deliveryTag可以看做是消息編號
            channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

注意:autoAck屬性用于表示是否確認處理,設置true表示消息發(fā)送后,無需等消費者確認消息接口便從隊列移除,設置false表示消息發(fā)送后,需要等待消費者確認,確認后從隊列中移除,未被確認的消息會一致保存在等斷開重新連接后重新加入到消息隊列進行投遞;當消費者需要拒絕消息時,可以通過:channel.basicReject(long deliveryTag, boolean requeue) throws IOException進行拒絕,表示拒絕消息編號為deliveryTag的消息, requeue為false表示將消息從隊列中移除,不會重新發(fā)送給消費者

備份交換機(AE:alternate exchange)

當存在有消息未被成功路由,不想使用mandatory參數且不想消息丟失的情況下可以設置一個備份交換機用來存儲這些消息,當需要對這些信息進行處理時去獲?。?/p>

String amexchange="amexchange";
String amQueue="amQueue";
String amRoutingKey="amRoutingKey";
String exchange="exchange";
String queue="queue";
String routingKey="routingKey";
Map<String, Object> arguments = new HashMap<>();
//聲明交換機為備份交換機
arguments.put("alternate-exchange",amexchange);
channel.exchangeDeclare(amexchange,"fanout",true,false,arguments);
channel.queueDeclare(amQueue,true,false,false,null);
channel.queueBind(amQueue,amexchange,null);
channel.exchangeDeclare(exchange,"direct",true);
channel.queueDeclare(queue,true,true,false,null);
channel.queueBind(queue,exchange,routingKey,null);
備份交換機

客戶端會根據交換機所設置的交換機類型,屬性進行展示:


客戶端展示

死信隊列(DLX:dead-letter-exchange)

1、死信隊列的形成:1)、消息被拒絕(channel.basicReject(),channel.basicNack()),并且requeue屬性為false;2)、消息過期;3)、隊列達到最大長度。
消息過期:可以對通過在聲明隊列時,定義'x-message-ttl'值來設置所有消息的過期時間或者通過在'new AMQP.BasicProperties().builder().expiration("6000").build()'來對單條消息設置過期時間
2、死信隊列是也是一個正常的隊列,通過設置隊列的x-dead-letter-exchange參數將隊列變?yōu)樗佬抨犃?,當一個非死信隊列中存在死信,則會將這條消息自動發(fā)送到死信交換機上并路由到死信隊列中;

死信隊列

死信隊列客戶端

延遲隊列

延遲對列存儲的消息對象為延遲消息,也就是生產者發(fā)送的消息并不想立刻發(fā)送給消費者,而是想等待一段時間后再進行發(fā)送,延遲隊列適用場景有定時,訂單規(guī)定時間支付,可以引用死信對列來實現,消費者所綁定的為死信對列,對每條消息設置過期時間,當消息過期后轉到死信隊列中供消費者消費;

優(yōu)先級隊列

優(yōu)先級隊列,優(yōu)先級高的隊列具有高的優(yōu)先權,優(yōu)先級高的消息具有優(yōu)先被消費的權力,設置隊列優(yōu)先級可以通過設置arguments屬性x-max-priority來設置優(yōu)先權,消息優(yōu)先權可以通過設置new AMQP.BasicProperties().builder().priority("10").build()來設置優(yōu)先權

如何保證數據傳輸可靠性

數據丟失環(huán)節(jié)
  • 消息生產者傳輸消息至MQ發(fā)生數據丟失

為保證消息成功從生產者通過信道發(fā)送至交換機中,rabbitMQ引入兩種機制來保證:

1、事務機制
public void business() throws IOException {
    Channel channel =null;
    try {
        //創(chuàng)建連接
        channel=createConnectFactory();
        //開啟事務
        channel.txSelect();
        String exchange="exchange";
        String queue="queue";
        String routingKey="routingKey";
        channel.exchangeDeclare(exchange,"direct",true);
        channel.queueDeclare(queue,true,true,false,null);
        channel.queueBind(queue,exchange,routingKey,null);
        String message="hello world";
        channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
        //事務提交
        channel.txCommit();
    } catch (Exception e) {
        channel.txRollback();
        e.printStackTrace();
    }
}

事務的缺陷:事務的開啟會嚴重影響RabbitMQ的性能,且消息的發(fā)送只能等待前一條事務完成后再發(fā)送。

2、發(fā)送方確認機制

發(fā)送方消息確認機制是用于解決生產者知悉消息已經成功到達rabbitMQ,消息的確認一般是等消息成功路由到與其匹配的隊列中才進行確認(Basic.Ack),如果消息和隊列有實現持久化,只消息確認是在消息成功持久化到磁盤中才進行消息確認。確認機制需要將信道設置為confirm(確認)模式,該模式下消息發(fā)送時會被自動設置一個唯一ID(從1開始增長),并且消息在確認時返回的信息中會帶上該唯一ID;
相較于事務機制,發(fā)送方確認機制最大的優(yōu)點是可以異步進行,生產者應用程序可以在等待信道返回確認的同時發(fā)送消息,等待消息確認之后,生產者應用程序可以回調方法來處理確認消息,即使消息被拒絕(Basic.nack())。

public void confirm(){
    Channel channel =null;
    try {
        //創(chuàng)建連接
        channel=createConnectFactory();
        //開啟確認(confirm)模式
        channel.confirmSelect();
        String amexchange="amexchange";
        String amQueue="amQueue";
        String amRoutingKey="amRoutingKey";
        String exchange="exchange";
        String queue="queue";
        String routingKey="routingKey";
        Map<String, Object> arguments = new HashMap<>();
        //聲明交換機為備份交換機
        arguments.put("alternate-exchange",amexchange);
        channel.exchangeDeclare(amexchange,"fanout",true,false,arguments);
        channel.queueDeclare(amQueue,true,false,false,null);
        channel.queueBind(amQueue,amexchange,"");
        channel.exchangeDeclare(exchange,"direct",true);
        channel.queueDeclare(queue,true,true,false,null);
        channel.queueBind(queue,exchange,routingKey);
        String message="hello world";
        channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_BASIC,message.getBytes());
        //監(jiān)聽消息確認
        if (!channel.waitForConfirms()){
            System.out.println("發(fā)送消息失敗");
        }
        /*監(jiān)聽被拒絕的消息*/
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }catch (InterruptedException e) {
        e.printStackTrace();
    }
}

發(fā)送方消息確認機制有:
1)普通確認(confirm)方法:其實就是串行執(zhí)行,等一條消息被確認后再發(fā)送下一條,與事務機制類似,且性能類似,并沒有實現該機制真正的優(yōu)點,
2)批量確認(confirm)方法:沒發(fā)送一批消息后,就調用 channel.waitForConfirms()方法,等待服務器的確認返回。但是其存在消息如果丟失或者超時,已經被拒絕的情況下,需要重發(fā),導致性能更大的消耗。
3)異步確認(confirm)方法:提供一個回調方法,當服務器確認一條或者多條消息后悔回調該方法進行消息處理,并可以針對確認信息類型的不同來調用不同的方法,比如超時,被拒絕等等。該方法能極大提升服務器性能;
注意:confirm模式和事務區(qū)別在于一個時異步,一個是同步

  • 消息存儲在MQ內存中, MQ發(fā)生宕機,導致數據丟失

開啟數據持久化
第一步:創(chuàng)建queue時將其設置為持久化,持久化得為元數據,并不是queue里的數據
第二步:發(fā)送消息的時候將消息的delveryMode設置為2;
一般情況下數據隊列消息持久化與confirm模式結合,只有當消息持久化至硬盤中時,confirm才通知生產者ack,當為持久化時,生產者可根據需要對消息進行重發(fā);

  • 消息消費者從MQ消費消息時,還未消費完服務宕機

使用手動ack模式,只有當消息被消費者真正消費時,消費者確認后才通知隊列將數據進行移除,而不是自動ack模式,當消息到達消費者便通知隊列將數據移除,此時有可能數據還為被真正消費,消費者就宕機了;

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,200評論 2 11
  • 什么叫消息隊列? 消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復...
    Agile_dev閱讀 2,430評論 0 24
  • 概述 RabbitMQ是目前非常熱門的一款消息中間件,不管是互聯網行業(yè)還是傳統(tǒng)行業(yè)都在大量地使用 。 Rabbit...
    Tian_Peng閱讀 1,771評論 1 4
  • RabbitMq基本組件 一、 channel 信道: 信道是生產消費者與rabbit通信的渠道,生產者publi...
    一秒一心跳閱讀 3,645評論 0 2
  • 又值一年雷鋒日。 雷鋒是我們的好榜樣,我們永遠不會忘卻他。每到這個紀念日,各行各業(yè)都以各種形式開展活動,掀起新一輪...
    義川有時想閱讀 527評論 0 0

友情鏈接更多精彩內容