接著上一篇文章:java-RabbitMQ指導1
Advanced Connection options-連接的高級選項
Consumer thread pool-消費者線程池
消費線程會被默認自動分配在一個新的ExeutorService線程池中。如果想要更好的控制就需要提供一個ExecutorService給new Connection()方法,那么默認使用的線程池將會被替換使用。這里有一個例子,提供一個比默認分配的更大的線程池:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
Executors和ExecutorService類都是在java.util.concurrent包中。
當connection關閉時,默認的ExecutorService會被關閉。但是用戶提供的ExecutorService(或者類似的)將不會被自動關閉。所以最后客戶端都必須保證它已經被關閉(通過調用它自身shutdown()方法),否則該線程池將會阻止虛擬機的結束。
同一個ExecutorService可以在多個連接之間共享使用,或者重復連接時重復使用,但是一旦它被關閉將不能使用。
使用這項特性應該考慮的是:在消費者回調的過程中,是否存在性能上的瓶頸。如果沒有一個消費者回調或者說很少很少,默認的分配會有大量的剩余。使用的是很小的,總的線程資源分配是有界限的,即使偶爾有爆發(fā)性的消費情況發(fā)生。
Using Lists of Hosts-使用主機集合
傳遞地址數組給new Connection()方法是可以的,Address是一個簡單方面的類,在com.rabbitmq.client包下,包含主機的端口號屬性,如下:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
將會嘗試連接hostname1:portnumber1,如果連接失敗將會連接hostname2:portnumber2。連接返回的是數組中第一個成功的值(不會返回IOException)。這是通過每次都調用factory.newConnection()完整平等的重復設置主機和端口號,直至其中個一個連接成功。
如果同樣提供了ExecutorService(使用方法factory.netConnection(es,addrArr)),這個線程池將會第一個成功連接的相關聯。
如果你想有更多的控制主機的方式連接,查看支持服務發(fā)現。
Service discovery with thw AddressResolver interface-帶有AddressResolver接口的服務發(fā)現
在3.6.6版本中,在創(chuàng)建連接時,去實現AddressResolver接口選擇實現的地方:
Connection conn = factory.newConnection(addressResolver);
這個AddressResolver接口類似于下面:
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
就好像一組主機的集合一樣,先嘗試連接第一個的地址,如果失敗了則連接第二個地址,依次類推。
如果同樣提供了ExecutorService(使用方法factory.newConnection(es,addressResolver)),線程池將會和第一個連接成功的相關聯。
AddressResolver是一個非常好的地方去實現自定義的服務發(fā)現邏輯,在動態(tài)的構造中尤其有用。和自動恢復連接結合使用,客戶端在開始的時候將會自動連接到節(jié)點上。Affinity和載入平衡屬于其它的情景,這些情景正是自定義AddressResolver有用的地方。
Java客戶端帶有下面的實現(細節(jié)請參考文檔):
- DnsRecordIpAddressResolver:給定主機名,返回Ip地址(不能在DNS服務平臺使用)。在負載均衡和失效備援方面可以為簡單的DNS-based有用。
- DnsSrvRecordAddressResolver:給定服務名字,返回主機名字和端口號。作為一個DNS SRV請求被實現。當使用服務登錄像HashiCorp Consul 時會很有用。
Heartbeat Timeout -心跳的超時時間
查看 Heartbeats guide 獲取更多信息,關于心跳機制和在java客戶端如何配置的問題。
Custom Thread Factories-自定義線程工廠
計算機環(huán)境系統(tǒng)像谷歌應用引擎可以嚴格管理線程的實例化情況。為了在這樣的環(huán)境下使用RabbitMQjava版客戶端。非常有必要配置一個自定義的ThreadFactory,用于實例化線程的提供方法,例如GAE的ThreadManager。下面是谷歌應用引擎的例子:
import com.google.appengine.api.ThreadManager;
ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());
Support for java non-blocking IO-支持java非阻塞IO
4.0版本的java版客戶端帶來實驗性的支持java非阻塞式IO(java NIO)。NIO不應該被認為快于阻塞IO,它只是允許去更容易的控制資源(例如線程)
默認是阻塞式IO模式,每次連接都使用一個線程從網絡套接宇讀取數據。如果是NIO模式,你可以控制線程的數量,從網絡套接宇中讀取或者寫入。
如果你的程序使用很多連接(十幾個或者上百個),使用NIO模式。如果你使用少量的線程就使用默認的阻塞式模式,設置合適數量的線程,在性能上你不應該變差,尤其是連接并不是很忙的時候。
NIO必須明確的能夠使用:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
NIO模式通過設置NioParams類:
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));
NIO模式使用合理的方式,但是你應該根據你自己工作量去改變它們。一些設置:總的IO線程使用,緩存的大小,服務執(zhí)行使用在IO循環(huán),內存中參數寫入隊列(在被發(fā)送到網上之前,將請求寫入到隊列上),詳細情況請閱讀文檔。
Automatic Recovery From NetWork Failures-網絡連接失敗自動恢復
Connection Recovery-恢復連接
在客戶端和RabbitMQ節(jié)點間網絡連接可能會失敗。RabbitMQ客戶端支持自動恢復連接和拓展連接(隊列,裝換器,綁定和消費者),對于許多應用而言自動回復過程將如下步驟:
- 恢復連接
- 恢復連接監(jiān)聽
- 恢復通道
- 恢復通道監(jiān)聽
- 恢復通道basic.qos設置,發(fā)布確認和事務設置
拓展恢復包括以下行為,針對每一個通道:
- 恢復聲明裝換器(除了默認定義的)
- 恢復聲明隊列
- 恢復所有綁定
- 恢復所有消費者
在java版客戶端4.0.0版本中,自動恢復是默認的(拓展恢復也是)。
禁止或者允許自動恢復,使用factory.setAutomaticRecoveryEnabled(boolean)方法,下面的片段顯示了如何明確的自動恢復(對于java客戶端4.0.0):
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();
如果恢復失敗導致一個錯誤(RabbitMQ節(jié)點沒有存在),在修復時間間隔(默認5秒)后將會再次嘗試,這個間隔可以配置:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);
當提供一組地址時,這組地址沒有順序,所有的地址就一個接著一個嘗試的連接:
ConnectionFactory factory = new ConnectionFactory();
Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);
Recovery Listeners -恢復監(jiān)聽類
在合理的連接和通道中注冊一個或者多個恢復監(jiān)聽是可能,當恢復連接是允許的,通過ConnectionFactory#newConnection和Connection#createChannel方法獲取Connection類實現com.rabbitmq.client.Recoverable類,提供兩個可描述的名字的方法:
- addRecoveryListener
- removeRecoveryListener
注意,當前你需要轉變connections和channels為Recoverable才可以使用這些方法。
Effects on Publishing-發(fā)布的影響
使用Channel.basicPublish發(fā)布的消息在連接異常時將會被丟失??蛻舳瞬粫阉鼈兎诺疥犃兄械然謴瓦B接時在分發(fā)。為了保證發(fā)布的消息到達RabbitMQ應用,需要使用發(fā)布確認機制和連接失敗的統(tǒng)計。
Topology Recovery-拓展恢復
拓展恢復包括恢復轉換器,隊列,綁定和消費者。當自動恢復是允許的它的也是默認允許的。這樣在java4.0.0版本中拓展恢復也是允許的。
如果需要拓展恢復也可以禁止:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);
Failure Detection and Recovery Limitations-失敗檢測和恢復限制
自動恢復連接有許多限制和有意圖的設計目的,因此應用開發(fā)者們應該注意。
當連接停止或者丟失,它需要一定的時間才能檢測到。因此這里會有一個空窗期,在依賴和應用之間沒有意識到實際的連接已經失敗的情況。在這段時間的框架下,如何消息的發(fā)布都是連續(xù)的并且跟平常一樣地寫入到TCP套接宇。通過發(fā)布者確認機制才能夠保證它們分發(fā)的消息傳遞給了中間件:在AMQP0-9-1中發(fā)布消息被設計為異步的。
如果允許自動恢復,當一個套接宇或者I/O操作錯誤在連接中被檢測到,一段配置延遲時間后將會開始恢復連接,默認是5s。這個設計的目的是即使許多次網絡連接失敗是短暫的并且存活的時間很短,它們也不會立刻停止恢復?;謴瓦B接將在一段明確的時間間隔中保持繼續(xù)直至新的連接被成功打開。
當一個連接處于恢復狀態(tài)時,其中通道嘗試任務消息的發(fā)布都將導致一個錯誤產生,客戶端目前也不會緩存任何內部的無效的消息。應用開發(fā)者的責任之一是保證這些消息的記錄,然后等恢復連接成功時在發(fā)送它們。發(fā)布確認機制是一個協議擴展,發(fā)布者應該使用,就不會導致消息的丟失。
當一個通道關閉因為一個通道等級的錯誤時恢復連接將不會收影響,這樣的錯誤經常表示應用級別的問題,這個依賴庫不能做一些信息方面的決定關于這樣實例的時間。
關閉通道將不會被恢復,即使恢復連接之后,這些包括明確的關閉通道和通道等級的錯誤發(fā)生。
Manual Acknowledgements and Automatic Recovery-手動確認和自動恢復
當手動應答被使用時,有可能在消息傳遞和應答中間發(fā)生網絡連接RabbitMQ節(jié)點失敗的情況。在恢復連接之后,RabbitMQ會重設傳遞的標簽在所有的通道上,這意味著basic.ack,basic.nack和basic.reject帶有老的傳遞標簽將會草紙一個連接錯誤。為了避免這種情況,RabbitMQ的java版客戶端保留痕跡和跟新傳遞的標簽,在恢復時使它們自動跟新。Channel.basicAck,Channel.basicNack和Channel.basicReject將轉換成合適的傳遞標簽,使之可以被RabbitMQ使用。穩(wěn)定的傳遞標簽的應答機制將不會被發(fā)送,使用手動應答機制和自動恢復功能的應一定有能力處理重新傳遞消息。
Unhandled Exceptions-未處理的錯誤
關聯到連接,通道,恢復,消費者聲明周期的未處理錯誤將被授權給錯誤處理者。錯誤處理者是一個實現了ExceptionHandler接口的對象。默認的情況使用DefaultExceptionHandler對象處理,它將按標準方式輸出錯誤的詳情。
可以重寫ConnectionFactory#setExceptionHandler,通過Factory創(chuàng)建的connections都可以使用。
ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);
錯誤處理者應該用于打印錯誤日志。
Metrics and monitoring-測量和監(jiān)視器
作為4.0.0版本,客戶端可以手收集運行時各種度量(大量發(fā)布的消息),度量的收集是可以選擇的,并且通過ConnectionFactory級別來創(chuàng)建,使用setMetricsCollector(metricsCollector)方法,這個方法期待著一個MetricsCollector的實例,它可以在代碼中好幾個地方調用。
4.3版本客戶端支持Micrometer和Dropwizard Metrics
下面是收集的metrics:
- 許多打開的connections
- 許多開發(fā)的channels
- 許多發(fā)布的消息
- 許多消費的消息
- 許多應答的消息
- 許多被拒絕的消息
Micrometer和Dropwizard Metric都提供了數量,還包含速率,最后五分鐘速率等等。對于消息關聯的度量,它們同樣支持類似的工具去監(jiān)視和報道(JMX,Graphite,Ganglia,Datadog,etc),下面片段專注與更多的細節(jié)。
請記住下面關于度量的收集:
不要忘記添加依賴路(Maven,Gradle或者JAR文件)徑在JVM路徑中,當使用Micrometer或者Dropwizard Metrics。這些可選擇的依賴將不會自動在java客戶端加載出來,你需要添加其它的獨立依賴后臺使用,Metrics收集是擴展,實現自定義的MetricsCollector是為了特殊的需求,是被鼓勵的。
MetricsCollector被設置到ConnectionFactory等級,但是也可以被分享到不同的實例中。
Metrics手機不支持事務,舉例,如果應答被發(fā)送事務當中,事務然后在回滾,應答機制在客戶端被計算度量(而不是中間件)。注意,應答機制實際上應該被發(fā)送給中間件,然后由事務回滾取消。因此客戶端度量可以糾正大部分發(fā)送的應答消息。總計一句話,不要使用客戶端度量嚴格的業(yè)務邏輯,它們不會保證表現的很完美,它們以為會使用簡單的運行系統(tǒng)和使操作更有效率。
Micrometer supprot-支持千分尺
你可以使用度量收集千分尺的數據,如下方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Micrometer's Counter object
千分尺支持多種后臺:Netflix Atlas,Prometheus,Datadog,Influx,JMX等等。
你可以傳遞一個MeterRegistry的實例給MicrometerMetricsCollector,下面是JMX的例子:
JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);
Dropwizard Metrics Supprot-支持逐一度量
你可以使用度量收集Dropwiard,如下方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object
Dropwizard Metrics支持多種報告后臺:console,JMX,HTTP,Graphite,Ganglia等等。
你可以傳遞MetricsRegistry的實例給StandardMetricsCollector,下面是JMX的例子:
ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // get Metrics' Meter object
RabbitMQ java Client on Google App Engine-客戶端使用Google應用引擎
使用java版RabbitMQ客戶端在谷歌應用引擎上需要使用自定義的線程工廠,即使用GAE's ThreadManager實例化線程。另外,非常有必要設置一個低心跳機制間隔(4-5秒)去避免超時進行流讀取的行為在GAE上
ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);
Caveats and Limitations-警告和限制
為了使拓展恢復變成可能,java版RabbitMQ客戶端維護了聲明隊列,裝換器和綁定的緩存。這個緩存是先前連接的數據,確保RabbitMQ特性是客戶端去觀察到這些拓展的變化是不可能的。例如:當一個隊列被刪除,因為TTL,RabbitMQ客戶端將視圖緩存整個相同的例子:
- 當隊列被刪
- 當裝換器被刪
- 當綁定被刪
- 在一個自動刪除的隊列上消費者被取消
- 隊列或者轉換器未綁定一個會自動刪除的裝換器
無論如何,客戶端在簡單的連接之后不會記錄這些拓展的變化,應用依賴與自動刪除的隊列或者裝換器,同樣隊列TTL(而不是,消息TTL),使用自動恢復連接,應該明確的刪除實體,并且指導無用的或者刪除的,清除客戶端拓展緩存。這個將是有促進的,Channel#queueDelete,Channel#exchangeDelete,Channel#queueUnbind和Channel#exchangeUnbind將自動恢復在RabbitMQ3.3.x中(刪除將不會導致錯誤的東西)。
The RPC(Request/Reply) Pattern-RPC模型
作為一門邊界程序,java客戶端API提供了一個RpcClient類,它使用臨響應的隊列去提供簡單的RPC樣式通訊通過協議。
這個類不會強制任務特殊的類型在RPC參數和返回值中。它簡單的提供一套機制去發(fā)送消息給一個給定的裝換器用一個特定的routingkey,并且在回復隊列中等待響應,
import com.rabbitmq.client.RpcClient;
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);
(這個類使用AMQP0-9-1實現的細節(jié)如下:請求信息被發(fā)送到basic.correlation_id屬性,作為一個獨一無二的值設置在RpcClient實例中,和使用basic.reply_to設置到回復隊列的名字中)。
一旦你創(chuàng)建了這個類的實例,你可以使用下面的方法發(fā)送RPC請求:
byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)
這個primitiveCall方法將字節(jié)數組作為請求參數和響應體,這個stringCall是方面圍繞primitiveCall,對待消息體像String實例
一樣,使用默認的字符編碼方法。
mapCall不同參數有一點復雜,它編碼了java.util.map包含的不同java值到AMQP0-9-1協議的二進制中,并且以同樣的方式反編碼了響應(注意,這里有許多的關系在值類型中使用,詳情細節(jié)請看文檔)
所有編組的或者未編組的簡易方法使用primitiveCall作為一種轉換,就好像提供了一層包裹在它上面。
TLS Support
在客戶端和中間件通訊時使用TLS加密是可能的,客戶端和服務端的認證都是被支持的,下面是最簡單的方式在客戶端加密:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
注意,客戶端不會強制任何服務端(認證鏈)以上述默認的方式。信任所有的認證,TrustManager被用時,對于本地的發(fā)展是方面的,但是在一般的攻擊中或者開發(fā)環(huán)境中不推薦使用。想要學習更多的TLS支持使用RabbitMQ。請參考TLS指南,如果你僅僅想配置客戶端,可以閱讀TLS指南路徑片段。