本節(jié)詳細(xì)講述RabbitMQ的幾個(gè)基本API,圍繞Connection和channel兩個(gè)AMQP協(xié)議接口.詳解以下幾點(diǎn):連接、交換器、隊(duì)列的創(chuàng)建和綁定、消息的發(fā)送和消費(fèi)、消息確認(rèn)和關(guān)閉連接
一、連接RabbitMQ
不貼代碼了,參考上一節(jié)。這里要說一下,channel是非線程安全的,在有些情況下可能會(huì)導(dǎo)致網(wǎng)絡(luò)上出現(xiàn)錯(cuò)誤的通信幀交錯(cuò),同時(shí)也會(huì)影響發(fā)送方確認(rèn)機(jī)制的運(yùn)行。
二、使用Exchange和Queue
交換器和隊(duì)列是AMQP中high-level層面的構(gòu)建模塊,應(yīng)用程序應(yīng)確保使用前已經(jīng)存在了。使用前需要聲明。
生產(chǎn)者和消費(fèi)者都可以聲明一個(gè)交換器和隊(duì)列.如果嘗試聲明一個(gè)已存在的交換器或隊(duì)列,只要聲明的參數(shù)完全匹配,RabbitMQ就會(huì)什么都不做.否則會(huì)拋出異常.
- exchange的聲明和刪除
- 聲明: exchangeDeclare方法詳解
exchangeDeclare方法有多個(gè)重載方法,都是通過下面這個(gè)方法缺省某些參數(shù)構(gòu)成的
/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client.
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
詳解一下幾個(gè)參數(shù):
exchange: 交換器名稱
type: 交換器類型,上節(jié)提到的direct fanout headers和topic
durable: 是否持久化 持久話將會(huì)將交換器存盤,服務(wù)器重啟時(shí)不會(huì)丟失
autoDelete: 是否自動(dòng)刪除. 自動(dòng)刪除的前提是至少有一個(gè)隊(duì)列或者交換器與這個(gè)交換器綁定.之后所有的交換器和隊(duì)列與這個(gè)交換器解綁后,該交換器會(huì)自動(dòng)刪除.不能錯(cuò)誤的理解為連接斷開后交換器會(huì)刪除
internal: 是否內(nèi)置的 這個(gè)參數(shù)為true時(shí)表示這個(gè)交換器只能由其他交換器發(fā)送消息.不能與客戶端直連
arguments: 其他一些結(jié)構(gòu)化參數(shù)
刪除交換器: exchangeDelete
/**
* Delete an exchange
* @see com.rabbitmq.client.AMQP.Exchange.Delete
* @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
* @param exchange the name of the exchange
* @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
- queue的聲明和刪除
聲明隊(duì)列:
/**
* Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
* flag to true and returns no result (as there will be no response from the server).
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @throws java.io.IOException if an error is encountered
*/
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
其他幾個(gè)參數(shù)含義跟declareExchange相同,說一下exclusive:設(shè)置是否排他.為true隊(duì)列是排他的.如果一個(gè)隊(duì)列是排他的,該隊(duì)列僅對首次聲明他的連接可見.并在連接斷開時(shí)斷連.同一個(gè)連接的不同channel可以訪問該連接創(chuàng)建的排他隊(duì)列.
刪除隊(duì)列:
/**
* Delete a queue
* @see com.rabbitmq.client.AMQP.Queue.Delete
* @see com.rabbitmq.client.AMQP.Queue.DeleteOk
* @param queue the name of the queue
* @param ifUnused true if the queue should be deleted only if not in use
* @param ifEmpty true if the queue should be deleted only if empty
* @return a deletion-confirm method to indicate the queue was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
刪除隊(duì)列中的消息:
/**
* Purges the contents of the given queue.
* @see com.rabbitmq.client.AMQP.Queue.Purge
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
* @param queue the name of the queue
* @return a purge-confirm method if the purge was executed successfully
* @throws java.io.IOException if an error is encountered
*/
Queue.PurgeOk queuePurge(String queue) throws IOException;
- queueBind exchangeBind queueUnbind exchangeUnbind
- queueBind方法詳解
/**
* Bind a queue to an exchange.
* @see com.rabbitmq.client.AMQP.Queue.Bind
* @see com.rabbitmq.client.AMQP.Queue.BindOk
* @param queue the name of the queue 要綁定的隊(duì)列名
* @param exchange the name of the exchange 要綁定的交換器名
* @param routingKey the routing key to use for the binding 綁定時(shí)的routingBind
* @param arguments other properties (binding parameters) 綁定參數(shù)
* @return a binding-confirm method if the binding was successfully created
* @throws java.io.IOException if an error is encountered
*/
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queueUnbind 可以解綁已經(jīng)綁定的隊(duì)列和交換器
- 何時(shí)創(chuàng)建
- 由生產(chǎn)者和消費(fèi)者聲明
RabbitMQ 官方建議,生產(chǎn)者和消費(fèi)者都應(yīng)該嘗試創(chuàng)建隊(duì)列. - 程序上線前在服務(wù)器上創(chuàng)建好,比如通過頁面管理、RabbitMQ命令或更好的配置中心下發(fā)
預(yù)先創(chuàng)建的好處:
免去聲明過程;
確保交換器和隊(duì)列正確綁定;
配合mandatory參數(shù)或者備份交換器提高程序的健壯性
三、發(fā)送消息
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a >resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a >Resource-driven alarms</a>
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
- exchange: 交換器名稱
- RoutingKey: 路由鍵
- props: 消息的基本屬性集 包括14個(gè)屬性
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
- byte[] 消息體
- mandatory和immediate下節(jié)講述
四、消費(fèi)消息
/**
* Start a non-nolocal, non-exclusive consumer.
* @param queue the name of the queue 隊(duì)列名稱
* @param autoAck true if the server should consider messages 是否自動(dòng)確認(rèn).建議設(shè)置成false
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param consumerTag a client-generated consumer tag to establish context 消費(fèi)者標(biāo)簽,用來區(qū)分不同消費(fèi)者
* @param callback an interface to the consumer object 回調(diào)函數(shù),用來處理推送來的消息
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
五、消費(fèi)端的確認(rèn)和拒絕
為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制.當(dāng)自動(dòng)確認(rèn)autoack參數(shù)為false時(shí),消費(fèi)者客戶端需要顯示的調(diào)用basicAck確認(rèn)消費(fèi).RabbitMQ不會(huì)為未確認(rèn)的消息設(shè)置過期時(shí)間.它判斷此消息是否需要重新投遞的唯一依據(jù)是該消費(fèi)者的連接是否已斷開.
RabbitMQ提供了basicReject方法來告訴RabbitMQ拒絕一個(gè)消息.定義如下:
/**
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @see com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
當(dāng)requeue為true時(shí),拒絕的消息將重新存入隊(duì)列,發(fā)送給下一個(gè)訂閱的消費(fèi)者.
這里提供一個(gè)demo:
public void consumeMessage() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
boolean autoAck = false;
channel.basicConsume("test_queue", autoAck, "testConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(envelope);
System.out.println(new String(body));
long tag = envelope.getDeliveryTag();
if (tag / 2 == 0) {
channel.basicReject(tag, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
});
RabbitMQUtils.close(connection, channel);
}
rabbitMQ還提供了basicNack方法.可以批量拒絕消息
注意:
basicReject或者basicNack中的requeue設(shè)置為false時(shí),可以啟用"死信隊(duì)列"功能.死信隊(duì)列可以通過檢測被拒絕或者未送達(dá)的消息來追蹤問題.
channel.basicReject
六、關(guān)閉連接
RabbitMQ為channel和connection提供了addShutdownListener方法對連接的關(guān)閉進(jìn)行回調(diào).如下:
public void closeChannel() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
channel.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println(cause);
}
});
RabbitMQUtils.close(channel);
}
附:
兩個(gè)測試時(shí)省代碼的封裝類
- 創(chuàng)建連接:
package com.pctf.basic;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionCreator {
private static final String HOST = "127.0.0.1";
private static final int PORT = 5672;
private static final String USER_NAME = "root";
private static final String PASSWORD = "123456";
private static final ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(PASSWORD);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
}
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
}
- 關(guān)閉連接:
package com.pctf.utils;
import java.io.Closeable;
import java.io.IOException;
public class RabbitMQUtils {
public static void close(AutoCloseable... closeables) {
if (closeables != null) {
for (AutoCloseable c : closeables) {
if (c != null) {
try {
c.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}