java-RabbitMQ指導(dǎo)1#前山翻譯

這篇指南是關(guān)于java版RabbitMQ的客戶端接口,而不是被分為好幾個章節(jié)的初級教程

在編譯和運行期,5.x正式版本需要依賴jdk8庫,這意味著,Android系統(tǒng)需要等于或大于7.0版本才被支持使用5.x正式版。4.x正式版本支持jdk6,以及Android系統(tǒng)版本小于7.0才可使用。

該庫是一個開源項目,支持下面三種許可證:

這意味著用戶可以考慮在以上的許可證下聲明其它任何的許可證。舉例說明,用戶選擇Apache Public License2.0,可以將該庫使用在商業(yè)產(chǎn)品中。在GPLv2許可證下代碼庫被聲明,等等。

該庫的API文檔是被分開的。

這里有一些命令行工具,用于支持運行java版客戶端。

客戶端API幾乎都是以AMQP0-9-1協(xié)議說明為模型,增加的抽象是為了簡單使用。

Overview-預(yù)覽

java版RabbitMQ客戶端使用com.rabbitmq.client作為頂級包名,核心的類和接口為:

  • Channel:通道
  • Connection:連接
  • ConnectionFactory:連接工廠
  • Consumer:消費者

操作協(xié)議主要是使用Channel接口,Connection被用來創(chuàng)建channels,記錄連接生命周期的事件處理者,并且不需要時關(guān)閉連接。ConnectionFactory可以初始化Connections,是可以配置連接參數(shù),像vhost或者username。

Connections and Channels-連接和通道

核心的兩個類是Connection和Channel,表示遵守AMQP 0-9-1協(xié)議的connection和channel。在使用之前需要分別引入:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
    

Connecting to a broker-連接一個中間件

下面的代碼使用給定的參數(shù)連接到一個AMQP協(xié)議的中間件(參數(shù):主機IP,端口號等等):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

所有的參數(shù)和服務(wù)端運行在本地需要的參數(shù)是一致的。

另外,也可以使用uri的方式進(jìn)行連接:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有的參數(shù)和服務(wù)端運行在本地需要的參數(shù)是一致的。

Connection接口可以被用作創(chuàng)建channel:

Channel channel = conn.createChannel();

這個channel可以用來發(fā)送和接受消息,就像之后章節(jié)描述的那樣。

斷開連接的話,僅需要關(guān)閉channel和connection:

channel.close();
conn.close();

說明:關(guān)閉channel被認(rèn)為是一種好的習(xí)慣,但是并不是嚴(yán)格必須的。當(dāng)?shù)撞縞onnection被關(guān)閉時,所有的channel將一定會自動結(jié)束。

Using Exchange and Queues-使用轉(zhuǎn)換器和隊列

客戶端應(yīng)用需要用到exchanges和queues,對于高級別創(chuàng)建的MAQP塊,在使用之前,它們一定要被聲明。聲明任何類型對象僅僅是為了保證它的存在,如果必要的話就創(chuàng)建它。

繼續(xù)上面的例子,下面的代碼聲明了一個轉(zhuǎn)換器和一個隊列,并且將它們綁定在一起:

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

這將聲明下面的對象,通過使用條件的參數(shù)定制的對象,目前是只有兩個特別指定的參數(shù):

  • 一個有持久性,不會自動刪除,direct類型的裝換器
  • 一個臨時的,一個客戶端獨有的,會自動刪除,并且自動生成名字的隊列

上面的方法被調(diào)用后會通過給定的路由Key綁定隊列到轉(zhuǎn)換器上。

注明:這是一種特殊的方式去聲明一個隊列,因為只有一個客戶需要使用它,它不需要一個明確的名字,當(dāng)沒有客戶端使用它時,它將自動被清除掉。如果有好幾個客戶端想要共用一個名字明確的隊列,下面的代碼將會有效:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

這將會聲明:

  • 一個持久化,不會自動刪除,direct類型的轉(zhuǎn)換器
  • 一個持久化,不是獨有的,不會自動刪除,有明確名字的隊列

說明:Channel接口中所有的方法很多都是重載的,這些方便簡短的方法使用是非常明智的,如exchangeDeclare,queueDeclare和queueBind。也有一些帶有許多參數(shù)的較長形式,如有必要的話可以重寫這些默認(rèn)的方法。只要需要就給你足夠的權(quán)限。

"short form , long form"模型被運用在客戶端API的始終。

Publishing messages-發(fā)布消息

發(fā)布消息到裝換器,使用Channel.basicPublish,如下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

為了更好的控制,你可以重載變量,指定mandatory標(biāo)記,或者發(fā)送設(shè)置好消息屬性的消息:

channel.basicPublish(exchangeName, routingKey, mandatory,
         MessageProperties.PERSISTENT_TEXT_PLAIN,
         messageBodyBytes);

發(fā)送消息的一些屬性:分發(fā)模式2,1等級和內(nèi)容類型“text/plain”。你可以創(chuàng)建你自己的消息屬性對象,使用Builder類創(chuàng)建你所需要的屬性,例如:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("bob")
               .build()),
               messageBodyBytes);

