rabbitmq消息隊(duì)列入門

介紹

rabbitmq是一個消息代理,它接收和轉(zhuǎn)發(fā)消息,類似一個郵局,把你投遞的郵件送給指定收件人。
相關(guān)術(shù)語:

  • producing: 消息生產(chǎn)者,用于發(fā)送消息
  • queue: 隊(duì)列,用于存儲消息
  • consuming: 消息消費(fèi)者,用于接收消息

HelloWorld

P為生產(chǎn)者,C是消費(fèi)者,中間的框是隊(duì)列,消息的緩沖區(qū)。


image.png

消息發(fā)送

image.png

send.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 創(chuàng)建一個連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建一個頻道,用于復(fù)用連接
            Channel channel = connection.createChannel();
            // 聲明消息發(fā)送的隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

消息接收

image.png

Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            // 創(chuàng)建一個連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建一個頻道,用于復(fù)用連接
            Channel channel = connection.createChannel();
            // 指定發(fā)送的隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 往隊(duì)列中發(fā)出一條消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

Work Queues

image.png

在HelloWorld中寫了發(fā)送/接收消息的程序,現(xiàn)在我們創(chuàng)建一個Work Queues(也稱為Task Queues),來在多個耗時的消息之間分配任務(wù)。

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NewTask {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // durable true 持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 發(fā)送當(dāng)前時間
            String message = String.valueOf(System.currentTimeMillis());
            // PERSISTENT_TEXT_PLAIN 持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("[x] Sent'" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

聲明消息持久化后rabbitmq宕機(jī)也能從存儲中恢復(fù)消息。

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 未回復(fù)消息處理完,消息隊(duì)列不會給它發(fā)新的消息
            channel.basicQos(1);
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 手動確認(rèn)
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    private  static  void  doWork () throws InterruptedException {
        Thread.sleep(1000);
    }

}

設(shè)置prefetchCount為1,在沒有處理完一條消息的時候,消息隊(duì)列不會給它繼續(xù)下發(fā)消息,在它確認(rèn)完消息后,消息隊(duì)列繼續(xù)下發(fā)新消息。


image.png

發(fā)送/訂閱

在上面我們把消息發(fā)給相同的一個消費(fèi)者,現(xiàn)在把消息發(fā)送給多個消費(fèi)者,這種模式稱為發(fā)布訂閱模式。為了演示這種模式,我們創(chuàng)建一個簡單的日志記錄系統(tǒng),生產(chǎn)者發(fā)出日志,消費(fèi)者接收并打印它們,發(fā)布的消息將被廣播給所有消費(fèi)者。

Exchange (交換機(jī))

前面簡單的展示了如何接收發(fā)送消息,現(xiàn)在介紹完整的rabbitmq概念。簡單重復(fù)一下前面介紹的內(nèi)容:

  • 生產(chǎn)者是發(fā)送消息的程序
  • 隊(duì)列是消息的緩沖器
  • 消費(fèi)者是接收處理消息的程序

rabbitmq消息模型的核心思想是,生產(chǎn)者從來不會直接發(fā)送消息給一個隊(duì)列。又或者說生產(chǎn)者甚至不知道它的消息將會發(fā)送到哪個隊(duì)列。

生產(chǎn)者只能發(fā)送消息給一個交換機(jī)。交換機(jī)是一個很簡單的概念。它接收生產(chǎn)者的消息,然后推送消息到隊(duì)列中。交換機(jī)必須明確知道自己要對接收到的消息進(jìn)行何種處理: 是推送到特定隊(duì)列,還是推送到所有的隊(duì)列還是直接丟棄,這些規(guī)則由交換機(jī)的類型來定義。


image.png

有許多可供選擇的交換機(jī)類型: direct、topic、headers、fanout。 我詳細(xì)介紹fanout。創(chuàng)建一個fanout類型的交換機(jī),稱它為logs。

EmitLog.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // fanout廣播模式,會廣播所有接收到的消息給所有它的已知隊(duì)列
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);

            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

ReveiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 臨時隊(duì)列,非持久化的,唯一的,斷開連接自動刪除的并且隨機(jī)名稱的隊(duì)列
        String queueName = channel.queueDeclare().getQueue();
        // 綁定隊(duì)列和交換機(jī),告訴交換機(jī)給我們發(fā)送消息,如果沒有綁定到交換機(jī)上,消息會丟失
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,209評論 2 11
  • 1.RabbitMQ概述 簡介: MQ全稱為Message Queue,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法...
    梁朋舉閱讀 50,553評論 0 47
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術(shù)選型,是時...
    預(yù)流閱讀 586,640評論 51 787
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術(shù)選型,是時...
    Johnson_zx閱讀 1,157評論 0 5
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術(shù)選型,是時...
    Java機(jī)械師閱讀 594評論 0 2

友情鏈接更多精彩內(nèi)容