上文的工作隊列模式是直接在生產(chǎn)者與消費者里聲明好一個隊列(不指定交換機,生產(chǎn)者和消費者之間直接連接隊列),這種情況下消息只會對應同類型的消費者。
顯然這種只處理同種類型的消息是有弊端的。
舉個用戶注冊的列子
門戶網(wǎng)站,用戶在注冊完后一般都會發(fā)送消息通知用戶注冊成功(失?。?/p>
如果在一個系統(tǒng)中,用戶注冊信息有郵箱、手機號,那么在注冊完后會向郵箱和手機號都發(fā)送注冊完成信息(假設都發(fā)送)。
利用 MQ 實現(xiàn)業(yè)務異步處理,如果是用工作隊列的話,就會聲明一個注冊信息隊列。注冊完成之后生產(chǎn)者會向隊列提交一條注冊數(shù)據(jù),消費者取出數(shù)據(jù)同時向郵箱以及手機號發(fā)送兩條消息。但是實際上郵箱和手機號信息發(fā)送實際上是不同的業(yè)務邏輯,不應該放在一塊處理。(應該放到不同的隊列中,由不同的消費者消費)
這個時候就可以利用發(fā)布/訂閱模式將消息發(fā)送到轉(zhuǎn)換機(EXCHANGE),聲明兩個不同的隊列(郵箱、手機),并綁定到交換機。這樣生產(chǎn)者只需要發(fā)布一次消息,兩個隊列都會接收到消息發(fā)給對應的消費者,大致如下圖所示。

1. 什么是發(fā)布訂閱模式
簡單解釋就是,可以將消息發(fā)送給不同類型的消費者。做到發(fā)布一次,消費多個。下圖取自于官方網(wǎng)站(RabbitMQ)的發(fā)布/訂閱模式的圖例:

P 表示為生產(chǎn)者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。
2.代碼部分
生產(chǎn)者
cn.lovingliu.rabbitmq_fanout.producer.Producer
package cn.lovingliu.rabbitmq_fanout.producer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 生產(chǎn)者
* @Date:Created in 2020-01-16
*/
public class Producer {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.創(chuàng)建新的連接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.創(chuàng)建通道 */
Channel channel = connection.createChannel();
/** 3.綁定的交換機 參數(shù)1交互機名稱 參數(shù)2 exchange類型 */
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/** 4.發(fā)送消息 */
for (int i = 0; i < 10; i++)
{
String message = "用戶注冊消息:" + i;
System.out.println("[send]:" + message);
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
try {
Thread.sleep(5 * i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 5.關閉通道、連接 */
channel.close();
connection.close();
/** 注意:如果消費沒有綁定交換機和隊列,則消息會丟失 */
}
}
代碼補充,channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); 其中第二個參數(shù)為空類似于表示全局廣播,只要綁定到該隊列上的消費者理論上是都可以收到的。
消費者
短信消費者cn.lovingliu.rabbitmq_fanout.consumer.ConsumerSMS
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 短信消費者
* @Date:Created in 2020-01-16
*/
public class ConsumerSMS {
private static final String QUEUE_NAME = "ConsumerFanout_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消費者啟動");
/** 1.創(chuàng)建新的連接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.創(chuàng)建通道 */
Channel channel = connection.createChannel();
/** 3.消費者關聯(lián)隊列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.消費者綁定交換機 參數(shù)1 隊列 參數(shù)2交換機 參數(shù)3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產(chǎn)者消息:" + msg);
}
};
/** 5.消費者監(jiān)聽隊列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
郵件消費者cn.lovingliu.rabbitmq_fanout.consumer.ConsumerEmail
package cn.lovingliu.rabbitmq_fanout.consumer;
import cn.lovingliu.common.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author:LovingLiu
* @Description: 郵件消費
* @Date:Created in 2020-01-16
*/
public class ConsumerEmail {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("郵件消費者啟動");
/** 1.創(chuàng)建新的連接 */
Connection connection = ConnectionUtil.getConnection();
/** 2.創(chuàng)建通道 */
Channel channel = connection.createChannel();
/** 3.消費者關聯(lián)隊列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.消費者綁定交換機 參數(shù)1 隊列 參數(shù)2交換機 參數(shù)3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產(chǎn)者消息:" + msg);
}
};
/** 5.消費者監(jiān)聽隊列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
代碼補充, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 中第三個參數(shù)置為空時,可以接收到生產(chǎn)者所有的消息(生產(chǎn)者 routingKey參數(shù)為空時)。
3.運行截圖
先運行兩個消費者,再運行生產(chǎn)者。如果沒有提前將隊列綁定到交換機,那么直接運行生產(chǎn)者的話,消息是不會發(fā)到任何隊列里的。
生產(chǎn)者

消費者
郵件消費者

短信消費者

總結
首先相對于工作模式,發(fā)布訂閱模式引入了交換機的概念,相對其類型上更加靈活廣泛一些。通過上文我們可以總結如下:
1.生產(chǎn)者不是直接操作隊列,而是將數(shù)據(jù)發(fā)送給交換機,由交換機將數(shù)據(jù)發(fā)送給與之綁定的隊列。從不加特定參數(shù)的運行結果中可以看到,兩種類型的消費者(
email,sms)都收到相同數(shù)量的消息。2.必須聲明交換機,并且設置模式:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),其中fanout指分發(fā)模式(將每一條消息都發(fā)送到與交換機綁定的隊列)。3.隊列必須綁定交換機:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
生產(chǎn)者發(fā)送消息到交換機,多個消費者聲明多個隊列,與交換機進行綁定,隊列中的消息可以被所有消費者消費,類似于QQ群消息