RabbitMQ部分API詳解(含demo)

RabbitMQ部分API詳解(含demo)

demo配置文件rabbitmq.properties

host=192.168.221.138
port=5672
username=guest
password=guest

讀取配置文件內(nèi)容

public class RabbitConfig {
    public static String host ;
    public static int port ;
    public static String username;
    public static String password ;

    static{
        try {
            ClassLoader classLoader=RabbitConfig.class.getClassLoader();
            InputStream is=classLoader.getResourceAsStream("rabbitmq.properties");
            Properties properties=new Properties();
            properties.load(is);
            host = properties.getProperty("host");
            port = Integer.parseInt(properties.getProperty("port"));
            username =properties.getProperty("username");
            password =properties.getProperty("password");
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
}

創(chuàng)建連接工廠

public class RabbitUtil {
    public static Connection GetRabbitConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitConfig.host);
        factory.setPort(RabbitConfig.port);
        factory.setUsername(RabbitConfig.username);
        factory.setPassword(RabbitConfig.password);
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

創(chuàng)建provider

public class FanoutProvider {

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建一個連接
        Connection connection = RabbitUtil.GetRabbitConnection();
        // 創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明exchange
        //exchange:exchange名稱
        //type:exchange類型
        //durable:exchange是否持久化(不代表消息持久化)
        //autoDelete:已經(jīng)沒有消費者了,服務(wù)器是否可以刪除該Exchange
        //exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
        channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT, true,false,null);

        //聲明queue
        //queue:queue名稱
        //durable:queue是否持久化
        //exclusive:是否為當(dāng)前連接的專用隊列,在連接斷開后,會自動刪除該隊列
        //autodelete:當(dāng)沒有任何消費者使用時,自動刪除該隊列
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
        channel.queueDeclare("zx_queue_f1", true, false, false, null);
        channel.queueDeclare("zx_queue_f2", true, false, false, null);
        channel.queueDeclare("zx_queue_f3", true, false, false, null);

        //queue:queue名稱
        //exchange:exchange名稱
        //routingKey:路由鍵;用來綁定queue和exchange
        //queueBind(String queue, String exchange, String routingKey)
        channel.queueBind("zx_queue_f1","zx_fanout","");
        channel.queueBind("zx_queue_f2","zx_fanout","");
        channel.queueBind("zx_queue_f3","zx_fanout","");

        //開啟confirm機制
        // (即rabbitmq-server收到生產(chǎn)端的消息,會給生產(chǎn)端發(fā)送一個確認,如果沒法送確認,生產(chǎn)端重新發(fā)送消息到server中)
        channel.confirmSelect();

        for (int i = 1 ; i <= 100 ;i++){
            String message = "fanout:廣播的第 "+ i +" 條消息";

            //exchange
            //routingKey:路由鍵
            //mandatory:true=如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,那么會調(diào)用basic.return方法將消息返還給生產(chǎn)者。false=出現(xiàn)上述情形broker會直接將消息扔掉
            //immediate:true=如果exchange在將消息route到queue(s)時發(fā)現(xiàn)對應(yīng)的queue上沒有消費者,那么這條消息不會放入隊列中。false=當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產(chǎn)者。
            //BasicProperties:消息的基本屬性,例如路由頭等(MessageProperties.PERSISTENT_TEXT_PLAIN:消息的持久化)
            //basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            channel.basicPublish("zx_fanout","",false,false,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));
            System.out.println(message);
        }

        // 關(guān)閉連接
        channel.close();
        connection.close();
    }
}

創(chuàng)建consumer

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建一個連接
        Connection conn = RabbitUtil.GetRabbitConnection();
        // 創(chuàng)建通道
        Channel channel = conn.createChannel();
        //聲明exchange
        //exchange:exchange名稱
        //type:exchange類型
        //durable:exchange是否持久化(不代表消息持久化)
        //autoDelete:已經(jīng)沒有消費者了,服務(wù)器是否可以刪除該Exchange
        //exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments)
        channel.exchangeDeclare("zx_fanout", BuiltinExchangeType.FANOUT,true,false,null);

        //聲明queue
        //queue:queue名稱
        //durable:queue是否持久化
        //exclusive:是否為當(dāng)前連接的專用隊列,在連接斷開后,會自動刪除該隊列
        //autodelete:當(dāng)沒有任何消費者使用時,自動刪除該隊列
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
        channel.queueDeclare("zx_queue_f1", true, false, false, null);

        //queue:queue名稱
        //exchange:exchange名稱
        //routingKey:路由鍵;用來綁定queue和exchange
        //queueBind(String queue, String exchange, String routingKey)
        channel.queueBind("zx_queue_f1","zx_fanout","");
        
        //uack的消息最多有幾條
        channel.basicQos(200);

        //消息消費
        //queue:綁定隊列名稱
        //autoAck:是否自動ack,如果不自動ack,需要使用channel.ack、channel.nack、channel.basicReject 進行消息應(yīng)答
        //basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume("zx_queue_f1",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String message = new String(body,"utf-8");
                    System.out.println(Consumer1.class.getSimpleName()+"--接收消息--"+message);
                } catch (UnsupportedEncodingException e) {

                    //deliveryTag:該消息的唯一ID
                    //multiple:是否批量. true:將一次性拒絕所有小于deliveryTag的消息。
                    //requeue:被拒絕的是否重新入隊列
                    //channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);
                    channel.basicNack(envelope.getDeliveryTag(),false,true);

                    //deliveryTag:該消息的index
                    //requeue:被拒絕的是否重新入隊列
                    //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                    //channel.basicNack 與 channel.basicReject 的區(qū)別在于basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息
                } finally {

                    //basicAck(long deliveryTag, boolean multiple)
                    //deliveryTag:該消息的唯一ID
                    //multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }
}

demo的github地址

https://github.com/AndeZhaoX/rabbitmq-test.git

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 夏季,一個神奇的季節(jié)。它可以讓百花盛開,讓樹木成蔭,讓天氣瞬間東邊日出西邊雨。當(dāng)然,還有,讓大街上人們的衣...
    嵐月看海閱讀 277評論 0 1
  • 嗨,我們又見面了,我是你們的老朋友——丹丹媽咪,一枚專注分享育兒知識的寶媽。今天想和大家聊聊啥呢?請繼續(xù)往下看噢。...
    愛笑寶媽歐丹丹閱讀 548評論 0 0
  • 明知道你不會找我 還是忍不住看手機
    歡歡_5726閱讀 177評論 0 1
  • 繁花似錦, 不及一葉的你。 似水流年, 載不走, 你年華的老去。 每一絲紋路, 每一寸葉的 肌膚, 耐得住 時光蹉...
    俗知者閱讀 336評論 0 4

友情鏈接更多精彩內(nèi)容