RabbitMq 消息發(fā)布/確認(rèn) 以及對(duì)確認(rèn)失敗的消息的處理

這部分沒有涉及到交換機(jī),所以一個(gè)消息只能被消費(fèi)一次,多個(gè)消費(fèi)者之間是競爭關(guān)系


image.png

1、連接rabbitMq

pom文件


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>


    </dependencies>

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @Author: yokipang
 * @Date: 2022/5/10
 * 連接工廠創(chuàng)建信道
 */
public class RabbitMqUtils {
    public static Channel getChannel() throws  Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.xx");
        factory.setUsername("xxx");
        factory.setPassword("xxxxx");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return  channel;
    }

}

2、具體方法

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmMessage {

    public static final int message_count = 1000;


    public static void main(String[] args) throws Exception {
        //單個(gè)確認(rèn) 690ms
        //publishMessage1();

        //批量確認(rèn) 160ms
        //publishMessage2();

        //異步批量確認(rèn)
        publishMessage3();
    }

    //單個(gè)確認(rèn)
    public static  void publishMessage1() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);

        //開啟發(fā)布、確認(rèn)
        channel.confirmSelect();

        //開始時(shí)間
        long begin = System.currentTimeMillis();

        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息發(fā)送成功");
            }
        }

        long end = System.currentTimeMillis();

        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }


    //批量發(fā)布確認(rèn)
    public static  void publishMessage2() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);

        //開啟發(fā)布、確認(rèn)
        channel.confirmSelect();

        //開始時(shí)間
        long begin = System.currentTimeMillis();

        //批量確認(rèn)數(shù)量
        int batchSize = 100;


        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());

            if( i % batchSize == 0){
                channel.waitForConfirms();
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }



    //異步批量發(fā)布確認(rèn)
    public static  void publishMessage3() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);
        //開啟發(fā)布、確認(rèn)
        channel.confirmSelect();

        /**
         * 線程安全有序的哈希表 適用于高并發(fā)情況
         * 1、輕松地將序號(hào)與消息進(jìn)行關(guān)聯(lián)
         * 2、輕松的批量刪除條目 只要給到序號(hào)
         * 3、支持高并發(fā)(多線程)
         */
        ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();

        //消息發(fā)送前準(zhǔn)備監(jiān)聽器 監(jiān)聽消息發(fā)送狀態(tài)
        /**
         * 消息確認(rèn)成功函數(shù)
         * 1、消息標(biāo)記
         * 2、是否批量
         */
        ConfirmCallback ackConfirmCallback =(deliveryTag,multiple)->{
            //是否批量處理
            if(multiple){
                //刪除掉已確認(rèn)的消息 剩下的就是未成功發(fā)送的消息
                ConcurrentNavigableMap<Long,String> confirmd = concurrentSkipListMap.headMap(deliveryTag);
                confirmd.clear();
            }else{
                concurrentSkipListMap.remove(deliveryTag);
            }
            System.out.println("確認(rèn)成功的消息:"+ deliveryTag);
        };

        /**
         * 消息確認(rèn)失敗函數(shù)
         * 1、消息標(biāo)記
         * 2、是否批量
         */
        ConfirmCallback notAckConfirmCallback =(deliveryTag,multiple)->{
            String message = concurrentSkipListMap.get(deliveryTag);
            System.out.println("確認(rèn)失敗的消息:"+message +"   消息標(biāo)記" + deliveryTag);
        };

        //開始時(shí)間
        long begin = System.currentTimeMillis();
        //監(jiān)聽器
        channel.addConfirmListener(ackConfirmCallback,notAckConfirmCallback);

        //發(fā)送消息
        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //記錄所有要發(fā)送的消息
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();
        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容