RabbitMQ Exchange類型

image.png

藍(lán)色的框:指的是生產(chǎn)者將消息投遞到EXchange上,然后根據(jù)routingkey路由到指定隊(duì)列上
綠色框:消費(fèi)者監(jiān)聽隊(duì)列,然后接受消息。
黃色框:消息到達(dá)了exchange是路由到哪個(gè)隊(duì)列,要根據(jù)routingkey而定。

下面講解Exchange
Exchange的屬性:(大致有個(gè)印象就OK,繼續(xù)往下看)
1.Name:交換機(jī)名稱;
2.Type:交換機(jī)類型 direct,topic,fanout,headers;
3.Durability:是否需要持久化,true為持久化,代表交換機(jī)在服務(wù)器重啟后是否還存在;
4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后,自動(dòng)刪除該exchange.
5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
6.Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制化。

Exchange類型以及講解

1.Direct exchange

image.png
image.png

生產(chǎn)者代碼:

public class Producer4DirectExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //5 發(fā)送
        
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
        
    }
    
}

消費(fèi)者的代碼:
此時(shí)需要channel發(fā)送消息指定的routingkey和綁定exchange和隊(duì)列時(shí)候的routingkey相同,直接路由到這些隊(duì)列上。
如果不指定exchangeType,那么就是default Exchange,此時(shí)不需要將隊(duì)列綁定到exchange.但是Routekey需要完全匹配。

public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        
        //表示聲明了一個(gè)交換機(jī)
        //參數(shù)說明:Exchange的屬性:
        //1.Name:交換機(jī)名稱;
        //2.Type:交換機(jī)類型 direct,topic,fanout,headers;
        //3.Durability:是否需要持久化,true為持久化;
        //4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后,自動(dòng)刪除該exchange.
        //5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
        //Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制化
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示聲明了一個(gè)隊(duì)列
        
        //參數(shù)說明:
        //1.消息隊(duì)列:實(shí)際存儲(chǔ)消息數(shù)據(jù)
        //2.Durability:是否持久化,3.auto_delete:如果選yes,代表最后一個(gè)監(jiān)聽被移除之后,該隊(duì)列會(huì)自動(dòng)被刪除。
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一個(gè)綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息,如果沒有消息,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

2.topic exchange

此方式是算是routingkey的通配符匹配模式,兩張圖片說明問題

符號(hào)“#” 匹配一個(gè)或多個(gè)詞
符號(hào)“”匹配不多不少一個(gè)詞
例如:“l(fā)og.#”能夠匹配到“l(fā)og.info.oa”
"log.
"只會(huì)匹配到“l(fā)og.erro”
如下圖

image.png

生產(chǎn)者代碼(用了三種routingkey發(fā)送)

public class Producer4TopicExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發(fā)送
        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }
    
}

消費(fèi)者代碼:

public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //String routingKey = "user.*";
        String routingKey = "user.*";
        // 1 聲明交換機(jī) 
        channel.exchangeDeclare(exchangeName, exch angeType, true, false, false, null);
        // 2 聲明隊(duì)列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交換機(jī)和隊(duì)列的綁定關(guān)系:
        //綁定關(guān)系中指定routingkey
        channel.queueBind(queueName, exchangeName, routingKey);
         
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息,如果沒有消息,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

3.Fanout Exchange(不需要routingkey,只需要綁定)

image.png

特點(diǎn)是:
1.不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上
2.發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上
3.Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的

生產(chǎn)者代碼:

public class Producer4FanoutExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        //5 發(fā)送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());          
        }
        channel.close();  
        connection.close();  
    }
    
}

消費(fèi)者代碼:

public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不設(shè)置路由鍵
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息,如果沒有消息,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容