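RabbitMQ Notes 24: RabbitMQ Exception Handling

Exception handling

Exception handling in the RabbitMQ Java client

We consume a message and throw an exception from inside the consumer callback.

Consumer launcher class: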

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();


        Channel channel = connection.createChannel();

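        System.out.println(channel.isOpen()); // prints true: the channel was just opened

        // autoAck = true; SimpleConsumer throws from handleDelivery
        channel.basicConsume("sms",true,new SimpleConsumer(channel));

        TimeUnit.SECONDS.sleep(20);
        System.out.println(channel.isOpen()); // prints false once the consumer has thrown

        channel.close(); // fails with AlreadyClosedException (see the output below)
        connection.close();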

    }
}

Consumer logic:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer{

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(consumerTag);
        System.out.println("-----message received-----------");
        System.out.println("Message properties: "+properties);
        System.out.println("Message body: "+new String(body));

        // Simulate a failure inside the consumer callback
        throw new NullPointerException("null pointer exception");
    }
}

After the 20-second sleep, the console shows:

true
amq.ctag-fJ45VlTAV2aKO97-zztDNQ
-----message received-----------
Message properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={}, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
Message body: first message
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer com.zhihao.test.day10.SimpleConsumer@2462e0fd (amq.ctag-fJ45VlTAV2aKO97-zztDNQ) method handleDelivery for channel AMQChannel(amqp://zhihao.miao@192.168.1.131:5672/,1), class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:286)
false
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:282)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:596)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:530)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:523)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
    at com.zhihao.test.day10.Consumer.main(Consumer.java:31)

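When the consumer throws an exception, the channel is closed, so the later call to channel.close() in main fails with AlreadyClosedException. Why is the channel closed?

The com.rabbitmq.client.ExceptionHandler interface defines a callback for exceptions raised at each stage of the client's work. Its default implementation, com.rabbitmq.client.impl.DefaultExceptionHandler, extends com.rabbitmq.client.impl.StrictExceptionHandler, and that class kills the channel when a consumer callback fails.

The source code is as follows: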

    @Override
    public void handleConsumerException(Channel channel, Throwable exception,
                                        Consumer consumer, String consumerTag,
                                        String methodName)
    {
        handleChannelKiller(channel, exception, "Consumer " + consumer
                                                        + " (" + consumerTag + ")"
                                                        + " method " + methodName
                                                        + " for channel " + channel);
    }

This calls the handleChannelKiller method:

    @Override
    protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
        log(what + " threw an exception for channel " + channel, exception);
        try {
            channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
        } catch (AlreadyClosedException ace) {
            // noop
        } catch (TimeoutException ace) {
            // noop
        } catch (IOException ioe) {
            log("Failure during close of channel " + channel + " after " + exception, ioe);
            channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
        }
    }
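The behaviour traced above can be reproduced in a small stand-alone model. This is only a sketch, not the client's code: ToyChannel, StrictPolicyDemo, deliver and closeIfOpen are hypothetical names invented for illustration, and IllegalStateException stands in for AlreadyClosedException. It shows the strict policy closing the channel when a callback throws, and a shutdown helper that checks isOpen() before closing.

```java
/** Toy stand-in for a channel; NOT the RabbitMQ Channel API. */
final class ToyChannel {
    private boolean open = true;
    private String closeReason;

    boolean isOpen() { return open; }

    String closeReason() { return closeReason; }

    /** Mirrors the observed behaviour: closing an already closed channel is an error. */
    void close(String reason) {
        if (!open) {
            throw new IllegalStateException("channel is already closed: " + closeReason);
        }
        open = false;
        closeReason = reason;
    }
}

final class StrictPolicyDemo {
    /** Runs a consumer callback; under the strict policy a failure closes the channel. */
    static void deliver(ToyChannel channel, Runnable callback) {
        try {
            callback.run();
        } catch (RuntimeException e) {
            if (channel.isOpen()) {
                channel.close("Closed due to exception from consumer: " + e.getMessage());
            }
        }
    }

    /** Shutdown helper: closes only when still open; returns whether it closed anything. */
    static boolean closeIfOpen(ToyChannel channel) {
        if (!channel.isOpen()) {
            return false;
        }
        channel.close("normal shutdown");
        return true;
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        deliver(channel, () -> { throw new NullPointerException("null pointer exception"); });
        System.out.println(channel.isOpen());
        System.out.println(channel.closeReason());
        System.out.println(closeIfOpen(channel));
    }
}
```

With the real client the same idea would be to test channel.isOpen() before calling channel.close(), or to catch exceptions inside handleDelivery so they never reach the exception handler. Note that a check followed by a close can still race with a concurrent shutdown, so catching AlreadyClosedException remains a reasonable fallback.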

Catching exceptions thrown while handling publisher confirms:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultExceptionHandler;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Send {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        connectionFactory.setExceptionHandler(new DefaultExceptionHandler(){
            @Override
            public void handleConfirmListenerException(Channel channel, Throwable exception) {
                System.out.println("=====exception while handling message confirm=======");
                exception.printStackTrace();
            }
        });


        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        channel.confirmSelect();

        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Received message confirm: "+deliveryTag);
                // Simulate a failure inside the confirm callback
                throw new IOException("database error, confirm failed");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

            }
        });

        channel.basicPublish("","sms",null,"send message".getBytes());


        TimeUnit.SECONDS.sleep(20);

        channel.close();
        connection.close();


    }
}
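The pattern used in Send, overriding a single hook of the exception handler so that a failing confirm callback is only logged, can be modelled without the client library. The sketch below is hypothetical throughout: ToyConfirmListener, ToyExceptionHandler, LoggingOnlyHandler and ConfirmDispatchDemo are invented names, and it assumes that the default (strict) policy closes the channel for confirm-listener failures in the same way it does for consumer failures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener type for this sketch (not com.rabbitmq.client.ConfirmListener). */
interface ToyConfirmListener {
    void handleAck(long deliveryTag) throws Exception;
}

/** Toy handler: by default a failing confirm callback closes the channel. */
class ToyExceptionHandler {
    final List<String> log = new ArrayList<>();
    boolean channelOpen = true;

    void onConfirmListenerFailure(Exception e) {
        log.add("closing channel: " + e.getMessage());
        channelOpen = false;
    }
}

/** Overrides the single hook: record the failure and leave the channel open. */
class LoggingOnlyHandler extends ToyExceptionHandler {
    @Override
    void onConfirmListenerFailure(Exception e) {
        log.add("confirm callback failed: " + e.getMessage());
    }
}

final class ConfirmDispatchDemo {
    /** Calls the listener and routes any failure to the handler instead of the caller. */
    static void dispatchAck(long tag, ToyConfirmListener listener, ToyExceptionHandler handler) {
        try {
            listener.handleAck(tag);
        } catch (Exception e) {
            handler.onConfirmListenerFailure(e);
        }
    }

    public static void main(String[] args) {
        ToyConfirmListener failing = tag -> {
            throw new Exception("database error, confirm failed");
        };

        ToyExceptionHandler strict = new ToyExceptionHandler();
        dispatchAck(1L, failing, strict);
        System.out.println(strict.channelOpen + " " + strict.log);

        ToyExceptionHandler lenient = new LoggingOnlyHandler();
        dispatchAck(1L, failing, lenient);
        System.out.println(lenient.channelOpen + " " + lenient.log);
    }
}
```

The design point is that the callback's exception never reaches the publishing thread; whatever the handler decides (close the channel, or just log) is the only visible effect, which is why replacing one hook is enough to change the outcome.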

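Exception handling in Spring AMQP

Exception handling for message consumption was already covered when we discussed the AUTO acknowledge mode, so this section focuses on exceptions raised during automatic declaration.

Under normal circumstances, the code that declares our queues looks like this: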

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public Queue infoQueue(){
        Map<String,Object> arguments = new HashMap<>();
        return new Queue("info",true,false,false,arguments);
    }

    @Bean
    public Queue errorQueue(){
        Map<String,Object> arguments = new HashMap<>();
        return new Queue("error",true,false,false,arguments);
    }
}

Application launcher class:

import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        // Forces the client to open its first connection to RabbitMQ
        context.getBean(RabbitAdmin.class).getQueueProperties("**");
        context.close();
    }
}

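Running this declares both queues successfully. However, if the info queue already exists and its properties differ from those in the automatic declaration, an exception is thrown and info cannot be declared. Worse, the error queue is not declared either.

Suppose the info queue has already been declared with different arguments.

Running the application now throws an exception, and neither queue is declared.

The console prints the exception stack trace: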
INFO: Starting beans in phase -2147482648
Sep 29, 2017 11:49:30 PM org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
INFO: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63190]
Sep 29, 2017 11:49:30 PM org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
SEVERE: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
Exception in thread "main" org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:500)
    at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:419)
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:571)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:336)
    at com.zhihao.miao.exception.Application.main(Application.java:18)

How do we fix this?

Modify the configuration class so that declaration exceptions are ignored:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // Ignore declaration exceptions so that one failure does not block the rest
        rabbitAdmin.setIgnoreDeclarationExceptions(true);
        return rabbitAdmin;
    }

    @Bean
    public Queue infoQueue(){
        Map<String,Object> arguments = new HashMap<>();
        return new Queue("info",true,false,false,arguments);
    }

    @Bean
    public Queue errorQueue(){
        Map<String,Object> arguments = new HashMap<>();
        return new Queue("error",true,false,false,arguments);
    }
}

Rerun the application launcher class:

import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        // Forces the client to open its first connection to RabbitMQ
        context.getBean(RabbitAdmin.class).getQueueProperties("**");
        context.close();
    }
}

This time the error queue is declared successfully while the info declaration still fails, and the console prints:

Sep 29, 2017 11:51:23 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sep 29, 2017 11:51:24 PM org.springframework.context.support.DefaultLifecycleProcessor start
INFO: Starting beans in phase -2147482648
Sep 29, 2017 11:51:24 PM org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
INFO: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63244]
Sep 29, 2017 11:51:24 PM org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
SEVERE: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
Sep 29, 2017 11:51:24 PM org.springframework.amqp.rabbit.core.RabbitAdmin logOrRethrowDeclarationException
WARNING: Failed to declare queue: Queue [name=info, durable=true, autoDelete=false, exclusive=false, arguments={}], continuing... com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
Sep 29, 2017 11:51:24 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
Sep 29, 2017 11:51:24 PM org.springframework.context.support.DefaultLifecycleProcessor stop
INFO: Stopping beans in phase -2147482648

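This achieves what we wanted: a failed declaration of one queue no longer prevents the other queues from being declared.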
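The difference between the two runs can be illustrated with a small model of a declaration loop. This is a simplified sketch and not RabbitAdmin's actual implementation: DeclarationDemo and declareAll are hypothetical names, and an IllegalStateException stands in for the broker's PRECONDITION_FAILED reply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DeclarationDemo {
    /**
     * Declares queues in order, appending each success to 'declared'.
     * Names in 'conflicting' simulate a broker rejecting the declaration.
     */
    static void declareAll(List<String> queues, Set<String> conflicting,
                           boolean ignoreDeclarationExceptions, List<String> declared) {
        for (String queue : queues) {
            try {
                if (conflicting.contains(queue)) {
                    throw new IllegalStateException("PRECONDITION_FAILED for queue '" + queue + "'");
                }
                declared.add(queue);
            } catch (IllegalStateException e) {
                if (!ignoreDeclarationExceptions) {
                    throw e; // strict mode: the remaining queues are never attempted
                }
                System.out.println("Failed to declare queue: " + queue + ", continuing...");
            }
        }
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("info", "error");
        Set<String> conflicting = new HashSet<>(Arrays.asList("info"));

        List<String> strict = new ArrayList<>();
        try {
            declareAll(queues, conflicting, false, strict);
        } catch (IllegalStateException e) {
            System.out.println("strict run aborted: " + e.getMessage());
        }
        System.out.println("strict declared: " + strict);

        List<String> lenient = new ArrayList<>();
        declareAll(queues, conflicting, true, lenient);
        System.out.println("lenient declared: " + lenient);
    }
}
```

The trade-off is the same as with the real flag: a mismatched queue is only reported in the log, so the application keeps running against a queue whose arguments differ from what the configuration expects.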
