RabbitMQ的使用

1. Helloworld-基本消息模型

  1. 導入的jar包
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <!--和springboot2.0.5對應-->
        <version>5.4.1</version>
    </dependency>

  1. 連接工具類
public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //設置賬號信息,用戶名、密碼、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

1582541127455.png

發(fā)送方

流程:

①:獲得連接對象

②:創(chuàng)建與交換機的連接通道

③:創(chuàng)建消息隊列

④:發(fā)送消息

public class Send {
    public static final String QUEUE_NAME_HELLO = "queue_name_hello";
    public static void main(String[] args) {
        Connection connection = null;
        try {
            //1. 創(chuàng)建連接
            connection = ConnectionUtil.getConnection();
            //2. 創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務
            Channel channel = connection.createChannel();
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數(shù)
             */
            channel.queueDeclare(QUEUE_NAME_HELLO,true,false,false,null);
            String message = "hello rabbitmq";
            /**
             * 消息發(fā)布方法
             * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發(fā)到指定的消息隊列
             * param3:消息包含的屬性
             * param4:消息體
             */
            /**
             * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯
             示綁定或解除綁定
             * 默認的交換機,routingKey等于隊列名稱
             */
            channel.basicPublish("", QUEUE_NAME_HELLO,
                    null,message.getBytes());
            System.out.println("message send successful!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消費者:

流程:

①:獲得連接對象

②:創(chuàng)建與交換機的連接通道

③:聲明監(jiān)聽隊列

④:從隊列中獲取消息,接收消息的回調

public class Consumer01 {
    public static void main(String[] args) {
        Connection connection = null;
        try {
            //1. 獲得連接
            connection = ConnectionUtil.getConnection();
            //2. 創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務
            Channel channel = connection.createChannel();
            //3. 聲明隊列

            /**
             * 消費者接收消息調用此方法
             * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */

            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費者標識:"+consumerTag);
                    System.out.println("交換機名稱:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                }
            };
            /**
             * 監(jiān)聽隊列String queue, boolean autoAck,Consumer callback
             * 參數(shù)明細
             * 1、隊列名稱
             * 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置
             為false則需要手動回復
             * 3、消費消息的方法,消費者接收到消息后調用此方法
             */
            channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

啟動Send發(fā)送消息,可以看到當前的隊列中有一條消息處于就緒狀態(tài)!

1582540913388.png

啟動Consumer01接收消息,消息被成功接收!

消費者標識:amq.ctag-ALoQPcEfIDlldhAZqAU4CQ
交換機名稱:
路由key:queue_name_hello
消息id:1
接收消息:hello rabbitmq
消息接收成功!
1582540844236.png

2. 消息確認機制(ACK)

在上述代碼中,在basicConsume()里的autoAck設置的值為true,他表示自動簽收,即當消費者接收到了消息之后,隊列中的消息就會立即刪除,如果為false就表示為手動簽收!

RabbitMQ有一個ACK機制:即當消費者接收到了消息后,會向 RabbitMQ發(fā)送一個回執(zhí)ACK,告知消息已經被接收!RabbitMQ會將隊列中的消息刪除掉!ACK機制分為兩種

自動簽收:消息一旦被接收,自動發(fā)送回執(zhí)ACK

手動簽收:消息被接收后,需要手動發(fā)送ACK

自動簽收存在的問題:

模擬一個異常環(huán)境,當消費者接收到消息,進入回調函數(shù)中,人為拋出一個異常,查看消息接收狀態(tài)!

            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    int i = 1/0; //模擬異常環(huán)境
                    System.out.println("消費者標識:"+consumerTag);
                    System.out.println("交換機名稱:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                }
            };
        
            channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
    }

}

此時再次發(fā)送消息:


1582543055232.png

[圖片上傳失敗...(image-b23445-1585216601052)]

再次接收:控制臺沒有打印任何信息,消息是沒有被成功接收到的

1582543124202.png

[圖片上傳失敗...(image-e39c33-1585216601052)]

但是隊列中的消息已經被刪除掉了,說明,在自動簽收的情況下,如果遇到異常,數(shù)據(jù)會丟失!因此如果數(shù)據(jù)很重要建議使用手動模式!

Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //int i = 1/0;
                    System.out.println("消費者標識:"+consumerTag);
                    System.out.println("交換機名稱:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
                }
            };
channel.basicConsume(Send.QUEUE_NAME_HELLO,false , consumer);

3. Work queues模型

1582543583679.png

[圖片上傳失敗...(image-56d745-1585216601052)]

work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息!

結果:

  • 同一個消息只能給一個一個消費者

  • RabbitMQ采用輪詢的方式將消息發(fā)送給消費者

  • 消費者只有在處理完一條消息后,才能接收下一條消息

能者多勞

模擬一個消費者處理消息效率低下的情況:

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消費者標識:"+consumerTag);
                    System.out.println("交換機名稱:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
                }
            };
            channel.basicConsume(Send.QUEUE_NAME_WORK_QUEUE,false , consumer);

此時RabbitMQ采用輪詢的方式再發(fā)送消息,處于休眠的消費者和另一個消費者會輪流接收到消息,但是處于休眠的消費者處理耗時較長,而另一個消費者處理效率更高,會一直處于空閑狀態(tài)!

此時可以使用basicQos方法和prefetchCount = 1設置。 這告訴RabbitMQ一次不要向工作人員發(fā)送多于一條消息。 或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認了前一個消息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。

Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(1000000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消費者標識:"+consumerTag);
                    System.out.println("交換機名稱:"+envelope.getExchange());
                    System.out.println("路由key:"+envelope.getRoutingKey());
                    System.out.println("消息id:"+envelope.getDeliveryTag());
                    System.out.println("接收消息:"+new String(body));
                    System.out.println("消息接收成功!");
                    channel.basicQos(1); //設置最大處理消息數(shù)為1
                    channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
                }
            };

這樣設置后,處于休眠的消費者一直在處理分配給它的一條消息,而另一個消費者會接收到消息,處理完后又會接收到下一條消息,不再處于空閑狀態(tài),從而實現(xiàn)能者多勞!

4. 訂閱模型分類

1582547449987.png

[圖片上傳失敗...(image-b0a2d-1585216601052)]

特點:

①:一個生產者,多個消費者

②:每個消費者偵聽自己的隊列

③:每個隊列綁定在交換機上

④:生產者負責把消息發(fā)送給交換機

⑤:生產者把消息發(fā)送到交換機,通過隊列到達消費者,實現(xiàn)一條消息,被多個消費者消費!

⑤:根據(jù)交換機的不同類型處理消息

Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!

4.1 Exchange類型-Fanout:廣播

在廣播的工作模式下:

①:一個生產者和多個消費者

②:生產者與交換機綁定,多個消費者有自己的隊列,隊列與交換機綁定!

③:生產者負責把消息發(fā)送給交換機,交換機負責把消息發(fā)送給具體的隊列,生產者無法決定!

④:交換機把消息發(fā)送給每一個隊列

⑤:每一個消費者都能接收到消息!實現(xiàn)一條消息被多個消費者消費

流程:

聲明Exchange,不再聲明Queue

發(fā)送消息到Exchange,不再發(fā)送到Queue

生產者:

public class Send {
    public static final String EXCHANGE_NAME_FANOUT= "exchange_name_fanout";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String message = "hello rabbitmq";
        //聲明交換機名字和類型
        channel.exchangeDeclare(EXCHANGE_NAME_FANOUT, BuiltinExchangeType.FANOUT);
        //發(fā)布消息 routingKey為"",發(fā)送到每一個與之綁定的隊列!
        channel.basicPublish(EXCHANGE_NAME_FANOUT, "", null, message.getBytes());
        System.out.println("send message successful!");
    }
}

消費者01:

public class Consumer01 {
    public static final String QUEUE_NAME_01 = "quene_name_01";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_01, false, false, false, null);
        //綁定到交換機
        channel.queueBind(QUEUE_NAME_01, Send.EXCHANGE_NAME_FANOUT, "");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者標識:"+consumerTag);
                System.out.println("交換機名稱:"+envelope.getExchange());
                System.out.println("路由key:"+envelope.getRoutingKey());
                System.out.println("消息id:"+envelope.getDeliveryTag());
                System.out.println("接收消息:"+new String(body));
                System.out.println("消息接收成功!");
                channel.basicQos(1); //設置最大處理消息數(shù)為1
                channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
            }
        };
        channel.basicConsume(QUEUE_NAME_01,false,consumer );
    }
}

消費者02:

public class Consumer02 {
    public static final String QUEUE_NAME_02 = "quene_name_02";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME_02, false, false, false, null);
        //綁定到交換機
        channel.queueBind(QUEUE_NAME_02, Send.EXCHANGE_NAME_FANOUT, "");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者標識:"+consumerTag);
                System.out.println("交換機名稱:"+envelope.getExchange());
                System.out.println("路由key:"+envelope.getRoutingKey());
                System.out.println("消息id:"+envelope.getDeliveryTag());
                System.out.println("接收消息:"+new String(body));
                System.out.println("消息接收成功!");
                channel.basicQos(1); //設置最大處理消息數(shù)為1
                channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
            }
        };
        channel.basicConsume(QUEUE_NAME_02,false,consumer );
    }
}

生產者發(fā)送消息到交換機,交換機根據(jù)routingKey匹配隊列,這里沒有指定routingKey,交換機會將消息發(fā)送到每一個隊列,消費者01和消費者02都能收到消息!

使用場景:

群發(fā)信息

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容