RabbitMQ必備核心知識(精選)

現(xiàn)在很多知名的互聯(lián)網(wǎng)公司都有用到RabbitMQ,其性能,可擴展性讓很多大公司青睞于使用它,不過想要完全使用好RabbitMQ需要掌握其核心的一些概念,這里就說說掌握RabbitMQ所需的必要知識

生產(chǎn)者與消費者

生產(chǎn)者: 創(chuàng)建消息,然后發(fā)送到代理服務(wù)器(RabbitMQ)的程序

消費者:連接到代理服務(wù)器,并訂閱到隊列上接收消息

消息流程

AMQP協(xié)議規(guī)定,AMQP消息必須有三部分,交換機,隊列和綁定。生產(chǎn)者把消息發(fā)送到交換機,交換機與隊列的綁定關(guān)系決定了消息如何路由到特定的隊列,最終被消費者接收。

Note: 消息是不能直接到達隊列(Queue)的

交換機

消息實際上投遞到的是交換機,具體路由到那個隊列由交換機根據(jù)路由鍵(routing key)完成。

當你發(fā)消息到代理服務(wù)器時,即便路由鍵是空的,RabbitMQ也會將其和使用的路由鍵進行匹配。如果路由的消息不匹配任何綁定模式,消息將會進入黑洞。

交換機在隊列與消息中間起到了中間層的作用,有了交換機我們可以實現(xiàn)更靈活的功能,RabbitMQ中有三種常用的交換機類型:

direct: 如果路由鍵匹配,消息就投遞到對應的隊列

fanout:投遞消息給所有綁定在當前交換機上面的隊列

topic:允許實現(xiàn)有趣的消息通信場景,使得5不同源頭的消息能夠達到同一個隊列。topic隊列名稱有兩個特殊的關(guān)鍵字。

* 可以替換一個單詞

# 可以替換所有的單詞

可以理解,direct為1v1, fanout為1v所有,topic比較靈活,可以1v任意。

虛擬主機

每一個虛擬主機(vhost)相當于mini版的RabbitMQ服務(wù)器,擁有自己的隊列,交換機和綁定,權(quán)限… 這使得一個RabbitMQ服務(wù)眾多的應用程序,而不會互相沖突。

rabbitMQ默認的虛擬主機為: “/” ,一般我們在創(chuàng)建Rabbit的用戶時會再給用戶分配一個虛擬主機。

操作虛擬主機,除了命令行之外還有一個web管理頁面

#創(chuàng)建虛擬主機

rabbitmqctl add vhost [vhost_name]

#刪除虛擬主機

rabbitmqctl delete vhost [vhost_name]

#列出虛擬主機

rabbitmqctl list_vhosts

消息投遞策略

默認情況下RabbitMQ的隊列和交換機在RabbitMQ服務(wù)器重啟之后會消失,原因在于隊列和交換機的durable屬性,該屬性默認情況下為false.

能從AMQP服務(wù)器崩潰中恢復的消息稱為持久化消息,如果想要從崩潰中恢復那么消息必須

投遞模式設(shè)置2,來標記消息為持久化

發(fā)送到持久化的交換機

到到持久化的隊列

缺點:消息寫入磁盤性能差很多。除非特別關(guān)鍵的消息會使用

關(guān)鍵API

以上都是概念性的內(nèi)容,實際我們還是要通過編程來實現(xiàn)我們的目的,RabbitMQ的客戶端api提供了很多功能,通過看代碼,來了解它的強大之處。

基本步驟之前的RabbitMQ快速入門已經(jīng)提過了,Channel類是關(guān)鍵的部分:包含了很多我們想要的功能

消息確認

生成端可以添加監(jiān)聽事件:

channel.addConfirmListener(new ConfirmListener() {

@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.err.println("-------no ack!-----------");

}

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.err.println("-------ack!-----------");

}

});

消費端可以確認消息狀態(tài):

