1. Helloworld-基本消息模型
- 導入的jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<!--和springboot2.0.5對應-->
<version>5.4.1</version>
</dependency>
- 連接工具類
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置服務地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//設置賬號信息,用戶名、密碼、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}

發(fā)送方
流程:
①:獲得連接對象
②:創(chuàng)建與交換機的連接通道
③:創(chuàng)建消息隊列
④:發(fā)送消息
public class Send {
public static final String QUEUE_NAME_HELLO = "queue_name_hello";
public static void main(String[] args) {
Connection connection = null;
try {
//1. 創(chuàng)建連接
connection = ConnectionUtil.getConnection();
//2. 創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務
Channel channel = connection.createChannel();
/**
* 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建
* param1:隊列名稱
* param2:是否持久化
* param3:隊列是否獨占此連接
* param4:隊列不再使用時是否自動刪除此隊列
* param5:隊列參數(shù)
*/
channel.queueDeclare(QUEUE_NAME_HELLO,true,false,false,null);
String message = "hello rabbitmq";
/**
* 消息發(fā)布方法
* param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
* param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發(fā)到指定的消息隊列
* param3:消息包含的屬性
* param4:消息體
*/
/**
* 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯
示綁定或解除綁定
* 默認的交換機,routingKey等于隊列名稱
*/
channel.basicPublish("", QUEUE_NAME_HELLO,
null,message.getBytes());
System.out.println("message send successful!");
} catch (Exception e) {
e.printStackTrace();
} finally {
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消費者:
流程:
①:獲得連接對象
②:創(chuàng)建與交換機的連接通道
③:聲明監(jiān)聽隊列
④:從隊列中獲取消息,接收消息的回調
public class Consumer01 {
public static void main(String[] args) {
Connection connection = null;
try {
//1. 獲得連接
connection = ConnectionUtil.getConnection();
//2. 創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務
Channel channel = connection.createChannel();
//3. 聲明隊列
/**
* 消費者接收消息調用此方法
* @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
* @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
}
};
/**
* 監(jiān)聽隊列String queue, boolean autoAck,Consumer callback
* 參數(shù)明細
* 1、隊列名稱
* 2、是否自動回復,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置
為false則需要手動回復
* 3、消費消息的方法,消費者接收到消息后調用此方法
*/
channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
啟動Send發(fā)送消息,可以看到當前的隊列中有一條消息處于就緒狀態(tài)!

啟動Consumer01接收消息,消息被成功接收!
消費者標識:amq.ctag-ALoQPcEfIDlldhAZqAU4CQ
交換機名稱:
路由key:queue_name_hello
消息id:1
接收消息:hello rabbitmq
消息接收成功!

2. 消息確認機制(ACK)
在上述代碼中,在basicConsume()里的autoAck設置的值為true,他表示自動簽收,即當消費者接收到了消息之后,隊列中的消息就會立即刪除,如果為false就表示為手動簽收!
RabbitMQ有一個ACK機制:即當消費者接收到了消息后,會向 RabbitMQ發(fā)送一個回執(zhí)ACK,告知消息已經被接收!RabbitMQ會將隊列中的消息刪除掉!ACK機制分為兩種
自動簽收:消息一旦被接收,自動發(fā)送回執(zhí)ACK
手動簽收:消息被接收后,需要手動發(fā)送ACK
自動簽收存在的問題:
模擬一個異常環(huán)境,當消費者接收到消息,進入回調函數(shù)中,人為拋出一個異常,查看消息接收狀態(tài)!
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
int i = 1/0; //模擬異常環(huán)境
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
}
};
channel.basicConsume(Send.QUEUE_NAME_HELLO,true , consumer);
}
}
此時再次發(fā)送消息:

[圖片上傳失敗...(image-b23445-1585216601052)]
再次接收:控制臺沒有打印任何信息,消息是沒有被成功接收到的

[圖片上傳失敗...(image-e39c33-1585216601052)]
但是隊列中的消息已經被刪除掉了,說明,在自動簽收的情況下,如果遇到異常,數(shù)據(jù)會丟失!因此如果數(shù)據(jù)很重要建議使用手動模式!
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//int i = 1/0;
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
}
};
channel.basicConsume(Send.QUEUE_NAME_HELLO,false , consumer);
3. Work queues模型

