RabbitMQ筆記九:MessageListenerAdapter詳解

MessageListenerAdapter詳解

Message listener adapter that delegates the handling of messages to target listener methods via reflection, with flexible message type conversion. Allows listener methods to operate on message content types, completely independent from the Rabbit API.
消息監(jiān)聽(tīng)適配器(adapter),通過(guò)反射將消息處理委托給目標(biāo)監(jiān)聽(tīng)器的處理方法,并進(jìn)行靈活的消息類型轉(zhuǎn)換。允許監(jiān)聽(tīng)器方法對(duì)消息內(nèi)容類型進(jìn)行操作,完全獨(dú)立于Rabbit API。

By default, the content of incoming Rabbit messages gets extracted before being passed into the target listener method, to let the target method operate on message content types such as String or byte array instead of the raw
Message. Message type conversion is delegated to a Spring AMQ MessageConverter. By default, a SimpleMessageConverter will be used. (If you do not want such automatic message conversion taking place, then be sure to set the #setMessageConverter MessageConverter to null.)
默認(rèn)情況下,傳入Rabbit消息的內(nèi)容在被傳遞到目標(biāo)監(jiān)聽(tīng)器方法之前被提取,以使目標(biāo)方法對(duì)消息內(nèi)容類型進(jìn)行操作以String或者byte類型進(jìn)行操作,而不是原始Message類型。 (消息轉(zhuǎn)換器)
消息類型轉(zhuǎn)換委托給MessageConverter接口的實(shí)現(xiàn)類。 默認(rèn)情況下,將使用SimpleMessageConverter。 (如果您不希望進(jìn)行這樣的自動(dòng)消息轉(zhuǎn)換,
那么請(qǐng)自己通過(guò)#setMessageConverter MessageConverter設(shè)置為null)

If a target listener method returns a non-null object (typically of a message content type such as String or byte array), it will get wrapped in a Rabbit Message and sent to the exchange of the incoming message with the routingKey that comes from the Rabbit ReplyTo property or via #setResponseRoutingKey(String) specified routingKey).
如果目標(biāo)監(jiān)聽(tīng)器方法返回一個(gè)非空對(duì)象(通常是消息內(nèi)容類型,例如String或byte數(shù)組),它將被包裝在一個(gè)Rabbit Message 中,并發(fā)送使用來(lái)自Rabbit ReplyTo屬性或通過(guò)#setResponseRoutingKey(String)指定的routingKeyroutingKey來(lái)傳送消息。(使用rabbitmq 來(lái)實(shí)現(xiàn)異步rpc功能時(shí)候會(huì)使用到這個(gè)屬性)。

Note:The sending of response messages is only available when using the ChannelAwareMessageListener entry point (typically through a Spring message listener container). Usage as MessageListener does not support the generation of response messages.
注意:發(fā)送響應(yīng)消息僅在使用ChannelAwareMessageListener入口點(diǎn)(通常通過(guò)Spring消息監(jiān)聽(tīng)器容器)時(shí)可用。 用作MessageListener不支持生成響應(yīng)消息。

Demo

配置類MQConfig:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("zhihao.direct.exchange");
        rabbitTemplate.setRoutingKey("zhihao.debug");
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("order","pay","zhihao.miao.order");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        //設(shè)置處理器的消費(fèi)消息的默認(rèn)方法,如果沒(méi)有設(shè)置,那么默認(rèn)的處理器中的默認(rèn)方式是handleMessage方法
        adapter.setDefaultListenerMethod("onMessage");
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("order","onorder");
        queueOrTagToMethodName.put("pay","onpay");
        queueOrTagToMethodName.put("zhihao.miao.order","oninfo");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(adapter);

        return container;
    }
}

Handler類MessageHandler,MessageHandler類中定義的方法也就是上面翻譯的目標(biāo)監(jiān)聽(tīng)器的處理方法:

public class MessageHandler {

    //沒(méi)有設(shè)置默認(rèn)的處理方法的時(shí)候,方法名是handleMessage
    public void handleMessage(byte[] message){
        System.out.println("---------handleMessage-------------");
        System.out.println(new String(message));
    }


    //通過(guò)設(shè)置setDefaultListenerMethod時(shí)候指定的方法名
    public void onMessage(byte[] message){
        System.out.println("---------onMessage-------------");
        System.out.println(new String(message));
    }

    //以下指定不同的隊(duì)列不同的處理方法名
    public void onorder(byte[] message){
        System.out.println("---------onorder-------------");
        System.out.println(new String(message));
    }

