RabbitMQ基礎(chǔ)概念和使用

基本概念

Amqp概念

amqp,既Advanced Message Queuing Protocol ,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議

包括的要素

信道:多個(gè)線程間通過一個(gè) tcp鏈接與服務(wù)端通訊,每個(gè)線程使用一個(gè)信道通訊,保證

交換器-隊(duì)列-綁定-路由鍵

  • 路由鍵是發(fā)送者指定由交換機(jī)和隊(duì)列的綁定
  • 生產(chǎn)者發(fā)送消息到交換機(jī)
  • 消費(fèi)者綁定隊(duì)列
  • 到達(dá)了無人訂閱的隊(duì)列,消息會(huì)一直排隊(duì)等待
  • 一個(gè)隊(duì)列有多個(gè)訂閱者---消息會(huì)循環(huán)方式 以此發(fā)個(gè)這幾個(gè)消費(fèi)者
  • 發(fā)送者發(fā)送一個(gè)不存在的路由鍵--消息會(huì)丟失
clipboard.png

消息的確認(rèn)

消費(fèi)者收到的每一條消息都必須進(jìn)行確認(rèn)(手動(dòng)或者自動(dòng))

  • 消費(fèi)者遲遲不確認(rèn),rabbitMQ 會(huì)一直會(huì)保持這個(gè)消息,直到鏈接的斷開,會(huì)將消息投遞到另一個(gè)消費(fèi)者(前提是有多個(gè)消費(fèi)者)

topic模式

可以使通配符 通過交換機(jī)&路由鍵是消息到多個(gè)隊(duì)列中去

clipboard.png

虛擬主機(jī)

/service1

/service2

多個(gè)應(yīng)用分區(qū),類似oracle - 表空間

每個(gè)用戶名只能連自己的虛擬主機(jī)

多個(gè)應(yīng)用時(shí)可以很好地做服務(wù)隔離


基礎(chǔ)使用

使用RabbitMQ原生Java客戶端進(jìn)行消息通信

客戶端需要amqp-client-5.0.0.jar和slf4j-api-1.6.1.jar
建議使用Maven:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.0.0</version>
</dependency>
注意:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具體的版本號(hào)到Maven的中央倉庫查)的版本。

使用RabbitMQ原生Java客戶端進(jìn)行消息通信

Direct Exchange示例

簡單形式的 生產(chǎn)者-消費(fèi)者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *direct類型交換器的生產(chǎn)者
 */
public class DirectProducer {

    public final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args)
            throws IOException, TimeoutException {
        /* 創(chuàng)建連接,連接到RabbitMQ*/
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();

        /*創(chuàng)建信道*/
        Channel channel = connection.createChannel();
        /*創(chuàng)建交換器*/
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        /*日志消息級(jí)別,作為路由鍵使用*/
        String[] serverities = {"error","info","warning"};
        for(int i=0;i<3;i++){
            String severity = serverities[i%3];
            String msg = "Hellol,RabbitMq"+(i+1);
            /*發(fā)布消息,需要參數(shù):交換器,路由鍵,其中以日志消息級(jí)別為路由鍵*/
            channel.basicPublish(EXCHANGE_NAME,severity,null,
                    msg.getBytes());
            System.out.println("Sent "+severity+":"+msg);
        }
        channel.close();
        connection.close();

    }

}

普通消費(fèi)者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *普通的消費(fèi)者
 */
public class NormalConsumer {

    public static void main(String[] argv)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                "direct");

        /*聲明一個(gè)隊(duì)列*/
        String queueName = "focuserror";
        channel.queueDeclare(queueName,false,false,
                false,null);

        /*綁定,將隊(duì)列和交換器通過路由鍵進(jìn)行綁定*/
        String routekey = "info";/*表示只關(guān)注error級(jí)別的日志消息*/
        channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);

        System.out.println("waiting for message........");

        /*聲明了一個(gè)消費(fèi)者*/
        //envelope  信封 可以獲取路由鍵,隊(duì)列名 等信息
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received["+envelope.getRoutingKey()
                        +"]"+message);
            }
        };
        /*消費(fèi)者正式開始在指定隊(duì)列上消費(fèi)消息*/
        channel.basicConsume(queueName,true,consumer);
    }

}

