MessageListenerAdapter in Detail
Message listener adapter that delegates the handling of messages to target listener methods via reflection, with flexible message type conversion. Allows listener methods to operate on message content types, completely independent from the Rabbit API.
In short: MessageListenerAdapter uses reflection to hand each message to a method on a target listener object, converting the message type along the way, so the listener methods work on the message content and never touch the Rabbit API.
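The reflective delegation described above can be sketched with the JDK alone. This is a simplified illustration, not the Spring AMQP implementation: ReflectiveDelegate and EchoHandler are hypothetical names, and the real adapter also performs message conversion and reply handling.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the adapter: it knows only the handler object and a method name.
final class ReflectiveDelegate {
    private final Object delegate;
    private final String methodName;

    ReflectiveDelegate(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    // Finds a public method by name and argument type, then invokes it on the delegate.
    Object dispatch(Object payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, payload.getClass());
            return method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke listener method " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDelegate adapter = new ReflectiveDelegate(new EchoHandler(), "handleMessage");
        System.out.println(adapter.dispatch("hi".getBytes(StandardCharsets.UTF_8)));
    }
}

// Hypothetical handler: a plain class that implements no messaging interface.
final class EchoHandler {
    public String handleMessage(byte[] body) {
        return "got:" + new String(body, StandardCharsets.UTF_8);
    }
}
```

The handler never sees a Message or a Channel; only the method name and the parameter type connect it to the dispatcher.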
By default, the content of incoming Rabbit messages gets extracted before being passed into the target listener method, to let the target method operate on message content types such as String or byte array instead of the raw Message. Message type conversion is delegated to a Spring AMQP MessageConverter. By default, a SimpleMessageConverter will be used. (If you do not want such automatic message conversion taking place, then be sure to set the MessageConverter to null via setMessageConverter.)
In other words, the adapter extracts the payload of an incoming Rabbit message before calling the target listener method, so the method receives a String or a byte array rather than the raw Message. The conversion is performed by an implementation of the MessageConverter interface, SimpleMessageConverter by default. To turn automatic conversion off, call setMessageConverter with null.
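To make the default conversion concrete, here is a simplified model of the decision a converter like SimpleMessageConverter makes. PayloadExtractor is a hypothetical name, not a Spring AMQP class; the sketch assumes UTF-8 for text bodies and leaves out the handling of serialized Java objects.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of payload extraction; not the library class.
final class PayloadExtractor {
    // Text content types become a String; anything else is passed through as raw bytes.
    static Object extract(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            return new String(body, StandardCharsets.UTF_8); // assumption: UTF-8 encoding
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(extract(body, "text/plain").getClass().getSimpleName());
        System.out.println(extract(body, "application/octet-stream").getClass().getSimpleName());
    }
}
```

This matters when choosing handler parameter types: a method declared with a byte[] parameter suits messages without a text content type, whereas a message published as text/plain would most likely need a handler method that takes a String.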
If a target listener method returns a non-null object (typically of a message content type such as String or byte array), it will get wrapped in a Rabbit Message and sent to the exchange of the incoming message with the routingKey that comes from the Rabbit ReplyTo property or is specified via setResponseRoutingKey(String).
In other words, a non-null return value is wrapped in a Rabbit Message and sent as a reply, routed either by the ReplyTo property of the incoming message or by the routingKey set through setResponseRoutingKey(String). This is the property to reach for when implementing asynchronous RPC on top of RabbitMQ.
Note: The sending of response messages is only available when using the ChannelAwareMessageListener entry point (typically through a Spring message listener container). Usage as MessageListener does not support the generation of response messages.
That is, replies are sent only when the adapter is invoked as a ChannelAwareMessageListener, which is normally the case inside a Spring message listener container. Invoked as a plain MessageListener, it cannot produce response messages.
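On the handler side, producing a reply needs nothing more than a return value. A minimal sketch, where the class name ReplyingHandler and the "processed:" prefix are purely illustrative:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical handler: a plain class whose listener method returns a value.
final class ReplyingHandler {
    // A non-null return value is what tells the adapter to send a response message.
    public String handleMessage(byte[] body) {
        String request = new String(body, StandardCharsets.UTF_8);
        return "processed:" + request;
    }

    public static void main(String[] args) {
        byte[] body = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(new ReplyingHandler().handleMessage(body));
    }
}
```

Registered through a MessageListenerAdapter inside a listener container, the returned String would be wrapped in a Message and routed as described above; a void method, or one that returns null, produces no reply.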
Demo
Configuration class MQConfig:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("zhihao.direct.exchange");
        rabbitTemplate.setRoutingKey("zhihao.debug");
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("order","pay","zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        // Set the handler's default method for consuming messages; if this is not set, the adapter falls back to handleMessage
        adapter.setDefaultListenerMethod("onMessage");

        // Map each queue name to the handler method that should process its messages
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("order","onorder");
        queueOrTagToMethodName.put("pay","onpay");
        queueOrTagToMethodName.put("zhihao.miao.order","oninfo");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);

        container.setMessageListener(adapter);
        return container;
    }
}
Handler class MessageHandler. The methods defined in MessageHandler are the "target listener methods" mentioned in the Javadoc above:
public class MessageHandler {

    // Used when no default listener method has been set: the method name is handleMessage
    public void handleMessage(byte[] message){
        System.out.println("---------handleMessage-------------");
        System.out.println(new String(message));
    }

    // The method name passed to setDefaultListenerMethod
    public void onMessage(byte[] message){
        System.out.println("---------onMessage-------------");
        System.out.println(new String(message));
    }

    // The methods below are mapped to individual queues
    public void onorder(byte[] message){
        System.out.println("---------onorder-------------");
        System.out.println(new String(message));
    }

    public void onpay(byte[] message){
        System.out.println("---------onpay-------------");
        System.out.println(new String(message));
    }

    public void oninfo(byte[] message){
        System.out.println("---------oninfo-------------");
        System.out.println(new String(message));
    }
}
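Application bootstrap class:
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("===start up======");
        // Keep the application alive for 60 seconds so the container can consume messages
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}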
Summary
When a MessageListenerAdapter handles the messages of the listened queues, the handler method it calls by default is handleMessage, provided setDefaultListenerMethod has not been called on the adapter. If setDefaultListenerMethod has been called, the handler method is the one named by its argument. setQueueOrTagToMethodName can additionally assign a different handler method to each queue.
Source code analysis:
MessageListenerAdapter extends AbstractAdaptableMessageListener and implements the MessageListener and ChannelAwareMessageListener interfaces. The onMessage method of these interfaces is what the listener container calls to process each message it takes from a queue.
The onMessage method of MessageListenerAdapter:
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    // Check whether the delegate is a MessageListener impl itself.
    // In that case, the adapter will simply act as a pass-through.
    Object delegate = getDelegate();
    if (delegate != this) {
        if (delegate instanceof ChannelAwareMessageListener) {
            if (channel != null) {
                ((ChannelAwareMessageListener) delegate).onMessage(message, channel);
                return;
            }
            else if (!(delegate instanceof MessageListener)) {
                throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
                        + "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
            }
        }
        if (delegate instanceof MessageListener) {
            ((MessageListener) delegate).onMessage(message);
            return;
        }
    }

    // Regular case: find a handler method reflectively.
    Object convertedMessage = extractMessage(message);
    // Resolve the name of the method that will handle the message
    String methodName = getListenerMethodName(message, convertedMessage);
    if (methodName == null) {
        throw new AmqpIllegalStateException("No default listener method specified: "
                + "Either specify a non-null value for the 'defaultListenerMethod' property or "
                + "override the 'getListenerMethodName' method.");
    }

    // Invoke the handler method with appropriate arguments.
    Object[] listenerArguments = buildListenerArguments(convertedMessage);
    Object result = invokeListenerMethod(methodName, listenerArguments, message);
    if (result != null) {
        handleResult(result, message, channel);
    }
    else {
        logger.trace("No result object given - no result to handle");
    }
}
Resolving the name of the handler method:
protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {
    if (this.queueOrTagToMethodName.size() > 0) {
        MessageProperties props = originalMessage.getMessageProperties();
        String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
        if (methodName == null) {
            methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
        }
        if (methodName != null) {
            return methodName;
        }
    }
    return getDefaultListenerMethod();
}
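The lookup order in getListenerMethodName (queue name first, then consumer tag, then the default method) can be exercised without a broker. The sketch below mirrors that logic in plain Java; MethodNameResolver is a hypothetical class, and the consumer tag "amq.ctag-1" is a made-up value for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the lookup in getListenerMethodName; not part of Spring AMQP.
final class MethodNameResolver {
    private final Map<String, String> queueOrTagToMethodName;
    private final String defaultListenerMethod;

    MethodNameResolver(Map<String, String> mapping, String defaultListenerMethod) {
        this.queueOrTagToMethodName = new HashMap<>(mapping);
        this.defaultListenerMethod = defaultListenerMethod;
    }

    // Tries the queue name, then the consumer tag, then falls back to the default method.
    String resolve(String consumerQueue, String consumerTag) {
        String methodName = queueOrTagToMethodName.get(consumerQueue);
        if (methodName == null) {
            methodName = queueOrTagToMethodName.get(consumerTag);
        }
        return methodName != null ? methodName : defaultListenerMethod;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("order", "onorder");
        mapping.put("pay", "onpay");
        MethodNameResolver resolver = new MethodNameResolver(mapping, "onMessage");
        System.out.println(resolver.resolve("order", "amq.ctag-1"));   // prints "onorder"
        System.out.println(resolver.resolve("unknown", "amq.ctag-1")); // prints "onMessage"
    }
}
```

With the mapping from the demo configuration, a message from the order queue resolves to onorder, while a message from any unmapped queue falls through to the default method.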
Conclusion
MessageListenerAdapter
1. Adapts a class that implements neither MessageListener nor ChannelAwareMessageListener into a handler that can process messages.
2. Uses handleMessage as the default method name; setDefaultListenerMethod sets a different handler method.
3. Lets different queues be handled by different methods, configured through setQueueOrTagToMethodName. When no method matches the queue name (or the consumer tag), the message is passed to the default method.