使用自定義的頭文件發(fā)布消息,如下:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

發(fā)送有時間期限的消息,如下:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

在這里我們不會闡述所有的情況。

注明:BasicProperties是AMQP類的內(nèi)部類,自動持有外部類。

如果一個資源驅(qū)動的警報生效,那么通道basicpublish的調(diào)用最終會被阻塞。

Channels and Concurrency Considerations(Thread safety)-通道和并發(fā)的考慮(線程安全)

首要的規(guī)則是,避免在多個線程間共享一個Channel對象。應(yīng)用傾向于每個線程中使用一個Channel,而不是多個線程中使用有一個Channel。

并發(fā)的情況有一些操作是安全的,但是有一些是不安全的,將會導(dǎo)致錯誤的框架,游離于火線上,如兩次消息響應(yīng)等。

并發(fā)的發(fā)布在一個共享的Channel中,會導(dǎo)致錯誤的框架,觸發(fā)一個連接等級協(xié)議的錯誤,然后連接關(guān)閉。因為它在應(yīng)用代碼中需要并確實異步的(Channel#basicPublis一定得在嚴(yán)格的區(qū)域中調(diào)用)。在多個線程中共享Channels需要Publisher Confirms。我們強烈建議避免并發(fā)發(fā)送到一個共享的Channel中。

在一個線程中消費和在另一個線程中發(fā)送使用共同一個Channel是安全的。

服務(wù)器分發(fā)消息,保證每個Channel按照順序保護(hù),這個分發(fā)的機制使用java.util.concurrent.ExecutorService。每次連接,提供這個自定義的執(zhí)行者可以被共享,簡單的通過ConnectionFactory#setSharedExecutor的setter方法即可創(chuàng)建Connections。

當(dāng)應(yīng)答機制被使用時,考慮在什么線程中使用應(yīng)答機制很重要,從接受線程將會不一樣(Consumer#handleDelivery授權(quán)分開給不同的線程),在多個參數(shù)中設(shè)置為true的應(yīng)答機制將會是不安全,并且回到時兩次應(yīng)答響應(yīng)的后果,因此會發(fā)生一個Channel-level協(xié)議的錯誤,然后關(guān)閉Channel。只有一次應(yīng)答一個消息才是安全的。

Receiving Messages By Subscription("push API")-通過訂閱接受消息

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

最有效率的接受消息的方式是通過Consumer接口建立訂閱,一有消息將自動被分發(fā)出去,而不需要確切的請求。

個人的訂閱總是根據(jù)他們消費的標(biāo)簽來獲取的。一個消費的標(biāo)簽是消費的標(biāo)識符,它可以是客戶端或者服務(wù)端生成的。為了讓RabbitMQ生成一個廣泛的并且獨一無二的標(biāo)簽,重寫Channel#basicConsume方法,因為它本身不帶有一個Consumer標(biāo)簽參數(shù)或者傳遞一個空的字符串當(dāng)作consumer標(biāo)簽,然后可以使用Channel#basicConsume方法的返回值。消費標(biāo)簽可以用來取消消費者。

不同的消費者對象一定有不同的消費者標(biāo)簽。在連接中有重復(fù)的消費者標(biāo)簽是強烈不建議這么做的,因為在自動恢復(fù)連接過程中會導(dǎo)致問題并且消費者被監(jiān)聽時會混淆了監(jiān)聽數(shù)據(jù)。

最簡單的方式去實現(xiàn)一個Consumer的子類DefautlConsumer,這個子類對象可以被CasicConsumer調(diào)用建立監(jiān)聽。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

這里,我們特別指出autoAck=false。這是必須的:確保消息分發(fā)給了消費者。在handleDelivery方法中可以方面的做大部分事情。正如說明,更多有經(jīng)驗的消費者會重寫更多的方法,尤其是當(dāng)Channels和connection關(guān)閉時調(diào)用handleShutdownSignal方法;在Consumer的其它回調(diào)方法調(diào)用之前,驗證消費標(biāo)簽handleConsumeOk將會先被調(diào)用。

消費者也可以實現(xiàn)handleCancelOk和handleCancel的方法,各自表示明確的取消和含蓄的取消。

你可以含蓄的取消消費者通過Channel.basicCancel:

channel.basicCancel(consumerTag);

傳遞消費者標(biāo)簽即可。

就好像發(fā)布者一樣,考慮并發(fā)給消費者帶來的安全風(fēng)險是非常重要的。

消費者的回調(diào)方法是在一個線程池中調(diào)用,和初始化Channel的線程是分開的。這意味著在連接的過程中,消費者可以安全的調(diào)用塊級的方法,例如Channel#queueDeclare或者Channel#basicCancel。

每個Channel都有它自己分發(fā)的線程。對于大多數(shù)情況是一個Channel對應(yīng)一個Consumer,這意味著消費者不會攔截其它的消費者。如果你的一個Channel有多個消費者,應(yīng)該注意一個長時間運行的Consumer可能會攔截這個Channel上其它消費者的回調(diào)方法。

請參考并發(fā)的情況(線程安全)并發(fā)和相關(guān)并發(fā)安全的主題章節(jié)。

Retrieving Individual Messages("Pull API")-檢索自己的消息

為了明確的檢索消息, 使用Channel.basicGet方法。該方法返回的值是一個GetResponse對象,從中可以提取出請求頭中信息和消息本身:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

并且autoAck=false,你必須調(diào)用Channel.basicAck方法確認(rèn)你已經(jīng)成功收到了消息:

 ...
    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

Handing unroutable messages-處理沒有路由的消息

如果帶有強制性標(biāo)簽的消息被發(fā)布出去,但是無法被路由,這個中間件將會把消息發(fā)回給發(fā)送端(通過AMQP.Basic.Return命令)。

為了得到這樣返回值的通知,客戶端可以實現(xiàn)ReturnListener接口并且調(diào)用Channel.addReturnListener方法。如果客戶端沒有為這個特別的channel配置返回監(jiān)聽,那么相關(guān)聯(lián)的返回消息將默認(rèn)被丟棄。

channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
});