消費(fèi)者綁定多個(gè)路由鍵

結(jié)果:這個(gè)消費(fèi)者 會(huì)收到多個(gè)隊(duì)列的消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *隊(duì)列和交換器的多重綁定
 */
public class MulitBindConsumer {

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                "direct");

        //聲明一個(gè)隨機(jī)隊(duì)列
        String queueName = channel.queueDeclare().getQueue();

        /*隊(duì)列綁定到交換器上時(shí),是允許綁定多個(gè)路由鍵的,也就是多重綁定*/
        String[] severities={"error","info","warning"};
        for(String serverity:severities){
            channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,
                    serverity);
        }
        System.out.println(" [*] Waiting for messages:");

        // 創(chuàng)建隊(duì)列消費(fèi)者
        final Consumer consumerA = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties
                                               properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Received "
                        + envelope.getRoutingKey() + ":'" + message
                        + "'");
            }
        };
        channel.basicConsume(queueName, true, consumerA);
    }
}

一個(gè)連接多個(gè)信道

結(jié)果:每個(gè)消費(fèi)者 都會(huì)收到所有的消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *一個(gè)連接多個(gè)信道
 */
public class MulitChannelConsumer {

    private static class ConsumerWorker implements Runnable{
        final Connection connection;

        public ConsumerWorker(Connection connection) {
            this.connection = connection;
        }

        public void run() {
            try {
                /*創(chuàng)建一個(gè)信道,意味著每個(gè)線程單獨(dú)一個(gè)信道*/
                final Channel channel = connection.createChannel();
                channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                        "direct");
                // 聲明一個(gè)隨機(jī)隊(duì)列
                String queueName = channel.queueDeclare().getQueue();
                //消費(fèi)者名字,打印輸出用
                final String consumerName
                        =  Thread.currentThread().getName()+"-all";

                //所有日志嚴(yán)重性級(jí)別
                String[] severities={"error","info","warning"};
                for (String severity : severities) {
                    //關(guān)注所有級(jí)別的日志(多重綁定)
                    channel.queueBind(queueName,
                            DirectProducer.EXCHANGE_NAME, severity);
                }
                System.out.println("["+consumerName+"] Waiting for messages:");

                // 創(chuàng)建隊(duì)列消費(fèi)者
                final Consumer consumerA = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties
                                                       properties,
                                               byte[] body)
                            throws IOException {
                        String message =
                                new String(body, "UTF-8");
                        System.out.println(consumerName
                                +" Received "  + envelope.getRoutingKey()
                                + ":'" + message + "'");
                    }
                };
                channel.basicConsume(queueName, true, consumerA);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        Connection connection = factory.newConnection();
        //一個(gè)連接多個(gè)信道
        for(int i=0;i<2;i++){
            /*將連接作為參數(shù),傳遞給每個(gè)線程*/
            Thread worker =new Thread(new ConsumerWorker(connection));
            worker.start();
        }
    }
}

一個(gè)隊(duì)列多個(gè)消費(fèi)者,則會(huì)表現(xiàn)出消息在消費(fèi)者之間的輪詢發(fā)送。

消費(fèi)者會(huì)輪詢收到消息

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *一個(gè)隊(duì)列多個(gè)消費(fèi)者,則會(huì)表現(xiàn)出消息在消費(fèi)者之間的輪詢發(fā)送。
 */
public class MulitConsumerOneQueue {

    private static class ConsumerWorker implements Runnable{
        final Connection connection;
        final String queueName;

        public ConsumerWorker(Connection connection,String queueName) {
            this.connection = connection;
            this.queueName = queueName;
        }

