RabbitMQ使用
????你好!歡迎來到Java成長(zhǎng)筆記,主要是用于相互交流,相互學(xué)習(xí),也希望分享能幫到大家,如有錯(cuò)誤之處,希望指正,謝謝!
RabbitMQ簡(jiǎn)介
????RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語(yǔ)言編寫的,而群集和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的。所有主要的編程語(yǔ)言均有與代理接口通訊的客戶端庫(kù)。
????Erlang語(yǔ)言最初用于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang有著和原生Socket一樣的延遲,這也是RabbitMQ高性能的原因。
????Rabbit科技有限公司開發(fā)了RabbitMQ,并提供對(duì)其的支持。起初,Rabbit科技是LSHIFT和CohesiveFT在2007年成立的合資企業(yè),2010年4月被VMware旗下的SpringSource收購(gòu)。RabbitMQ在2013年5月成為GoPivotal的一部分。
常用消息中間件協(xié)議
AMQP協(xié)議
????????AMQP即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同開發(fā)語(yǔ)言等條件的限制。
????優(yōu)點(diǎn):可靠、通用。
MQTT協(xié)議
????MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸)是IBM開發(fā)的一個(gè)即時(shí)通訊協(xié)議,有可能成為物聯(lián)網(wǎng)的重要組成部分。該協(xié)議支持所有平臺(tái),幾乎可以把所有聯(lián)網(wǎng)物品和外部連接起來,被用來當(dāng)做傳感器和致動(dòng)器(比如通過Twitter讓房屋聯(lián)網(wǎng))的通信協(xié)議。
????優(yōu)點(diǎn):格式簡(jiǎn)潔、占用帶寬小、移動(dòng)端通信、PUSH、嵌入式系統(tǒng)。
STOMP協(xié)議
????STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協(xié)議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設(shè)計(jì)的簡(jiǎn)單文本協(xié)議。STOMP提供一個(gè)可互操作的連接格式,允許客戶端與任意STOMP消息代理(Broker)進(jìn)行交互。
????優(yōu)點(diǎn):命令模式(非topic\queue模式)。
XMPP協(xié)議
????XMPP(可擴(kuò)展消息處理現(xiàn)場(chǎng)協(xié)議,Extensible Messaging and Presence Protocol)是基于可擴(kuò)展標(biāo)記語(yǔ)言(XML)的協(xié)議,多用于即時(shí)消息(IM)以及在線現(xiàn)場(chǎng)探測(cè)。適用于服務(wù)器之間的準(zhǔn)即時(shí)操作。核心是基于XML流傳輸,這個(gè)協(xié)議可能最終允許因特網(wǎng)用戶向因特網(wǎng)上的其他任何人發(fā)送即時(shí)消息,即使其操作系統(tǒng)和瀏覽器不同。
????優(yōu)點(diǎn):通用公開、兼容性強(qiáng)、可擴(kuò)展、安全性高,但XML編碼格式占用帶寬大。
其他基于TCP/IP自定義的協(xié)議
????一些特殊框架(如:redis、kafka、zeroMq等)根據(jù)自身需要未嚴(yán)格遵循MQ規(guī)范,而是基于TCP/IP自行封裝了一套協(xié)議,通過網(wǎng)絡(luò)socket接口進(jìn)行傳輸,實(shí)現(xiàn)了MQ的功能。
消息中間件優(yōu)點(diǎn)
1、異步解耦
通過上下游業(yè)務(wù)的松耦合涉及,即使下游子系統(tǒng)出現(xiàn)不可用或者宕機(jī),都不會(huì)影響核心交易系統(tǒng)的正產(chǎn)運(yùn)轉(zhuǎn)。
2、削峰填谷
在諸如搶紅包、秒殺等活動(dòng)帶來的流量脈沖,或因沒做相應(yīng)的保護(hù)而導(dǎo)致系統(tǒng)超負(fù)荷甚至崩潰,或因限制太過導(dǎo)致請(qǐng)求大量失敗而影響用戶體驗(yàn),削峰填谷是解決該問題的最佳方式,
3、支持分布式部署,能夠保證消息傳遞的高效和可靠,實(shí)現(xiàn)高并發(fā)、高可用、高性能,合理使用突破性能瓶頸。
常用消息中間件介紹
RabbitMQ安裝
????友情提示,RabbitMQ需要和Erlang版本相對(duì)應(yīng),本文使用的RabbitMQ版本是rabbitmq-server-generic-unix-3.8.3.tar.xz、Erlang版本是otp_src_22.2.tar
RabbitMQ下載地址
1、RabbitMQ官網(wǎng)地址
2、Erlang下載地址
3、RabbitMQ中文文檔
RabbitMQ安裝與常用命令
1、通過wget來安裝
$ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
$ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
2、通過zip文件安裝
$ sudo yum install epel-release
$ sudo yum install erlang
3、rabbitmq啟動(dòng)
rabbitmqctl start_app
4、服務(wù)的停止
rabbitmqctl stop_app
5、查看節(jié)點(diǎn)狀態(tài)
rabbitmqctl status
6、插件管理
rabbitmq- plugins enable rabbitmq_ management
7、管理后臺(tái)地址
htp:/127.0.0.1:15672
8、添加用戶
rabbitmqctl add_user username password
9、列出所有用戶
rabbitmqctl list_users
10、刪除用戶
rabbitmqctl delete_user username
11、清除用戶權(quán)限
rabbitmqctl clear_permissions -p vhostpath username
12、列出用戶權(quán)限
rabbitmgctl list_user_permissions username
13、修改密碼
rabbitmqctl change_password username newpassword
14、設(shè)置用戶權(quán)限
rabbitmqctl set_permissions-p vhostpath username “.*”".*"".*"
15、創(chuàng)建虛擬主機(jī)
rabbitmactl add_vhost vhostpath
16、列出所有虛擬主機(jī)
rabbitmqctl list_vhosts
17、列出虛擬主機(jī)上所有權(quán)限
rabbitmqctl list_permissions -p vhostpath
18、刪除虛擬主機(jī)
rabbitmqctl delete_vhost vhostpath
19、查看所有隊(duì)列信息
rabbitmqctl list_queues
20、清除隊(duì)列里的消息
rabbitmqctl -o vhostpath purge_queue blue
21、移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用
rabbitmqctl reset
22、組成集群命令
rabbitmqctl join_cluster< clusternode>[--ram]
23、查看集群狀態(tài)
rabbitmqctl cluster_status
24、修改集群節(jié)點(diǎn)的存儲(chǔ)形式
rabbitmqctl change_cluster_node_type disc ram
25、忘記節(jié)點(diǎn)(摘除節(jié)點(diǎn))
rabbitmactl forget_cluster_node[--offline]
26、修改節(jié)點(diǎn)名稱
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2][newnode2...]
RabbitMQ常用名詞解釋
1、Server:又稱 Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
2、Connection:連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
3、Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在 Channels中進(jìn)行,Channel是進(jìn)行消息讀寫的通道??蛻舳丝山⒍鄠€(gè) Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)。
4、Message:消息、服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body則就是消息體內(nèi)容。
5、Virtual host:虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)VirtualHost里面不能有相同名稱的Exchange或Queue
6、Exchange:交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列
7、Binding:Exchange和Queue之間的虛擬連接,binding中可以包含 routing key
8、Routing key:一個(gè)路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個(gè)特定消息
9、Queue:也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者
10、Connectionfactory:獲取連接工廠
RabbitMQ基本使用
交換機(jī)基本屬性
Name:交換機(jī)名稱
Type:交換機(jī)類型direct、topic、fanout、headers
Durability:是否需要持久化,true為持久化 false不持久化
Auto Delete:當(dāng)最后一個(gè)綁定到 Exchange上的隊(duì)列刪除后,自動(dòng)刪除該 Exchange
Internal:當(dāng)前Exchange是否用于Rabbitmqp內(nèi)部使用,默認(rèn)為 False
Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用
引入依賴
1、需要引入的maven依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、基本使用
// 消費(fèi)端代碼
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
@Slf4j
public class Consumer {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建一個(gè)Connectionfactory,并進(jìn)行配置
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitVo.getHost());
connectionFactory.setPort(rabbitVo.getPort());
connectionFactory.setUsername(rabbitVo.getUserName());
connectionFactory.setPassword(rabbitVo.getPassWord());
connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
// 2、通過連接工廠的建連接
final Connection connection = connectionFactory.newConnection();
// 3、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 4、申明創(chuàng)建一個(gè)隊(duì)列 持久化服務(wù)器重啟隊(duì)列不會(huì)消失
final String queueName = "e-rabbitmq";
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(queueName, true, false, false, null);
// 5、創(chuàng)建消費(fèi)者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息內(nèi)容:" + new String(body));
}
};
// 6、設(shè)置Channel
channel.basicConsume(queueName, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
@Slf4j
public class Procuder {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建一個(gè)Connectionfactory,并進(jìn)行配置
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitVo.getHost());
connectionFactory.setPort(rabbitVo.getPort());
connectionFactory.setUsername(rabbitVo.getUserName());
connectionFactory.setPassword(rabbitVo.getPassWord());
connectionFactory.setVirtualHost(rabbitVo.getVirtualHost());
// 2、通過連接工廠的建連接
final Connection connection = connectionFactory.newConnection();
// 3、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 4、通過channe1發(fā)送數(shù)據(jù)
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", "e-rabbitmq", null, msg.getBytes());
}
} finally {
// 5、記得要關(guān)閉連接
channel.close();
connection.close();
}
log.error(" 消息發(fā)送成功 ---> " + System.currentTimeMillis());
}
}
上面模式使用需要注意:消息生產(chǎn)者并沒有指定Exchange,只指定了隊(duì)列名稱,消息消費(fèi)端會(huì)根據(jù)AMQP default模式來進(jìn)行處理,也就是根據(jù)與Routingkey名稱相同的隊(duì)列來進(jìn)行處理。如下圖所示:
Exchange三種類型使用
主要API介紹
// exchange:消息 Exchange 名稱
// type:消息 Exchange 類型
// durable:是否持久化 true:是 false:否
// autoDelete:true表示當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列刪除后,自動(dòng)刪除該Exchange
// internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
// arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用
// 申明一個(gè)交換機(jī)
DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
// queue:隊(duì)列名稱
// durable:是否持久化 true:是 false:否
// exclusive:設(shè)置為獨(dú)占、只有一個(gè)隊(duì)列可以使用
// 創(chuàng)建一個(gè)隊(duì)列
DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// queue:隊(duì)列名稱
// exchagne:路由名稱
// routingKey:路由key
// 建立路由、routingKey、隊(duì)列綁定關(guān)系
BindOk queueBind(String queue, String exchange, String routingKey)
// BasicProperties 對(duì)象屬性
Map<String,Object> map = ImmutableMap.of("msg1","msg1","msg2","msg2");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 1、不是持久化 2、持久化
.contentEncoding("UTF-8") // 設(shè)置字符集
.expiration("10000") // 10s沒有消費(fèi)會(huì)自動(dòng)移除
.headers(map) //
.build();
// 獲取消息信息
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
log.error("消息內(nèi)容:" + new String(body) + "BasicProperties屬性:" + JSON.toJSONString(properties));
Direct Exchange使用
Direct模式:所有發(fā)送到Direct類型的Exchange消息被轉(zhuǎn)發(fā)到Routekey中指定的Queue,注意:Direct模式可以使用RabbitMQ自帶的Exchange:default
Exchange,所以不需要將Exchange進(jìn)行任何綁定( binding)操作,消息傳遞時(shí),Routekey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄。代碼展示如下:
// 消費(fèi)端代碼
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerDirect {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connection 建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 申明一個(gè)交換機(jī)
channel.exchangeDeclare(MqConfig.directExChange, MqTypeEnum.MQ_DIRECT.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(MqConfig.directQueue, true, false,false, null);
// 建立一個(gè)綁定關(guān)系
channel.queueBind(MqConfig.directQueue, MqConfig.directExChange, MqConfig.directRoutingKey);
// 3、創(chuàng)建消費(fèi)者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息內(nèi)容:" + new String(body));
}
};
// 4、設(shè)置Channel
channel.basicConsume(MqConfig.directQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderDirect {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 3、通過channe1發(fā)送數(shù)據(jù)
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.directExChange, MqConfig.directRoutingKey, null, msg.getBytes());
}
} finally {
// 4、關(guān)閉連接
channel.close();
connection.close();
}
log.error(" 消息發(fā)送成功 ---> " + System.currentTimeMillis());
}
}
Topic Exchange使用
Topic模式:生產(chǎn)端發(fā)送的消息將Exchange、RouteKey和某Topic進(jìn)行模糊匹配。例如:RouteKey中匹配符號(hào),符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“”匹配不多不少一個(gè)詞。示例:”10g.#“能夠匹配到"log.info.aa”、"log."只會(huì)匹配到"log.error“。代碼展示如下:
// 消費(fèi)端代碼
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerTopic {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connection 建一個(gè) Channe1
final Channel channel = connection.createChannel();
try {
Optional.ofNullable(channel).ifPresent(x->{
// 申明一個(gè)交換機(jī)
x.exchangeDeclare(MqConfig.topicExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
x.queueDeclare(MqConfig.topicQueue, true, false,false, null);
// 建立一個(gè)綁定關(guān)系
x.queueBind(MqConfig.topicQueue, MqConfig.topicExChange, MqConfig.topicRoutingKey);
});
} catch (Exception e) {
throw new RuntimeException("channel連接失敗!");
}
// 3、創(chuàng)建消費(fèi)者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息內(nèi)容:" + new String(body));
}
};
// 4、設(shè)置Channel
channel.basicConsume(MqConfig.topicQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderTopic {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 3、通過channe1發(fā)送數(shù)據(jù)
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
final String routingkey = MqConfig.topicProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-","");
channel.basicPublish(MqConfig.topicExChange, routingkey, null, msg.getBytes());
}
} finally {
// 4、關(guān)閉連接
channel.close();
connection.close();
}
log.error(" 消息發(fā)送成功 ---> " + System.currentTimeMillis());
}
}
Fanout Exchange使用
Fanout模式:不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上,F(xiàn)anout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。代碼展示如下:
// 消費(fèi)端代碼
import com.rabbitmq.client.*;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerFanout {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connection 建一個(gè) Channe1
final Channel channel = connection.createChannel();
Optional.ofNullable(channel).ifPresent(x->{
try {
// 申明一個(gè)交換機(jī)
x.exchangeDeclare(MqConfig.fanoutExChange, MqTypeEnum.MQ_FANOUT.getType(), true, false, false, null);
// 是否持久化
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
x.queueDeclare(MqConfig.fanoutQueue, true, false,false, null);
// 建立一個(gè)綁定關(guān)系
x.queueBind(MqConfig.fanoutQueue, MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey);
} catch (IOException e) {
e.printStackTrace();
}
});
// 3、創(chuàng)建消費(fèi)者
final DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.error("消息內(nèi)容:" + new String(body));
}
};
// 4、設(shè)置Channel
channel.basicConsume(MqConfig.fanoutQueue, true, defaultConsumer);
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼展示
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.rabbit.RabbitMqUtil.rabbitVo;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderFanout {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 3、通過channe1發(fā)送數(shù)據(jù)
try {
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.fanoutExChange, MqConfig.fanoutRoutingKey, null, msg.getBytes());
}
} finally {
// 4、關(guān)閉連接
channel.close();
connection.close();
}
log.error(" 消息發(fā)送成功 ---> " + System.currentTimeMillis());
}
}
消息可靠性投遞
基本概念
RabbitMQ的消息確認(rèn)機(jī)制
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Brokerl收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到Broker。
Return消息機(jī)制
Return Listener用于處理一些不可路由的消息,在某些情況下,我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的Exchange不存在或者指定的路由key路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽這種不可達(dá)的消息,就要使用Return Listener。
消息可靠性投遞(一)
1、利用RabbitMQ的消息的Confirm確認(rèn)消息。
2、在發(fā)送消息之前,對(duì)消息進(jìn)行落庫(kù),對(duì)消息狀態(tài)進(jìn)行打標(biāo)。例如:投遞狀態(tài)、更新時(shí)間等。
3、通過消息狀態(tài)和定時(shí)任務(wù)對(duì)投遞失敗的消息進(jìn)行二次投送、或者通過內(nèi)管后臺(tái)進(jìn)行單條數(shù)據(jù)的重試。PD如下圖所示。
消息可靠性投遞(二)
1、消息投遞之前進(jìn)行入庫(kù)操作,第一條消息進(jìn)入MQ Broker,進(jìn)入消費(fèi)端,如果消費(fèi)成功,會(huì)發(fā)送一條Confirm消息到Callback服務(wù),收到消息進(jìn)行MSG DB Change。
2、第二條延時(shí)消息進(jìn)行發(fā)送,進(jìn)入MQ Broker,進(jìn)入Callback服務(wù),查詢消息是否消費(fèi)成功、如果成功不進(jìn)行操作,如果當(dāng)前消息狀態(tài)沒有變更消息消費(fèi)失敗,會(huì)通知當(dāng)條消息進(jìn)入ReSend操作。
3、這樣處理的好處是節(jié)省服務(wù)器的資源,整個(gè)Callback作為單獨(dú)服務(wù)統(tǒng)一管理,方便維護(hù)。具體流程圖如下圖所示。
消費(fèi)端限流
RabbitMQ提供了一種qos(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于consumer或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
// prefetchsize:設(shè)置為0表示不限制消息大小 一般我們?cè)O(shè)置為0
// prefetchCount:不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,即一旦有N個(gè)消息還沒有ack,則該consumer將block掉,直到有消息ack,才進(jìn)行后續(xù)的消費(fèi)
// global:限流是channel級(jí)別的還是consumers級(jí)別
void Basicqos(uint prefetchsize, ushort prefetchcount, bool global)
// 消費(fèi)端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.custom.CustomConsumer;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Optional;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerAck {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connection 建一個(gè) Channe1
final Channel channel = connection.createChannel();
Optional.ofNullable(channel).ifPresent(x->{
try {
// 申明一個(gè)交換機(jī)
x.exchangeDeclare(MqConfig.ackExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
x.queueDeclare(MqConfig.ackQueue, true, false,false, null);
// 建立一個(gè)綁定關(guān)系
x.queueBind(MqConfig.ackQueue, MqConfig.ackExChange, MqConfig.ackRoutingKey);
} catch (IOException e) {
e.printStackTrace();
}
});
// 3、限流處理
// int prefetchSize, int prefetchCount, boolean global
channel.basicQos(0, 1, false);
// 4、設(shè)置Channel String queue, boolean autoAck, Consumer callback
// autoAck true 自動(dòng)簽收 false 手動(dòng)簽收
channel.basicConsume(MqConfig.ackQueue, false, new AckConsumer(channel));
log.error(" 消息消費(fèi)成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderAck {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 3、通過channe1發(fā)送數(shù)據(jù)
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
final String routingkey = MqConfig.ackProcuderRoutingKey + UUID.randomUUID().toString().replaceAll("-","");
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish(MqConfig.ackExChange, routingkey, true, properties, msg.getBytes());
}
log.error(" 消息生產(chǎn)成功 ---> " + System.currentTimeMillis());
}
}
死信隊(duì)列
死信隊(duì)列:DLX,Dead-Letter-Exchange介紹<font>
當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX。
DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就>是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。<font>
TTL隊(duì)列/消息介紹<font>
TTL是Time To Livel的縮寫,也就是指生存時(shí)間、RabbitMQ支持設(shè)置消息的過期時(shí)間,在發(fā)送消息發(fā)送時(shí)可以進(jìn)行指定,從消息進(jìn)入隊(duì)列開始計(jì)算,只要超過了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)被清除。<font>
消息變成死信有以下幾種情況<font>
1、消息被拒絕(basic.reject/basic.nack)并且requeue=false
2、消息TTL過期
3、隊(duì)列達(dá)到最大長(zhǎng)度下面**
// 消費(fèi)端代碼
import com.google.common.collect.Maps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.enums.MqTypeEnum;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ConsumerDlx {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過 connection 建一個(gè) Channe1
final Channel channel = connection.createChannel();
Map<String, Object> agruments = Maps.newHashMap();
agruments.put("x-dead-letter-exchange", MqConfig.dlxExChange);
// dlx test
channel.exchangeDeclare(MqConfig.dlxTestExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// 是否持久化
channel.queueDeclare(MqConfig.dlxTestQueue, true, false,false, agruments);
// 建立一個(gè)綁定關(guān)系
channel.queueBind(MqConfig.dlxTestQueue, MqConfig.dlxTestExChange, MqConfig.dlxTestRoutingKey);
// 申明 dlx
channel.exchangeDeclare(MqConfig.dlxExChange, MqTypeEnum.MQ_TOPIC.getType(), true, false, false, null);
// dlx 是否持久化
channel.queueDeclare(MqConfig.dlxQueue, true, false,false, null);
// dlx 建立一個(gè)綁定關(guān)系
channel.queueBind(MqConfig.dlxQueue, MqConfig.dlxExChange, MqConfig.dlxRoutingKey);
channel.basicConsume(MqConfig.dlxTestQueue, false, new DlxConsumer(channel));
log.error(" 接收消息成功 ---> " + System.currentTimeMillis());
}
}
// 生產(chǎn)端代碼
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.show.service.MqConfig;
import lombok.extern.slf4j.Slf4j;
import static com.show.util.MqConnectionUtil.mqFactory;
@Slf4j
public class ProcuderDlx {
public static void main(String[] args) throws Exception {
// 1、通過連接工廠的建連接
final Connection connection = mqFactory.newConnection();
// 2、通過connectioni的建一個(gè) Channe1
final Channel channel = connection.createChannel();
// 3、通過channe1發(fā)送數(shù)據(jù)
for(int i=0;i<5;i++){
final String msg = "Hello RabbitMq";
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(MqConfig.dlxTestExChange, MqConfig.dlxTestProcuderRoutingKey, true, properties, msg.getBytes());
}
log.error(" 消息發(fā)送成功 ---> " + System.currentTimeMillis());
}
}
本章完結(jié),后續(xù)還會(huì)持續(xù)更新,分享Java成長(zhǎng)筆記,希望我們能一起成長(zhǎng)。如果你覺得我的分享有用,記得點(diǎn)贊和關(guān)注哦!這對(duì)我是最好的鼓勵(lì)。謝謝!
PS:轉(zhuǎn)載請(qǐng)注明出處!