異常處理
RabbitMQ java client中的異常處理
消費消息,在消費消息的時候拋出異常,
消費啟動類:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
System.out.println(channel.isOpen());
channel.basicConsume("sms",true,new SimpleConsumer(channel));
TimeUnit.SECONDS.sleep(20);
System.out.println(channel.isOpen());
channel.close();
connection.close();
}
}
消費邏輯
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----收到消息了-----------");
System.out.println("消息屬性為:"+properties);
System.out.println("消息內(nèi)容為:"+new String(body));
throw new NullPointerException("空指針異常");
}
}
然后我們發(fā)現(xiàn)20s過后,發(fā)現(xiàn)控制臺打印了
true
amq.ctag-fJ45VlTAV2aKO97-zztDNQ
-----收到消息了-----------
消息屬性為:#contentHeader<basic>(content-type=null, content-encoding=null, headers={}, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
消息內(nèi)容為:第一條消息
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer com.zhihao.test.day10.SimpleConsumer@2462e0fd (amq.ctag-fJ45VlTAV2aKO97-zztDNQ) method handleDelivery for channel AMQChannel(amqp://zhihao.miao@192.168.1.131:5672/,1), class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:286)
false
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:282)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:596)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:530)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:523)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
at com.zhihao.test.day10.Consumer.main(Consumer.java:31)
我們發(fā)現(xiàn)當消費端拋出異常的時候,channel會關(guān)閉,然后channel.close()會報錯。原因是什么呢?
我們知道com.rabbitmq.client.ExceptionHandler這個接口,中定義了各個階段異常的捕獲方法。其默認實現(xiàn)com.rabbitmq.client.impl.DefaultExceptionHandler,繼承com.rabbitmq.client.impl.StrictExceptionHandler,發(fā)現(xiàn)當消費失敗的時候會kill掉channel。
源碼如下:
@Override
public void handleConsumerException(Channel channel, Throwable exception,
Consumer consumer, String consumerTag,
String methodName)
{
handleChannelKiller(channel, exception, "Consumer " + consumer
+ " (" + consumerTag + ")"
+ " method " + methodName
+ " for channel " + channel);
}
調(diào)用handleChannelKiller方法
@Override
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
log(what + " threw an exception for channel " + channel, exception);
try {
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
} catch (AlreadyClosedException ace) {
// noop
} catch (TimeoutException ace) {
// noop
} catch (IOException ioe) {
log("Failure during close of channel " + channel + " after " + exception, ioe);
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
}
}
消息發(fā)送確認異常捕獲:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Send {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
connectionFactory.setExceptionHandler(new DefaultExceptionHandler(){
@Override
public void handleConfirmListenerException(Channel channel, Throwable exception) {
System.out.println("=====消息確認發(fā)生異常=======");
exception.printStackTrace();
}
});
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("收到消息確認,:"+deliveryTag);
throw new IOException("數(shù)據(jù)庫異常,確認失敗");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
}
});
channel.basicPublish("","sms",null,"發(fā)送消息".getBytes());
TimeUnit.SECONDS.sleep(20);
channel.close();
connection.close();
}
}
Spring AMQP異常處理
設(shè)置AUTO確認的時候已經(jīng)講解了異常處理,這邊主要講解一下自動聲明的時候的異常處理。
正常的情況下我們聲明隊列的代碼:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public Queue infoQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("info",true,false,false,arguments);
}
@Bean
public Queue errorQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("error",true,false,false,arguments);
}
}
應(yīng)用啟動類:
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客戶端第一次連接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
執(zhí)行聲明成功,但是假如我們的info隊列已經(jīng)存在,并且屬性和自動聲明的不一致,那么就會拋出異常造成info聲明不了,糟糕的是error隊列也聲明不了。

已經(jīng)聲明了info隊列
此時執(zhí)行發(fā)現(xiàn)拋出異常,并且聲明不了隊列。
控制臺打印出異常堆棧信息:
信息: Starting beans in phase -2147482648
九月 29, 2017 11:49:30 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63190]
九月 29, 2017 11:49:30 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
嚴重: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
Exception in thread "main" org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:500)
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:419)
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:571)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:336)
at com.zhihao.miao.exception.Application.main(Application.java:18)
怎樣去解決呢?
修改配置類,設(shè)置忽略聲明異常
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//設(shè)置忽略聲明異常
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
@Bean
public Queue infoQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("info",true,false,false,arguments);
}
@Bean
public Queue errorQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("error",true,false,false,arguments);
}
}
重新啟動應(yīng)用啟動類,
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客戶端第一次連接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
此時發(fā)現(xiàn)隊列error聲明成功,info聲明失敗,控制臺打?。?/p>
九月 29, 2017 11:51:23 下午 org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
信息: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
九月 29, 2017 11:51:24 下午 org.springframework.context.support.DefaultLifecycleProcessor start
信息: Starting beans in phase -2147482648
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63244]
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
嚴重: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.core.RabbitAdmin logOrRethrowDeclarationException
警告: Failed to declare queue: Queue [name=info, durable=true, autoDelete=false, exclusive=false, arguments={}], continuing... com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
九月 29, 2017 11:51:24 下午 org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
信息: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
九月 29, 2017 11:51:24 下午 org.springframework.context.support.DefaultLifecycleProcessor stop
信息: Stopping beans in phase -2147482648
此時也就達到了我們的需求。