之前的文章里面,我都是在消費(fèi)端的代碼里面編寫 while 循環(huán),進(jìn)行 consumer.nextDelivery 方法進(jìn)行獲取下一條消息,然后進(jìn)行消費(fèi)處理,這種方式太 low 了,耦合性太高,所以要使用自定義的 consumer 來解耦,這種方式更方便一些,也是在實(shí)際工作中最常用的使用方式
下面來看看具體的代碼實(shí)現(xiàn), 代碼地址:
https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項(xiàng)目下
如圖所示,先來實(shí)現(xiàn)我們的自定義消費(fèi)者
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, //消費(fèi)者標(biāo)簽
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
}
}
接著,重點(diǎn)來了,在聲明消費(fèi)者的代碼里面使用剛才的自定義消費(fèi)者
/**
* 使用自定義消費(fèi)者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_consumer_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "consumer.#";
public static final String ROUTING_KEY = "consumer.save";
public static final String QUEUE_NAME = "test_consumer_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
//使用自定義消費(fèi)者
channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
log.info("消費(fèi)端啟動成功");
}
}
生產(chǎn)端代碼基本不需要修改
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ Consumer Message";
for(int i = 0; i < 5; i ++){
log.info("生產(chǎn)端發(fā)送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
先啟動消費(fèi)端,再啟動生產(chǎn)端,查看運(yùn)行結(jié)果:注意看消費(fèi)端的日志,打印出了我們自定義消費(fèi)者里面的東西了。
image
image
至此,簡單的使用自定義消費(fèi)者demo就完成了。