[用官方文檔學(xué)習(xí)RabbitMQ]5.RabbitMQ的Topic Exchange

在前面的教程里,我們改進(jìn)了日志系統(tǒng)。我們用direct交換類(lèi)型代替了fanout交換類(lèi)型,并實(shí)現(xiàn)了可以有選擇性的接收日志。

雖然使用direct類(lèi)型成功的改進(jìn)了我們的系統(tǒng),但是它仍然有一定的局限性——它不能夠基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。

在我們的日志系統(tǒng)中,我們可能希望自己不僅僅基于嚴(yán)重性去訂閱日志,我們還應(yīng)該關(guān)注發(fā)出日志的源??赡茉趕yslog unix工具上可以了解到這個(gè)概念,這個(gè)工具根據(jù)“嚴(yán)重性”(info/warn/crit...)和“設(shè)施”(auth/cron/kern..)來(lái)路由(routing)日志。(routing這個(gè)詞我實(shí)在不知道咋翻譯了...)

這將給我們帶來(lái)靈活性,我們想要收到來(lái)自"cron"的錯(cuò)誤,也想要收到來(lái)自"kern"的所有日志。要在我們的日志系統(tǒng)中實(shí)現(xiàn)這一點(diǎn),我們需要學(xué)習(xí)一個(gè)更復(fù)雜的——topic exchange

Topic Exchange

發(fā)送到Topic Exchange的消息,不能是任意的路由鍵。他必須遵守一些規(guī)則:由點(diǎn)分割的單詞列表。單詞可以是任意的東西,但通常他們是與消息有關(guān)的特性。

路由鍵比如:"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit".在路由鍵中可以有很多單詞,最多可以達(dá)到255個(gè)字節(jié)。

綁定鍵也必須使用相同的方式,topic exchange背后的邏輯類(lèi)似于direct exchange——一條帶有特定路由鍵的消息將會(huì)被發(fā)送到能夠匹配綁定鍵的所有隊(duì)列!還有,綁定鍵有兩個(gè)重要的特殊情況:

  • “*”可以代替一個(gè)詞。
  • “#”可以代替0個(gè)或者多個(gè)單詞。

用圖解釋一下:


topic exchange

在這個(gè)例子中,我們將會(huì)發(fā)送所有描述東西的信息。消息將會(huì)發(fā)送到一個(gè)由三個(gè)單詞組成的路由鍵。路由鍵第一個(gè)詞描述“速度”,第二個(gè)詞描述“顏色”,第三個(gè)詞描述“物種”。

兩個(gè)隊(duì)列,我們創(chuàng)建了三個(gè)綁定:

  • Q1綁定".orange."
  • Q2綁定" * . * .rabbit" 和 "lazy.#"

這些綁定概括成:

  • Q1 對(duì)所有橙色(orange)動(dòng)物感興趣
  • Q2 想要聽(tīng)到兔子(rabbit)的一切,也想聽(tīng)到懶惰(lazy)動(dòng)物的一切~

如果一條消息的路由鍵為"quick.orange.rabbit",他將被傳到兩個(gè)隊(duì)列中去。
如果消息路由鍵是"lazy.orange.elephant",也會(huì)被傳遞到兩個(gè)隊(duì)列中去。
如果消息路由鍵是"quick.orange.fox"將會(huì)只被發(fā)送到Q1隊(duì)列
如果消息路由鍵是"lazy.brown.fox"將會(huì)只被發(fā)送到Q2隊(duì)列
如果消息路由鍵是"lazy.pink.rabbit"它將只被發(fā)送到Q2隊(duì)列,而且只有一次!
如果消息路由鍵是"quick.brown.fox"它沒(méi)有匹配任何的綁定,所以他會(huì)被丟棄。
如果我們違反了設(shè)置,用一個(gè)單詞或者四個(gè)單詞發(fā)送一條信息。
比如“orange”或”quick.orange.male.rabbit“會(huì)怎么樣???
答案是,這些消息不匹配任何綁定,將會(huì)丟失。
但是!??!如果是“l(fā)azy.orange.male.rabbit”,盡管他是四個(gè)單詞,但是它和Q2的綁定匹配。所以它將會(huì)被傳送到第二個(gè)隊(duì)列。

