這篇指南是關(guān)于java版RabbitMQ的客戶端接口,而不是被分為好幾個章節(jié)的初級教程。
在編譯和運行期,5.x正式版本需要依賴jdk8庫,這意味著,Android系統(tǒng)需要等于或大于7.0版本才被支持使用5.x正式版。4.x正式版本支持jdk6,以及Android系統(tǒng)版本小于7.0才可使用。
該庫是一個開源項目,支持下面三種許可證:
這意味著用戶可以考慮在以上的許可證下聲明其它任何的許可證。舉例說明,用戶選擇Apache Public License2.0,可以將該庫使用在商業(yè)產(chǎn)品中。在GPLv2許可證下代碼庫被聲明,等等。
該庫的API文檔是被分開的。
這里有一些命令行工具,用于支持運行java版客戶端。
客戶端API幾乎都是以AMQP0-9-1協(xié)議說明為模型,增加的抽象是為了簡單使用。
Overview-預(yù)覽
java版RabbitMQ客戶端使用com.rabbitmq.client作為頂級包名,核心的類和接口為:
- Channel:通道
- Connection:連接
- ConnectionFactory:連接工廠
- Consumer:消費者
操作協(xié)議主要是使用Channel接口,Connection被用來創(chuàng)建channels,記錄連接生命周期的事件處理者,并且不需要時關(guān)閉連接。ConnectionFactory可以初始化Connections,是可以配置連接參數(shù),像vhost或者username。
Connections and Channels-連接和通道
核心的兩個類是Connection和Channel,表示遵守AMQP 0-9-1協(xié)議的connection和channel。在使用之前需要分別引入:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
Connecting to a broker-連接一個中間件
下面的代碼使用給定的參數(shù)連接到一個AMQP協(xié)議的中間件(參數(shù):主機IP,端口號等等):
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
所有的參數(shù)和服務(wù)端運行在本地需要的參數(shù)是一致的。
另外,也可以使用uri的方式進(jìn)行連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
所有的參數(shù)和服務(wù)端運行在本地需要的參數(shù)是一致的。
Connection接口可以被用作創(chuàng)建channel:
Channel channel = conn.createChannel();
這個channel可以用來發(fā)送和接受消息,就像之后章節(jié)描述的那樣。
斷開連接的話,僅需要關(guān)閉channel和connection:
channel.close();
conn.close();
說明:關(guān)閉channel被認(rèn)為是一種好的習(xí)慣,但是并不是嚴(yán)格必須的。當(dāng)?shù)撞縞onnection被關(guān)閉時,所有的channel將一定會自動結(jié)束。
Using Exchange and Queues-使用轉(zhuǎn)換器和隊列
客戶端應(yīng)用需要用到exchanges和queues,對于高級別創(chuàng)建的MAQP塊,在使用之前,它們一定要被聲明。聲明任何類型對象僅僅是為了保證它的存在,如果必要的話就創(chuàng)建它。
繼續(xù)上面的例子,下面的代碼聲明了一個轉(zhuǎn)換器和一個隊列,并且將它們綁定在一起:
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
這將聲明下面的對象,通過使用條件的參數(shù)定制的對象,目前是只有兩個特別指定的參數(shù):
- 一個有持久性,不會自動刪除,direct類型的裝換器
- 一個臨時的,一個客戶端獨有的,會自動刪除,并且自動生成名字的隊列
上面的方法被調(diào)用后會通過給定的路由Key綁定隊列到轉(zhuǎn)換器上。
注明:這是一種特殊的方式去聲明一個隊列,因為只有一個客戶需要使用它,它不需要一個明確的名字,當(dāng)沒有客戶端使用它時,它將自動被清除掉。如果有好幾個客戶端想要共用一個名字明確的隊列,下面的代碼將會有效:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
這將會聲明:
- 一個持久化,不會自動刪除,direct類型的轉(zhuǎn)換器
- 一個持久化,不是獨有的,不會自動刪除,有明確名字的隊列
說明:Channel接口中所有的方法很多都是重載的,這些方便簡短的方法使用是非常明智的,如exchangeDeclare,queueDeclare和queueBind。也有一些帶有許多參數(shù)的較長形式,如有必要的話可以重寫這些默認(rèn)的方法。只要需要就給你足夠的權(quán)限。
"short form , long form"模型被運用在客戶端API的始終。
Publishing messages-發(fā)布消息
發(fā)布消息到裝換器,使用Channel.basicPublish,如下:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
為了更好的控制,你可以重載變量,指定mandatory標(biāo)記,或者發(fā)送設(shè)置好消息屬性的消息:
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
發(fā)送消息的一些屬性:分發(fā)模式2,1等級和內(nèi)容類型“text/plain”。你可以創(chuàng)建你自己的消息屬性對象,使用Builder類創(chuàng)建你所需要的屬性,例如:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
使用自定義的頭文件發(fā)布消息,如下:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
發(fā)送有時間期限的消息,如下:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
在這里我們不會闡述所有的情況。
注明:BasicProperties是AMQP類的內(nèi)部類,自動持有外部類。
如果一個資源驅(qū)動的警報生效,那么通道basicpublish的調(diào)用最終會被阻塞。
Channels and Concurrency Considerations(Thread safety)-通道和并發(fā)的考慮(線程安全)
首要的規(guī)則是,避免在多個線程間共享一個Channel對象。應(yīng)用傾向于每個線程中使用一個Channel,而不是多個線程中使用有一個Channel。
并發(fā)的情況有一些操作是安全的,但是有一些是不安全的,將會導(dǎo)致錯誤的框架,游離于火線上,如兩次消息響應(yīng)等。
并發(fā)的發(fā)布在一個共享的Channel中,會導(dǎo)致錯誤的框架,觸發(fā)一個連接等級協(xié)議的錯誤,然后連接關(guān)閉。因為它在應(yīng)用代碼中需要并確實異步的(Channel#basicPublis一定得在嚴(yán)格的區(qū)域中調(diào)用)。在多個線程中共享Channels需要Publisher Confirms。我們強烈建議避免并發(fā)發(fā)送到一個共享的Channel中。
在一個線程中消費和在另一個線程中發(fā)送使用共同一個Channel是安全的。
服務(wù)器分發(fā)消息,保證每個Channel按照順序保護(hù),這個分發(fā)的機制使用java.util.concurrent.ExecutorService。每次連接,提供這個自定義的執(zhí)行者可以被共享,簡單的通過ConnectionFactory#setSharedExecutor的setter方法即可創(chuàng)建Connections。
當(dāng)應(yīng)答機制被使用時,考慮在什么線程中使用應(yīng)答機制很重要,從接受線程將會不一樣(Consumer#handleDelivery授權(quán)分開給不同的線程),在多個參數(shù)中設(shè)置為true的應(yīng)答機制將會是不安全,并且回到時兩次應(yīng)答響應(yīng)的后果,因此會發(fā)生一個Channel-level協(xié)議的錯誤,然后關(guān)閉Channel。只有一次應(yīng)答一個消息才是安全的。
Receiving Messages By Subscription("push API")-通過訂閱接受消息
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
最有效率的接受消息的方式是通過Consumer接口建立訂閱,一有消息將自動被分發(fā)出去,而不需要確切的請求。
個人的訂閱總是根據(jù)他們消費的標(biāo)簽來獲取的。一個消費的標(biāo)簽是消費的標(biāo)識符,它可以是客戶端或者服務(wù)端生成的。為了讓RabbitMQ生成一個廣泛的并且獨一無二的標(biāo)簽,重寫Channel#basicConsume方法,因為它本身不帶有一個Consumer標(biāo)簽參數(shù)或者傳遞一個空的字符串當(dāng)作consumer標(biāo)簽,然后可以使用Channel#basicConsume方法的返回值。消費標(biāo)簽可以用來取消消費者。
不同的消費者對象一定有不同的消費者標(biāo)簽。在連接中有重復(fù)的消費者標(biāo)簽是強烈不建議這么做的,因為在自動恢復(fù)連接過程中會導(dǎo)致問題并且消費者被監(jiān)聽時會混淆了監(jiān)聽數(shù)據(jù)。
最簡單的方式去實現(xiàn)一個Consumer的子類DefautlConsumer,這個子類對象可以被CasicConsumer調(diào)用建立監(jiān)聽。
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
這里,我們特別指出autoAck=false。這是必須的:確保消息分發(fā)給了消費者。在handleDelivery方法中可以方面的做大部分事情。正如說明,更多有經(jīng)驗的消費者會重寫更多的方法,尤其是當(dāng)Channels和connection關(guān)閉時調(diào)用handleShutdownSignal方法;在Consumer的其它回調(diào)方法調(diào)用之前,驗證消費標(biāo)簽handleConsumeOk將會先被調(diào)用。
消費者也可以實現(xiàn)handleCancelOk和handleCancel的方法,各自表示明確的取消和含蓄的取消。
你可以含蓄的取消消費者通過Channel.basicCancel:
channel.basicCancel(consumerTag);
傳遞消費者標(biāo)簽即可。
就好像發(fā)布者一樣,考慮并發(fā)給消費者帶來的安全風(fēng)險是非常重要的。
消費者的回調(diào)方法是在一個線程池中調(diào)用,和初始化Channel的線程是分開的。這意味著在連接的過程中,消費者可以安全的調(diào)用塊級的方法,例如Channel#queueDeclare或者Channel#basicCancel。
每個Channel都有它自己分發(fā)的線程。對于大多數(shù)情況是一個Channel對應(yīng)一個Consumer,這意味著消費者不會攔截其它的消費者。如果你的一個Channel有多個消費者,應(yīng)該注意一個長時間運行的Consumer可能會攔截這個Channel上其它消費者的回調(diào)方法。
請參考并發(fā)的情況(線程安全)并發(fā)和相關(guān)并發(fā)安全的主題章節(jié)。
Retrieving Individual Messages("Pull API")-檢索自己的消息
為了明確的檢索消息, 使用Channel.basicGet方法。該方法返回的值是一個GetResponse對象,從中可以提取出請求頭中信息和消息本身:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
并且autoAck=false,你必須調(diào)用Channel.basicAck方法確認(rèn)你已經(jīng)成功收到了消息:
...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}
Handing unroutable messages-處理沒有路由的消息
如果帶有強制性標(biāo)簽的消息被發(fā)布出去,但是無法被路由,這個中間件將會把消息發(fā)回給發(fā)送端(通過AMQP.Basic.Return命令)。
為了得到這樣返回值的通知,客戶端可以實現(xiàn)ReturnListener接口并且調(diào)用Channel.addReturnListener方法。如果客戶端沒有為這個特別的channel配置返回監(jiān)聽,那么相關(guān)聯(lián)的返回消息將默認(rèn)被丟棄。
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});
返回監(jiān)聽將會被調(diào)用。比如, 一個客戶端發(fā)布一條帶有強制標(biāo)識的消息給'direct'類型的裝換器,但是該裝換器沒有綁定一個隊列。
Shutdown Protocol-關(guān)閉協(xié)議
Overview of the AMQP client shutdown-關(guān)閉AMQP客戶端的預(yù)覽
AMQP 0-9-1協(xié)議下的connection和channel都有相同的方式去管理網(wǎng)絡(luò)異常,內(nèi)部異常和明確的本地關(guān)閉。
AMQP 0-9-1中connection和channel有下面的生命周期狀態(tài):
- open:對象準(zhǔn)備使用
- closing:對象已經(jīng)被通知關(guān)閉,有一個關(guān)閉的請求對所有底層對象都有效,并且等待它們關(guān)閉的完成。
- closed:這個對象已經(jīng)從所有底層對象中接收到所有關(guān)閉完成的通知,然后以關(guān)閉自己為結(jié)果
這些對象結(jié)尾都是關(guān)閉狀態(tài),不管被關(guān)閉的理由,就好像一個應(yīng)用請求,一個內(nèi)部客戶端庫異常,一個遠(yuǎn)程網(wǎng)絡(luò)請求或者網(wǎng)路異常。
AMQP中connection和channel對象擁有下面相關(guān)關(guān)閉的方法:
- addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。為了管理所有的監(jiān)聽,當(dāng)connection和channel對象轉(zhuǎn)換到關(guān)閉狀態(tài)時,所有的監(jiān)聽都會被清除。注意,添加一個ShutdownListener到一個已經(jīng)被關(guān)閉的對象中將立刻會被清除該監(jiān)聽。
- getCloseReason(),允許調(diào)查到底是什么 原因?qū)е聦ο蟊魂P(guān)閉
- isOpen(),用于測試對象是否是打開的狀態(tài)
- close(int closeCode,String closeMessage),明確通知對象需要關(guān)閉
一些簡單的監(jiān)聽可能會像下面那樣:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});
information about the circumstances of a shutdown-關(guān)閉環(huán)境的信息
檢索到ShutdownSignalException,它包含所有關(guān)閉原因的信息,當(dāng)然也可以明確的調(diào)用getCloseReason()方法,它在ShutdownListener類中的service(ShutdownSignalException cause)方法使用參數(shù)cause來調(diào)用。
這個ShutdownSignalException類提供了一個分析關(guān)閉原因的方法,通過調(diào)用isHardError()方法,我們可以獲取到是否連接或者通道有錯誤的信息,通過getResson()方法返回原因的信息。在AMQP方法表格中-不管是AMQP.Channel.Close 或者 AMQP.Connection.Close(如果在庫中原因是一些錯誤是會返回null,例如網(wǎng)絡(luò)連接失敗,這種情況下發(fā)生的錯誤可以通過getCause()獲取到錯誤 ):
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}
Atomicity and use of the isOpen() method-原子和使用isOPen()方法
channel和connection對象在生產(chǎn)環(huán)境的代碼中是不推薦使用isOpen()方法。因為這個方法返回的值是依賴于關(guān)閉原因,下面的代碼將會闡述一些可能的情況:
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}
相反,我們應(yīng)該忽略這樣的檢查,簡單的嘗試行為 。如果連接的通道關(guān)閉了,代碼報出了錯誤。一個ShutdownSignalException將會被拋出表示對象處在一個無效的狀態(tài)。我們因該通過try-catch到IOException,或者SocketException。當(dāng)中間件意外的關(guān)閉了連接或者報出ShutdownSignalException,中間件會自動清除關(guān)閉:
public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}
接著下一篇文章:java-RabbitMQ指導(dǎo)2