RabbitMQ 的自定義消費(fèi)者使用

之前的文章里面,我都是在消費(fèi)端的代碼里面編寫 while 循環(huán),進(jìn)行 consumer.nextDelivery 方法進(jìn)行獲取下一條消息,然后進(jìn)行消費(fèi)處理,這種方式太 low 了,耦合性太高,所以要使用自定義的 consumer 來解耦,這種方式更方便一些,也是在實(shí)際工作中最常用的使用方式

下面來看看具體的代碼實(shí)現(xiàn), 代碼地址:

https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項(xiàng)目下

如圖所示,先來實(shí)現(xiàn)我們的自定義消費(fèi)者

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag,  //消費(fèi)者標(biāo)簽
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        log.info("------MyConsumer-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
    }
}

接著,重點(diǎn)來了,在聲明消費(fèi)者的代碼里面使用剛才的自定義消費(fèi)者

/**
 * 使用自定義消費(fèi)者
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    public static final String EXCHANGE_NAME = "test_consumer_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_TYPE = "consumer.#";
    public static final String ROUTING_KEY = "consumer.save";
    public static final String QUEUE_NAME = "test_consumer_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C onnection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創(chuàng)建一個(gè)新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
      //使用自定義消費(fèi)者
        channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
        log.info("消費(fèi)端啟動成功");
    }
}

生產(chǎn)端代碼基本不需要修改

public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Hello RabbitMQ Consumer Message";
        for(int i = 0; i < 5; i ++){
            log.info("生產(chǎn)端發(fā)送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
    }
}

先啟動消費(fèi)端,再啟動生產(chǎn)端,查看運(yùn)行結(jié)果:注意看消費(fèi)端的日志,打印出了我們自定義消費(fèi)者里面的東西了。

image
image

至此,簡單的使用自定義消費(fèi)者demo就完成了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 4. 設(shè)計(jì)思想 4.1 動機(jī) 我們設(shè)計(jì)的 Kafka 能夠作為一個(gè)統(tǒng)一的平臺來處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)饋送...
    瘋狂的橙閱讀 1,142評論 1 4
  • 發(fā)現(xiàn) 關(guān)注 消息 iOS 第三方庫、插件、知名博客總結(jié) 作者大灰狼的小綿羊哥哥關(guān)注 2017.06.26 09:4...
    肇東周閱讀 15,170評論 4 61
  • 那棵枯樹上結(jié)著 最美麗的蘋果 她借著雨水的倒影 等待著 躲在云后的 睡著了的太陽
    省略掉閱讀 120評論 0 1
  • 笨熊之所以叫笨熊,是因?yàn)樯掷锏木用穸颊J(rèn)為,這只熊很傻。 笨熊笨到捉不到獵物,只好整日吃素。 笨熊下河摸魚,魚沒捉...
    茲心非心閱讀 424評論 2 3
  • 橡樹代表的意思是:永恒。 橡樹材質(zhì)堅(jiān)硬,粗壯寬大,樹冠繁茂,有“森林之王”的美稱。 在歐美文化中,橡樹與人的生命相...
    哥舒閱讀 594評論 0 0

友情鏈接更多精彩內(nèi)容