    public void onpay(byte[] message){
        System.out.println("---------onpay-------------");
        System.out.println(new String(message));
    }

    public void oninfo(byte[] message){
        System.out.println("---------oninfo-------------");
        System.out.println(new String(message));
    }

}

啟動(dòng)應(yīng)用類:

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("===start up======");
        TimeUnit.SECONDS.sleep(60);
        context.close();
    }
}

總結(jié)

使用MessageListenerAdapter處理器進(jìn)行消息隊(duì)列監(jiān)聽(tīng)處理,如果容器沒(méi)有設(shè)置setDefaultListenerMethod,則處理器中默認(rèn)的處理方法名是handleMessage,如果設(shè)置了setDefaultListenerMethod,則處理器中處理消息的方法名就是setDefaultListenerMethod方法參數(shù)設(shè)置的值。也可以通過(guò)setQueueOrTagToMethodName方法為不同的隊(duì)列設(shè)置不同的消息處理方法。

源碼分析:

我們知道MessageListenerAdapter繼承AbstractAdaptableMessageListener類,實(shí)現(xiàn)MessageListenerChannelAwareMessageListener接口,而我們知道MessageListenerChannelAwareMessageListener接口的onMessage方法就是具體容器監(jiān)聽(tīng)隊(duì)列處理隊(duì)列消息的方法。

MessageListenerAdapteronMessage方法

@Override
public void onMessage(Message message, Channel channel) throws Exception {
    // Check whether the delegate is a MessageListener impl itself.
    // In that case, the adapter will simply act as a pass-through.
    Object delegate = getDelegate();
    if (delegate != this) {
        if (delegate instanceof ChannelAwareMessageListener) {
            if (channel != null) {
                ((ChannelAwareMessageListener) delegate).onMessage(message, channel);
                return;
            }
            else if (!(delegate instanceof MessageListener)) {
                throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
                        + "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
            }
        }
        if (delegate instanceof MessageListener) {
            ((MessageListener) delegate).onMessage(message);
            return;
        }
    }

    // Regular case: find a handler method reflectively.
    Object convertedMessage = extractMessage(message);
    //獲取處理消息的方法名
    String methodName = getListenerMethodName(message, convertedMessage);
    if (methodName == null) {
        throw new AmqpIllegalStateException("No default listener method specified: "
                + "Either specify a non-null value for the 'defaultListenerMethod' property or "
                + "override the 'getListenerMethodName' method.");
    }

    // Invoke the handler method with appropriate arguments.
    Object[] listenerArguments = buildListenerArguments(convertedMessage);
    Object result = invokeListenerMethod(methodName, listenerArguments, message);
    if (result != null) {
        handleResult(result, message, channel);
    }
    else {
        logger.trace("No result object given - no result to handle");
    }
}

獲取處理消息的方法名

protected String getListenerMethodName(Message originalMessage, Object extractedMessage) throws Exception {
    if (this.queueOrTagToMethodName.size() > 0) {
        MessageProperties props = originalMessage.getMessageProperties();
        String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
        if (methodName == null) {
            methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
        }
        if (methodName != null) {
            return methodName;
        }
    }
    return getDefaultListenerMethod();
}

結(jié)論

MessageListenerAdapter
1.可以把一個(gè)沒(méi)有實(shí)現(xiàn)MessageListenerChannelAwareMessageListener接口的類適配成一個(gè)可以處理消息的處理器
2.默認(rèn)的方法名稱為:handleMessage,可以通過(guò)setDefaultListenerMethod設(shè)置新的消息處理方法
3.MessageListenerAdapter支持不同的隊(duì)列交給不同的方法去執(zhí)行。使用setQueueOrTagToMethodName方法設(shè)置,當(dāng)根據(jù)queue名稱沒(méi)有找到匹配的方法的時(shí)候,就會(huì)交給默認(rèn)的方法去處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評(píng)論 19 139
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測(cè)試 ...
    KeKeMars閱讀 6,607評(píng)論 0 6
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評(píng)論 2 34
  • 作者: weiyf時(shí)間: 2016-10-31 21:35:33原文鏈接:https://developer.an...
    衛(wèi)裕發(fā)閱讀 4,865評(píng)論 2 15
  • 她遇見(jiàn)他是在最好的年紀(jì),他予她是最美的相遇。 那一年,她18他20,她和他在朋友的生日聚會(huì)上邂逅了。他面容俊朗,玉...
    陽(yáng)春白雪ch閱讀 310評(píng)論 0 0

友情鏈接更多精彩內(nèi)容