返回監(jiān)聽將會被調(diào)用。比如, 一個客戶端發(fā)布一條帶有強制標(biāo)識的消息給'direct'類型的裝換器,但是該裝換器沒有綁定一個隊列。

Shutdown Protocol-關(guān)閉協(xié)議

Overview of the AMQP client shutdown-關(guān)閉AMQP客戶端的預(yù)覽

AMQP 0-9-1協(xié)議下的connection和channel都有相同的方式去管理網(wǎng)絡(luò)異常,內(nèi)部異常和明確的本地關(guān)閉。

AMQP 0-9-1中connection和channel有下面的生命周期狀態(tài):

  • open:對象準(zhǔn)備使用
  • closing:對象已經(jīng)被通知關(guān)閉,有一個關(guān)閉的請求對所有底層對象都有效,并且等待它們關(guān)閉的完成。
  • closed:這個對象已經(jīng)從所有底層對象中接收到所有關(guān)閉完成的通知,然后以關(guān)閉自己為結(jié)果

這些對象結(jié)尾都是關(guān)閉狀態(tài),不管被關(guān)閉的理由,就好像一個應(yīng)用請求,一個內(nèi)部客戶端庫異常,一個遠(yuǎn)程網(wǎng)絡(luò)請求或者網(wǎng)路異常。

AMQP中connection和channel對象擁有下面相關(guān)關(guān)閉的方法:

  • addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。為了管理所有的監(jiān)聽,當(dāng)connection和channel對象轉(zhuǎn)換到關(guān)閉狀態(tài)時,所有的監(jiān)聽都會被清除。注意,添加一個ShutdownListener到一個已經(jīng)被關(guān)閉的對象中將立刻會被清除該監(jiān)聽。
  • getCloseReason(),允許調(diào)查到底是什么 原因?qū)е聦ο蟊魂P(guān)閉
  • isOpen(),用于測試對象是否是打開的狀態(tài)
  • close(int closeCode,String closeMessage),明確通知對象需要關(guān)閉

一些簡單的監(jiān)聽可能會像下面那樣:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

information about the circumstances of a shutdown-關(guān)閉環(huán)境的信息

檢索到ShutdownSignalException,它包含所有關(guān)閉原因的信息,當(dāng)然也可以明確的調(diào)用getCloseReason()方法,它在ShutdownListener類中的service(ShutdownSignalException cause)方法使用參數(shù)cause來調(diào)用。

這個ShutdownSignalException類提供了一個分析關(guān)閉原因的方法,通過調(diào)用isHardError()方法,我們可以獲取到是否連接或者通道有錯誤的信息,通過getResson()方法返回原因的信息。在AMQP方法表格中-不管是AMQP.Channel.Close 或者 AMQP.Connection.Close(如果在庫中原因是一些錯誤是會返回null,例如網(wǎng)絡(luò)連接失敗,這種情況下發(fā)生的錯誤可以通過getCause()獲取到錯誤 ):

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}

Atomicity and use of the isOpen() method-原子和使用isOPen()方法

channel和connection對象在生產(chǎn)環(huán)境的代碼中是不推薦使用isOpen()方法。因為這個方法返回的值是依賴于關(guān)閉原因,下面的代碼將會闡述一些可能的情況:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

相反,我們應(yīng)該忽略這樣的檢查,簡單的嘗試行為 。如果連接的通道關(guān)閉了,代碼報出了錯誤。一個ShutdownSignalException將會被拋出表示對象處在一個無效的狀態(tài)。我們因該通過try-catch到IOException,或者SocketException。當(dāng)中間件意外的關(guān)閉了連接或者報出ShutdownSignalException,中間件會自動清除關(guān)閉:

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

接著下一篇文章:java-RabbitMQ指導(dǎo)2

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒有及時更新。 術(shù)語對...
    joyenlee閱讀 7,805評論 0 3
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • 接著上一篇文章:java-RabbitMQ指導(dǎo)1 Advanced Connection options-連接的高...
    前山飯店閱讀 1,077評論 0 1
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評論 2 34
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,415評論 0 1

友情鏈接更多精彩內(nèi)容