消息中間件踩坑之旅(四)——RabbitMq運行流程及多點注意

這里博主推薦大家閱讀由朱忠華先生編寫的<<RabbitMq實戰(zhàn)指南>>,這里有詳細(xì)的客戶端開發(fā)接口的說明,例如com.rabbotmq.client包的使用

運行流程

生產(chǎn)者
  1. 生產(chǎn)者連接到RabbitMq Broker,建立一個連接即程序里的Connection,開啟一個信道

    ConnectionFactory factory = new ConnectionFactory();
    // "guest"/"guest" by default, limited to localhost connections
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    
    Connection conn = factory.newConnection();
    
    Channel channel = conn.createChannel();
    
  2. 生產(chǎn)者聲明一個交換器,并設(shè)置相關(guān)屬性,這里聲明交換器的類型,是否持久化等,很重要哦,可以自行查看api(交換器也可以轉(zhuǎn)發(fā)消息給別的交換器)

    channel.exchangeDeclare(exchangeName, "direct", true);
    
  1. 生產(chǎn)者聲明一個隊列并設(shè)置屬性,比如排他性,是否持久化,是否過期刪除等

    String queueName = channel.queueDeclare().getQueue();
    or
    channel.queueBind(queueName, exchangeName, routingKey);
    
  2. 生產(chǎn)者通過路由鍵將交換器和隊列進(jìn)行綁定起來

    channel.queueBind(queueName, exchangeName, routingKey)
    
  3. 生產(chǎn)者發(fā)送信息到RabbitMq Broker,其中包含路由鍵、交換器信息

    byte[] messageBodyBytes = "Hello, world!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    
  4. 對應(yīng)交換器根據(jù)收到的路由鍵匹配發(fā)送的隊列`(重點type: topic、direct)

  5. 找到后,把消息存入隊列

  6. 找不到,會退給生產(chǎn)者

  7. 關(guān)閉信道,關(guān)閉連接

    channel.close();
    conn.close();
    
消費者
  1. 消費者連接到RabbitMq Broker,建立一個連接即程序里的Connection,開啟一個信道

  2. 消費者向?qū)?yīng)隊列請求消費消息,

  3. 等待RabbitMq Broker回應(yīng)并投遞相應(yīng)的隊列消息給消費者,消費者然后進(jìn)行回調(diào)處理(回調(diào)在輪詢或者是阻塞里)

  4. 消費者完成回調(diào),確認(rèn)消息已經(jīng)被成功消費(這里確認(rèn)消息可以是自動應(yīng)答,也可手動,建議是手動)

    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 String routingKey = envelope.getRoutingKey();
                 String contentType = properties.getContentType();
                 long deliveryTag = envelope.getDeliveryTag();
                 //。。。。。
                 // 手動應(yīng)答 acknowledge receipt of the message
                 //Delivery Tag 用來標(biāo)識信道中投遞的消息
                 //第二個參數(shù)是requeue 如果是true表示通知RabbitMq把這個消息重新存入隊列并發(fā)送給下一個消費者,反之刪除
                 channel.basicAck(deliveryTag, false);
             }
         });
    
  5. RabbitMq 刪除對應(yīng)已經(jīng)被消費者確認(rèn)消費的消息

  6. 關(guān)閉信道

  7. 關(guān)閉連接

重點

  • 以上過程不是絕對的,已經(jīng)表明
  • Connection連接就是一條TCP連接,在實際項目中不要多次創(chuàng)建Connection,他是可以復(fù)用的,類似NIO。因為建立TCP連接開銷很大。
  • Channel信道是建立在Connection上虛擬連接,RabbitMq處理的每條AMQP指令都是通過信道完成。
  • 每個線程把持一個信道,所以信道復(fù)用了Connection的TCP連接。同時RabbitMq確保了每個線程的私密性,就像獨立的連接一樣。
  • Channel不建議在線程間貢獻(xiàn),不然很酸爽
  • Connection可以復(fù)用不代表項目中只能創(chuàng)建一個,可以根據(jù)實際情況創(chuàng)建多個Connection,并讓信道均攤到這些Connection上。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評論 2 11
  • 什么叫消息隊列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,438評論 0 24
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒有及時更新。 術(shù)語對...
    joyenlee閱讀 7,806評論 0 3
  • 本章我們重點學(xué)習(xí)一下Rabbit里面的exchange(交換器)的知識。 交換器分類 RabbitMQ的Excha...
    Java大生閱讀 414評論 0 1
  • “帶上英語單詞本出去遛嗎?”每次陪菇?jīng)龀鋈デ拔視绽嵝?。“不想帶,今天好累,吃得又有點兒多。”嗯,不想帶就不帶。...
    安之若素911閱讀 354評論 2 3

友情鏈接更多精彩內(nèi)容