在前面的教程里,我們改進(jìn)了日志系統(tǒng)。我們用direct交換類(lèi)型代替了fanout交換類(lèi)型,并實(shí)現(xiàn)了可以有選擇性的接收日志。
雖然使用direct類(lèi)型成功的改進(jìn)了我們的系統(tǒng),但是它仍然有一定的局限性——它不能夠基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。
在我們的日志系統(tǒng)中,我們可能希望自己不僅僅基于嚴(yán)重性去訂閱日志,我們還應(yīng)該關(guān)注發(fā)出日志的源??赡茉趕yslog unix工具上可以了解到這個(gè)概念,這個(gè)工具根據(jù)“嚴(yán)重性”(info/warn/crit...)和“設(shè)施”(auth/cron/kern..)來(lái)路由(routing)日志。(routing這個(gè)詞我實(shí)在不知道咋翻譯了...)
這將給我們帶來(lái)靈活性,我們想要收到來(lái)自"cron"的錯(cuò)誤,也想要收到來(lái)自"kern"的所有日志。要在我們的日志系統(tǒng)中實(shí)現(xiàn)這一點(diǎn),我們需要學(xué)習(xí)一個(gè)更復(fù)雜的——topic exchange
Topic Exchange
發(fā)送到Topic Exchange的消息,不能是任意的路由鍵。他必須遵守一些規(guī)則:由點(diǎn)分割的單詞列表。單詞可以是任意的東西,但通常他們是與消息有關(guān)的特性。
路由鍵比如:"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit".在路由鍵中可以有很多單詞,最多可以達(dá)到255個(gè)字節(jié)。
綁定鍵也必須使用相同的方式,topic exchange背后的邏輯類(lèi)似于direct exchange——一條帶有特定路由鍵的消息將會(huì)被發(fā)送到能夠匹配綁定鍵的所有隊(duì)列!還有,綁定鍵有兩個(gè)重要的特殊情況:
- “*”可以代替一個(gè)詞。
- “#”可以代替0個(gè)或者多個(gè)單詞。
用圖解釋一下:

在這個(gè)例子中,我們將會(huì)發(fā)送所有描述東西的信息。消息將會(huì)發(fā)送到一個(gè)由三個(gè)單詞組成的路由鍵。路由鍵第一個(gè)詞描述“速度”,第二個(gè)詞描述“顏色”,第三個(gè)詞描述“物種”。
兩個(gè)隊(duì)列,我們創(chuàng)建了三個(gè)綁定:
- Q1綁定".orange."
- Q2綁定" * . * .rabbit" 和 "lazy.#"
這些綁定概括成:
- Q1 對(duì)所有橙色(orange)動(dòng)物感興趣
- Q2 想要聽(tīng)到兔子(rabbit)的一切,也想聽(tīng)到懶惰(lazy)動(dòng)物的一切~
如果一條消息的路由鍵為"quick.orange.rabbit",他將被傳到兩個(gè)隊(duì)列中去。
如果消息路由鍵是"lazy.orange.elephant",也會(huì)被傳遞到兩個(gè)隊(duì)列中去。
如果消息路由鍵是"quick.orange.fox"將會(huì)只被發(fā)送到Q1隊(duì)列
如果消息路由鍵是"lazy.brown.fox"將會(huì)只被發(fā)送到Q2隊(duì)列
如果消息路由鍵是"lazy.pink.rabbit"它將只被發(fā)送到Q2隊(duì)列,而且只有一次!
如果消息路由鍵是"quick.brown.fox"它沒(méi)有匹配任何的綁定,所以他會(huì)被丟棄。
如果我們違反了設(shè)置,用一個(gè)單詞或者四個(gè)單詞發(fā)送一條信息。
比如“orange”或”quick.orange.male.rabbit“會(huì)怎么樣???
答案是,這些消息不匹配任何綁定,將會(huì)丟失。
但是!??!如果是“l(fā)azy.orange.male.rabbit”,盡管他是四個(gè)單詞,但是它和Q2的綁定匹配。所以它將會(huì)被傳送到第二個(gè)隊(duì)列。
Topic exchange 小知識(shí)
Topic exchange很好很強(qiáng)大,行為可以表現(xiàn)的和其他的交換類(lèi)型一樣,也可以有自己的擴(kuò)展。
當(dāng)隊(duì)列與"#"綁定鍵綁定的時(shí)候,它將會(huì)化身為fanout類(lèi)型,接受所有消息。我們就可以代替fanout類(lèi)型進(jìn)行使用了。
當(dāng)特殊字符"*"和"#"沒(méi)有在綁定中使用的話,那么topic exchange就會(huì)化身為direct exchange。
嗯~~~topic完成雙殺!
完整代碼
我們將在我們的日志系統(tǒng)里使用一個(gè)topic exchange。我們將會(huì)從一個(gè)工作的假設(shè)開(kāi)始,也就是日志的路由鍵會(huì)有兩個(gè)詞:"<設(shè)施>"."<嚴(yán)重性>"
還有代碼幾乎和前一期一模一樣!所以注意點(diǎn)小改動(dòng)啦~
EmitLogTopic.java
public class EmitLogTopic {
//設(shè)置交換器的名字
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
//獲取連接
connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
channel = connection.createChannel();
//聲明交換器,給它名字,設(shè)置交換類(lèi)型為direct
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//代碼里手動(dòng)設(shè)置路由鍵——RoutingKey
String routingKey = getRouting(args);
//待傳遞的消息內(nèi)容
String message = getMessage(args);
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("[x] Sent '"+routingKey+"':'"+message+"'");
}catch (Exception e){
e.printStackTrace();
}finally {
//關(guān)閉連接
if (connection!=null){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static String getRouting(String[] strings) {
if (strings.length<1){
return "anonymous.info";
}
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length<2){
return "hello world";
}
return joinStrings(strings," ",1);
}
private static String joinStrings(String[] strings, String delimiter,int startIndex) {
int length = strings.length;
if (length == 0){
return "";
}
if (length<startIndex){
return "";
}
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex+1 ; i < length; i++){
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
ReceiveLogsTopic.java
public class ReceiveLogsTopic {
//設(shè)置交換器的名字
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException {
Connection connection = null;
Channel channel = null;
//獲取連接
connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
channel = connection.createChannel();
//聲明交換器,給它名字,設(shè)置交換類(lèi)型為topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//得到隊(duì)列的名字
String queueName = channel.queueDeclare().getQueue();
//截一下輸入錯(cuò)誤的情況
if (args.length<1){
System.err.println("Usage: ReceiveLogsTopic [binding_key]");
System.exit(1);
}
String wholeSeverity = "";
//根據(jù)輸入,確定程序想要收集的"RoutingKey",進(jìn)行綁定。
for (String bindingKey : args){
channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);
wholeSeverity = wholeSeverity + " " + bindingKey;
}
System.out.println("[*] Waiting for"+wholeSeverity+" message.To exit press CTRL+C");
//消費(fèi)者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("[x] Received'"+envelope.getRoutingKey()+"':'"+message+"'");
}
};
channel.basicConsume(queueName,true,consumer);
}
}
這個(gè)太過(guò)于靈活,想給大家展示一下結(jié)果,但是圖太多了。。。所以給大家貼一下官方的圖,結(jié)合我前面幾期寫(xiě)的就能得到結(jié)果!請(qǐng)大家請(qǐng)自行測(cè)試?(? ? ??)