public class MyConsumer extends DefaultConsumer {

private Channel channel ;

public MyConsumer(Channel channel) {

super(channel);

this.channel = channel;

}

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.err.println("-----------consume message----------");

System.err.println("body: " + new String(body));

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

if((Integer)properties.getHeaders().get("num") == 0) {

channel.basicNack(envelope.getDeliveryTag(), false, true);

} else {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

}

channel.basicAck與basicNack最后一個參數(shù)指定消息是否重回隊列。

監(jiān)聽不可達消息

我們的消息生產(chǎn)者通過指定交換機和路由鍵來把消息送到隊列中,但有時候指定的路由鍵不存在,或者交換機不存在,那么消息就會return,我們可以通過添加return listener來實現(xiàn):

channel.addReturnListener(new ReturnListener() {

@Override

public void handleReturn(int replyCode, String replyText, String exchange,

String routingKey, BasicProperties properties, byte[] body) throws IOException {

System.err.println("---------handle return----------");

System.err.println("replyCode: " + replyCode);

System.err.println("replyText: " + replyText);

System.err.println("exchange: " + exchange);

System.err.println("routingKey: " + routingKey);

System.err.println("properties: " + properties);

System.err.println("body: " + new String(body));

}

});

channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

在basicPublish中的Mandatory要設(shè)置為true才會生效,否則broker會刪除該消息

消費端限流

假設(shè)MQ服務(wù)器上面囤積了成千上萬條的消息的時候,這個時候突然連接消費端,那么巨量的消息全部推過來,但是客戶端無法一次性處理這么多的數(shù)據(jù)。

在高并發(fā)的時候,瞬間產(chǎn)生的流量很大,消息很大,而MQ有個重要的作用就是限流,限流則是消費端做的。

RabbitMQ提供了一種Qos(服務(wù)質(zhì)量保證)功能,即在非自動確認消息的前提下,在一定數(shù)量的消息未被消費前,不進行消費新的消息。

// prefetchSize消息的限制大小,一般設(shè)置為0,在生產(chǎn)端限制

// prefetchCount 我們一次最多消費多少條消息,一般設(shè)置為1

// global,一般設(shè)置為false,在消費端進行限制

channel.basicQos(int prefetchSize, int prefetchCount, boolean global)

// 使用

channel.basicQos(0, 1, false);

channel.basicConsume(queueName, false, new MyConsumer(channel));

Note: autoAck設(shè)置為false, 一定要手工簽收消息

死信隊列(DLX)

當消息在隊列中變成死信,沒有消費者進行消費的時候,消息可能會被重新發(fā)布到另外一個隊列中,這個隊列就是死信隊列。

以下情況會導致消息進入死信隊列:

basic.reject/basic.nack 并且 requeue為false(不重回隊列)的時候,消息就是死信

消息TTL過期

隊列達到最大的長度

死信隊列也是正常的Exchange,和一般的Exchange沒什么區(qū)別,不過要做一點操作。

設(shè)置死信隊列包括:

設(shè)置Exchange(dlx.exchange名稱隨意),設(shè)置Queue(dlx.queue),設(shè)置RoutingKey(#)

創(chuàng)建正常的交換機,隊列,綁定,只不過加上一個參數(shù) arguments.put(“x-dead-letter-exchange”,“dlx.exchange”)

// 這就是一個普通的交換機 和 隊列 以及路由

String exchangeName = "test_dlx_exchange";

String routingKey = "dlx.#";

String queueName = "test_dlx_queue";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

Map agruments = new HashMap();

agruments.put("x-dead-letter-exchange", "dlx.exchange");

//這個agruments屬性,要設(shè)置到聲明隊列上

channel.queueDeclare(queueName, true, false, false, agruments);

channel.queueBind(queueName, exchangeName, routingKey);

//要進行死信隊列的聲明:

channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);

channel.queueDeclare("dlx.queue", true, false, false, null);

channel.queueBind("dlx.queue", "dlx.exchange", "#");

Java高架構(gòu)師、分布式架構(gòu)、高可擴展、高性能、高并發(fā)、性能優(yōu)化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式項目實戰(zhàn)學習架構(gòu)師視頻免費獲取架構(gòu)群:854180697

群鏈接:加群鏈接

寫在最后:歡迎留言討論,加關(guān)注,持續(xù)更新!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,539評論 19 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,512評論 2 34
  • 摘要:RabbitMQ發(fā)送消息時,都是先把消息發(fā)送給ExChange(交換機),然后再分發(fā)給有相應RoutingK...
    請叫wo小爺閱讀 1,403評論 0 2
  • 春天,樹葉開始閃出黃青,花苞輕輕的在風中擺動,似乎夾雜著一種冬天的凄寒,只等著一場春雨的洗禮,春天才是真正的來臨...
    微汐兒閱讀 467評論 0 2

友情鏈接更多精彩內(nèi)容