注:這是RabbitMQ-java版Client的指導教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊列的使用
第三篇Publish/Subscribe介紹轉換器以及其中fanout類型
第四篇Routing介紹direct類型轉換器
第五篇Topics介紹topic類型轉換器
第六篇RPC介紹遠程調用
在上一篇指導教程中,我們創(chuàng)建了一個日志系統(tǒng),可以把日志消息廣播給很多接受者。
在這篇指導教程中,我們需要添加一個功能:可以訂閱消息的一部分。例如:我們會直接將嚴重的錯誤信息生成日志文件(保存在空余的磁盤上),但是依然會把所有的日志信息顯示在控制臺。
綁定(Bindings)
在上篇指導教程的例子中,我們已經創(chuàng)建過綁定的實例,你可能會覺得跟下面的代碼類似:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定的含義是轉換器和隊列之間的一種關聯(lián),通俗來說就是一個隊列對這個轉換器中的消息感興趣。
綁定可以帶有一個參數:routingKey。為了避免和basic_publish中的參數產生困惑,我們將這個參數叫著binding key(綁定鑰匙),下面是我們創(chuàng)建一個帶有鑰匙的綁定。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
這個綁定鑰匙的意思取決于轉換器的類型,如果是我們之前使用的fanout類型轉換器,那么會忽略綁定鑰匙的意義。
直接轉換器(Direct exchange)
在上篇指導教程中,我們的日志系統(tǒng)會廣播消息給所有綁定轉換器的消費者?,F(xiàn)在我們擴展一下:根據消息的級別來過濾消息。舉例來說,我們想一個應用只接受嚴重級別的消息并且寫入到磁盤里,就不用浪費磁盤空間去保存警告或者信息日志的消息。
如果使用fanout轉換器,那樣就沒有什么靈活性,不停的愚蠢的廣播。
可以使用direct轉換器,它的路由選擇的算法是容易理解,一個消息之所以到這個隊列中去,是因為隊列的binding Key和發(fā)出消息的routingkey相匹配。
為了說明這個問題,看下下面的結構:

這張結構圖中,可以看到有兩個隊列綁定著類型為direct的轉換器,第一個隊列綁定鑰匙為orange,第二個綁定鑰匙有兩個:一個是black另一個是green。
在上面的結構圖中,一個帶有routingkey為orange的消息發(fā)送給轉換器將會被發(fā)送到隊列Q1中,帶有routing Key為black和green將會被發(fā)送給到隊列Q2中,其他所有的消息將會被清除。
多重綁定(Multiple bindings)

多個隊列擁有相同的binding key是完全合規(guī)的,上圖中我們可以在轉換器x和帶有bindingkey為black的隊列Q1建立綁定關系。在這種情況下,direct類型的轉換器具有fanout類型的一樣特性,可以廣播給所有匹配的隊列消息。一個routingkey為black的消息將會被發(fā)送到Q1和Q2兩個隊列中。
發(fā)送消息(Emitting logs)
我們使用這種模型應用到日志系統(tǒng)上,發(fā)送給消息給direct而不是fanout類型的轉換器。將以日志嚴重等級作為routing key。按照那種方式,消費者應用將會選擇接受日志的嚴重等級的消息。首先我們先發(fā)送消息。
總是一樣的,先聲明一個轉換器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
準備好發(fā)送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了方面,我們假設等級分為三種:info,warning,error。
訂閱(Subscribing)
只要像上篇指導教程中接受消息就可以,有一個不同的地方就是:我們可以去創(chuàng)建任何一個等級的綁定。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
綜合

下面是EmitLogDirect.java類,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv); //獲取日志等級
String message = getMessage(argv); //獲取消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
//..
}
下面是ReceiveLogsDirect.java類,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
跟以前一樣編譯,運行的時候為了方面,我們使用環(huán)境便來個$CP作為路徑配置:
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果你想把warning和error(而不是info)類型的日消息保存到文件中,只需要打開一個控制臺和記錄:
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果你想在你的屏幕上看到所有的日志消息,新打開一個終端和查看就可以:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C
舉個例子,發(fā)送一個error類型的日志消息:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
第四節(jié)的內容大致翻譯完了,這里是原文鏈接。接著進入下一節(jié):Topics。
終篇是我對RabbitMQ使用理解的總結文章,歡迎討教。
--謝謝--