Topic exchange 小知識(shí)

Topic exchange很好很強(qiáng)大,行為可以表現(xiàn)的和其他的交換類(lèi)型一樣,也可以有自己的擴(kuò)展。
當(dāng)隊(duì)列與"#"綁定鍵綁定的時(shí)候,它將會(huì)化身為fanout類(lèi)型,接受所有消息。我們就可以代替fanout類(lèi)型進(jìn)行使用了。
當(dāng)特殊字符"*"和"#"沒(méi)有在綁定中使用的話,那么topic exchange就會(huì)化身為direct exchange。
嗯~~~topic完成雙殺!

完整代碼

我們將在我們的日志系統(tǒng)里使用一個(gè)topic exchange。我們將會(huì)從一個(gè)工作的假設(shè)開(kāi)始,也就是日志的路由鍵會(huì)有兩個(gè)詞:"<設(shè)施>"."<嚴(yán)重性>"
還有代碼幾乎和前一期一模一樣!所以注意點(diǎn)小改動(dòng)啦~
EmitLogTopic.java

public class EmitLogTopic {

    //設(shè)置交換器的名字
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args){
        Connection connection = null;
        Channel channel = null;
        try {
        //獲取連接
        connection = ConnectionUtil.getConnection();
        //創(chuàng)建通道
        channel = connection.createChannel();
        //聲明交換器,給它名字,設(shè)置交換類(lèi)型為direct
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //代碼里手動(dòng)設(shè)置路由鍵——RoutingKey
        String routingKey = getRouting(args);
        //待傳遞的消息內(nèi)容
        String message = getMessage(args);
        //發(fā)送消息
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
        System.out.println("[x] Sent '"+routingKey+"':'"+message+"'");

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //關(guān)閉連接
            if (connection!=null){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length<1){
            return "anonymous.info";
        }
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length<2){
            return "hello world";
        }
        return joinStrings(strings," ",1);
    }

    private static String joinStrings(String[] strings, String delimiter,int startIndex) {
        int length = strings.length;
        if (length == 0){
            return "";
        }
        if (length<startIndex){
            return "";
        }
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex+1 ; i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

ReceiveLogsTopic.java

public class ReceiveLogsTopic {

    //設(shè)置交換器的名字
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException {
        Connection connection = null;
        Channel channel = null;
        //獲取連接
        connection = ConnectionUtil.getConnection();
        //創(chuàng)建通道
        channel = connection.createChannel();
        //聲明交換器,給它名字,設(shè)置交換類(lèi)型為topic
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //得到隊(duì)列的名字
        String queueName = channel.queueDeclare().getQueue();
        //截一下輸入錯(cuò)誤的情況
        if (args.length<1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]");
            System.exit(1);
        }
        String wholeSeverity = "";
        //根據(jù)輸入,確定程序想要收集的"RoutingKey",進(jìn)行綁定。
        for (String bindingKey : args){
            channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);
            wholeSeverity = wholeSeverity + " " + bindingKey;
        }
        System.out.println("[*] Waiting for"+wholeSeverity+" message.To exit press CTRL+C");
        //消費(fèi)者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("[x] Received'"+envelope.getRoutingKey()+"':'"+message+"'");
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

這個(gè)太過(guò)于靈活,想給大家展示一下結(jié)果,但是圖太多了。。。所以給大家貼一下官方的圖,結(jié)合我前面幾期寫(xiě)的就能得到結(jié)果!請(qǐng)大家請(qǐng)自行測(cè)試?(? ? ??)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,688評(píng)論 19 139
  • 內(nèi)容來(lái)自:RabbitMQ Tutorials Java版 Topics 在上一個(gè)教程中我們改進(jìn)了我們的日志系統(tǒng):...
    maxwellyue閱讀 2,918評(píng)論 1 2
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,514評(píng)論 2 34
  • 主題(topic) 在上一個(gè)教程中我們改善了我們的日志系統(tǒng)。我們使用direct類(lèi)型的exchange,可以選擇性...
    roach閱讀 2,787評(píng)論 0 1
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,240評(píng)論 3 51

友情鏈接更多精彩內(nèi)容