        public void run() {
            try {
                /*創(chuàng)建一個(gè)信道,意味著每個(gè)線程單獨(dú)一個(gè)信道*/
                final Channel channel = connection.createChannel();
                channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
                        "direct");
                /*聲明一個(gè)隊(duì)列,rabbitmq,如果隊(duì)列已存在,不會(huì)重復(fù)創(chuàng)建*/
                channel.queueDeclare(queueName,
                        false,false,
                        false,null);
                //消費(fèi)者名字,打印輸出用
                final String consumerName
                        =  Thread.currentThread().getName();

                //所有日志嚴(yán)重性級(jí)別
                String[] severities={"error","info","warning"};
                for (String severity : severities) {
                    //關(guān)注所有級(jí)別的日志(多重綁定)
                    channel.queueBind(queueName,
                            DirectProducer.EXCHANGE_NAME, severity);
                }
                System.out.println(" ["+consumerName+"] Waiting for messages:");

                // 創(chuàng)建隊(duì)列消費(fèi)者
                final Consumer consumerA = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties
                                                       properties,
                                               byte[] body)
                            throws IOException {
                        String message =
                                new String(body, "UTF-8");
                        System.out.println(consumerName
                                +" Received "  + envelope.getRoutingKey()
                                + ":'" + message + "'");
                    }
                };
                channel.basicConsume(queueName, true, consumerA);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");

        // 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
        Connection connection = factory.newConnection();

        //3個(gè)線程,線程之間共享隊(duì)列,一個(gè)隊(duì)列多個(gè)消費(fèi)者
        String queueName = "focusAll";
        for(int i=0;i<3;i++){
            /*將隊(duì)列名作為參數(shù),傳遞給每個(gè)線程*/
            Thread worker =new Thread(new ConsumerWorker(connection,queueName));
            worker.start();
        }

    }
}

個(gè)人理解:信道只是客戶端與服務(wù)端建立連接 ,消息消費(fèi)時(shí) 是多個(gè)消費(fèi)者輪詢收到消息 還是 每個(gè)消費(fèi)者收到全部消息 取決于這些消費(fèi)者是否監(jiān)聽同一個(gè)隊(duì)列
模式:
一個(gè)連接多個(gè)信道 -- 實(shí)際是 每個(gè)信道中有一個(gè)隨機(jī)產(chǎn)生的隊(duì)列名,此時(shí)是多個(gè)隊(duì)列,是不同的隊(duì)列,每個(gè)隊(duì)列 有一個(gè)消費(fèi)者 那么這個(gè)消費(fèi)者就會(huì)收到這個(gè)隊(duì)列里的所有消息;在上一層 每個(gè)信道使用(邦定)的是同一個(gè)路由鍵(就是生產(chǎn)者發(fā)布的路由鍵),所以此時(shí)每個(gè)隊(duì)列都會(huì)收到生產(chǎn)者發(fā)布的所有消息,進(jìn)而每個(gè)消費(fèi)者就可以收到所有消息
一個(gè)隊(duì)列多個(gè)消費(fèi)者,則會(huì)表現(xiàn)出消息在消費(fèi)者之間的輪詢發(fā)送 -- 實(shí)際上是 多個(gè)信道使用(邦定)的同一個(gè)路由鍵,而這些信道使用的是同一個(gè)隊(duì)列,消息會(huì)發(fā)布到這個(gè)隊(duì)列中(此時(shí)只有這一個(gè)隊(duì)列),所以多個(gè)消費(fèi)者監(jiān)聽的是同一個(gè)隊(duì)列,此時(shí)消息會(huì)被這些消費(fèi)者(輪詢,分發(fā))收到

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,198評(píng)論 2 11
  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,415評(píng)論 0 1
  • RabbitMQ優(yōu)異性 RabbitMQ發(fā)展到今天,被越來越多的人認(rèn)可,這和它在易用性、擴(kuò)展性、可靠性和高可用性等...
    查無此人_chazz閱讀 663評(píng)論 0 0
  • 這個(gè)指導(dǎo)提供一個(gè)AMQP 0-9-1協(xié)議的概述,它是RabbitMq支持的一個(gè)協(xié)議。 什么是AMQP 0-9-1?...
    浪_6e80閱讀 815評(píng)論 0 1
  • 好詞:破譯,救濟(jì)苦難,無緣無故,如醉如癡,中影,左顧右盼,毫不足鬼。 好句:他周身包圍著一層極薄的牧,這是天生的,...
    朋吧閱讀 588評(píng)論 0 0

友情鏈接更多精彩內(nèi)容