一、概述
RocketMQ的producer默認(rèn)有兩個,一個是DefaultMQProducer,另一個是TransactionMQProducer,本文只對send方法做一個總結(jié),其他的細(xì)節(jié)在其他章節(jié)介紹
二、DefaultMQProducer
一共定義了17種send方法,從4.x版本,事務(wù)消息被放到了TransactionMQProducer中,所以有15個send方法,這15個方法中,又有兩個異步帶超時時間的send方法被廢棄了,所以有效的send方法有13個:
1、同步發(fā)送
/**
* 同步發(fā)送模式. 只有消息被成功接收并且被固化完成后才會收到反饋。
* 內(nèi)置有重發(fā)機(jī)制, producer將會重試
* {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才會報錯.
* 因此,有一定的概率向broker發(fā)送重復(fù)的消息
* 使用者有責(zé)任去解決潛在的重復(fù)數(shù)據(jù)造成的影響
* @param msg 待發(fā)送數(shù)據(jù)
* @return {@link SendResult} 實體,來通知發(fā)送者發(fā)送狀態(tài)等信息, 比如消息的ID
* {@link SendStatus} 指明 broker 存儲/復(fù)制 的狀態(tài), 發(fā)送到了哪個隊列等等
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)連接異常
* @throws MQBrokerException broker異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
2、同步發(fā)送,帶超時時間
/**
* 與 {@link #send(Message)} 相同,只不過多了超時時間的指定.
*
* @param msg 待發(fā)送消息
* @param timeout 發(fā)送超時時間
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)連接異常
* @throws MQBrokerException broker異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
3、異步發(fā)送
/**
* 異步發(fā)送消息
* 消息發(fā)送后,立即返回。broker處理返程后, 觸發(fā)sendCallback回調(diào)方法
* 與上面一樣,在給出發(fā)送失敗標(biāo)志前,會嘗試2次,所以開發(fā)者要處理重復(fù)發(fā)送帶來的問題
* @param msg 待發(fā)送消息
* @param sendCallback 回調(diào)函數(shù)
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
4、異步發(fā)送,帶超時時間
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
5、單向發(fā)送,不等待broker回饋
/**
* 發(fā)送方法不會等待broker的反饋,只會一直發(fā)
* 所以有很高的吞吐量,但是有一定概率丟失消息
*
* @param msg 待發(fā)送消息
* @throws MQClientException 客戶端異常
* @throws RemotingException 網(wǎng)絡(luò)異常
* @throws InterruptedException 發(fā)送線程中斷異常
*/
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg);
}
6、同步發(fā)送,指定隊列
/**
* 同步發(fā)送,指定隊列
* @param msg 待發(fā)送消息
* @param mq 指定的消息隊列
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
7、同步發(fā)送,指定隊列,并附帶超時時間
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq, timeout);
}
8、異步發(fā)送,指定隊列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
9、異步發(fā)送,指定隊列,附帶超時時間
這個在4.4.0版本被設(shè)置為廢棄,后續(xù)版本會給出
/**
* 因為在處超時異常存在問題,所以廢棄
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
}
10、單向發(fā)送,指定隊列
@Override
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
}
11、同步發(fā)送,指定隊列選擇策略
官方的有序消息的DEMO就是基于隊列選擇器做的,讓一些列有序的消息(相同ID)發(fā)送到同一個隊列
/**
* 指定隊列選擇策略MessageQueueSelector
*
* @param msg 待發(fā)送消息
* @param selector 隊列選擇器
* @param arg 配合隊列選擇器選擇隊列的參數(shù),一般可以是業(yè)務(wù)參數(shù)(ID等)
* @return {@link SendResult} 同上
* {@link SendStatus} 同上
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
12、同步發(fā)送消息,指定隊列選擇策略,并附帶超時時間
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
}
13、異步發(fā)送消息,指定隊列選擇策略
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
}
14、異步發(fā)送消息,指定隊列選擇策略,并附帶超時時間
這個方法在4.4.0版本廢棄,后續(xù)提供
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
15、單向發(fā)送,指定隊列選擇策略
@Override
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
三、TransactionMQProducer
1、發(fā)送事務(wù)消息
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}