[圖片上傳失敗...(image-56d745-1585216601052)]
work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息!
結果:
同一個消息只能給一個一個消費者
RabbitMQ采用輪詢的方式將消息發(fā)送給消費者
消費者只有在處理完一條消息后,才能接收下一條消息
能者多勞
模擬一個消費者處理消息效率低下的情況:
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
}
};
channel.basicConsume(Send.QUEUE_NAME_WORK_QUEUE,false , consumer);
此時RabbitMQ采用輪詢的方式再發(fā)送消息,處于休眠的消費者和另一個消費者會輪流接收到消息,但是處于休眠的消費者處理耗時較長,而另一個消費者處理效率更高,會一直處于空閑狀態(tài)!
此時可以使用basicQos方法和prefetchCount = 1設置。 這告訴RabbitMQ一次不要向工作人員發(fā)送多于一條消息。 或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認了前一個消息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
channel.basicQos(1); //設置最大處理消息數(shù)為1
channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
}
};
這樣設置后,處于休眠的消費者一直在處理分配給它的一條消息,而另一個消費者會接收到消息,處理完后又會接收到下一條消息,不再處于空閑狀態(tài),從而實現(xiàn)能者多勞!
4. 訂閱模型分類

[圖片上傳失敗...(image-b0a2d-1585216601052)]
特點:
①:一個生產者,多個消費者
②:每個消費者偵聽自己的隊列
③:每個隊列綁定在交換機上
④:生產者負責把消息發(fā)送給交換機
⑤:生產者把消息發(fā)送到交換機,通過隊列到達消費者,實現(xiàn)一條消息,被多個消費者消費!
⑤:根據(jù)交換機的不同類型處理消息
Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
4.1 Exchange類型-Fanout:廣播
在廣播的工作模式下:
①:一個生產者和多個消費者
②:生產者與交換機綁定,多個消費者有自己的隊列,隊列與交換機綁定!
③:生產者負責把消息發(fā)送給交換機,交換機負責把消息發(fā)送給具體的隊列,生產者無法決定!
④:交換機把消息發(fā)送給每一個隊列
⑤:每一個消費者都能接收到消息!實現(xiàn)一條消息被多個消費者消費
流程:
聲明Exchange,不再聲明Queue
發(fā)送消息到Exchange,不再發(fā)送到Queue
生產者:
public class Send {
public static final String EXCHANGE_NAME_FANOUT= "exchange_name_fanout";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String message = "hello rabbitmq";
//聲明交換機名字和類型
channel.exchangeDeclare(EXCHANGE_NAME_FANOUT, BuiltinExchangeType.FANOUT);
//發(fā)布消息 routingKey為"",發(fā)送到每一個與之綁定的隊列!
channel.basicPublish(EXCHANGE_NAME_FANOUT, "", null, message.getBytes());
System.out.println("send message successful!");
}
}
消費者01:
public class Consumer01 {
public static final String QUEUE_NAME_01 = "quene_name_01";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME_01, false, false, false, null);
//綁定到交換機
channel.queueBind(QUEUE_NAME_01, Send.EXCHANGE_NAME_FANOUT, "");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
channel.basicQos(1); //設置最大處理消息數(shù)為1
channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
}
};
channel.basicConsume(QUEUE_NAME_01,false,consumer );
}
}
消費者02:
public class Consumer02 {
public static final String QUEUE_NAME_02 = "quene_name_02";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME_02, false, false, false, null);
//綁定到交換機
channel.queueBind(QUEUE_NAME_02, Send.EXCHANGE_NAME_FANOUT, "");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者標識:"+consumerTag);
System.out.println("交換機名稱:"+envelope.getExchange());
System.out.println("路由key:"+envelope.getRoutingKey());
System.out.println("消息id:"+envelope.getDeliveryTag());
System.out.println("接收消息:"+new String(body));
System.out.println("消息接收成功!");
channel.basicQos(1); //設置最大處理消息數(shù)為1
channel.basicAck(envelope.getDeliveryTag(), false);//手動提交ACK
}
};
channel.basicConsume(QUEUE_NAME_02,false,consumer );
}
}
生產者發(fā)送消息到交換機,交換機根據(jù)routingKey匹配隊列,這里沒有指定routingKey,交換機會將消息發(fā)送到每一個隊列,消費者01和消費者02都能收到消息!
使用場景:
群發(fā)信息