RabbitMQ Spring-AMQP官方教程(二)--工作隊列

2 工作隊列(Work Queues)

image

In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

在第一個教程中,我們編寫了兩個程序,一個用于往一個命名了的隊列發(fā)送消息,另一個從這個隊列里接收消息。在本教程里,我們將創(chuàng)建一個工作隊列(Work Queue),它將被用來在多個工作者之間分發(fā)耗時任務。

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

工作隊列(也叫做任務隊列)背后的主要目的是為了避免立即執(zhí)行資源密集型的任務以及避免必須等待任務完成的情況,相反,它計劃著讓這些任務可以留到后面去執(zhí)行。我們將任務分裝成消息,并將它發(fā)送到隊列里。運行在后臺的工作進程將取出任務并最終執(zhí)行它。如果你運行了多個工作進程,那么所有的任務將被它們共享。

This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

對于web應用,這個概念特別有用,因為web應用無法在短短的HTTP請求窗口內處理復雜的任務。

Preparation(準備工作)

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy - by using the Thread.sleep() function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.

在教程1里,我們實現(xiàn)了如何發(fā)送包含“Hello World!”的消息?,F(xiàn)在,我們將發(fā)送表示復雜任務的字符串。由于我們沒有真實的任務,如調整圖片大小或者渲染pdf文件,所以我們通過使用Thread.sleep()函數(shù)來模擬繁忙的耗時任務。我們將在字符串中用點號的個數(shù)來表示復雜度,點號的個數(shù)表示整個任務執(zhí)行秒的秒數(shù)。例如,Hello...表示模擬任務需要執(zhí)行三秒。

Please see the setup in first tutorial if you have not setup the project. We will follow the same pattern as in the first tutorial: create a package (tut2) and create a Tut2Config, Tut2Receiver, and Tut2Sender. Start by creating a new package (tut2) where we'll place our three classes. In the configuration class we setup two profiles, the label for the tutorial ("tut2") and the name of the pattern ("work-queues"). We leverage spring to expose the queue as a bean. We setup the receiver as a profile and define two beans to correspond to the workers in our diagram above: receiver1 and receiver2. Finally, we define a profile for the sender and define the sender bean. The configuration is now done.

如果你還未建立配置好項目,請見教程1的配置過程。我們將采用與教程1相同的方式,新建一個包目錄(tut2)并創(chuàng)建一個名為Tut2Config的配置類,一個名為Tut2Receiver的消息接收類,以及一個名為Tut2Sender的消息發(fā)送類。首先新建包目錄(tut2),我們將在這個包下面放剛說到的那三個類。在配置類里,我們將配置兩個配置組,一個作為當前教程的標簽(“tut2”),一個作為當前RabbitMQ使用模式的名字(“work-queue”)。我們利用Spring框架將隊列暴露為一個bean。我們設置一個接受者配置組,并定義兩個bean來對應于上面圖中的兩個消費者:receiver1和receiver2。最后,我們會定義一個發(fā)送者配置組,并定義作為發(fā)送者的bean。這樣配置就結束了。

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile({"tut2", "work-queues"})
@Configuration
public class Tut2Config {

    @Bean
    public Queue hello() {
        return new Queue("hello");
    }
    
    @Profile("receiver")
    private static class ReceiverConfig {
    
        @Bean
        public Tut2Receiver receiver1() {
            return new Tut2Receiver(1);
        }
        
        @Bean
        public Tut2Receiver receiver2() {
            return new Tut2Receiver(2);
        }
    }

    @Profile("sender")
    @Bean
    public Tut2Sender sender() {
        return new Tut2Sender();
    }
}

Sender(發(fā)送者)

We will modify the sender to provide a means for identifying whether its a longer running task by appending a dot to the message in a very contrived fashion using the same method on the RabbitTemplate to publish the message, convertAndSend. The documentation defines this as, "Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key."

我們將對發(fā)送者類進行修改,在發(fā)送方法中,通過人為地在消息后面添加點號來識別當前任務是否耗時,并依舊使用RabbitTemplate的convertAndSend方法來發(fā)布消息。文檔把convertAndSend方法定義為,“將一個Java對象轉換成一個Amqp消息,并用一個默認路由鍵(routing key)將其發(fā)送到一個默認的exchange里?!?/p>

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut2Sender {

    @Autowired
    private RabbitTemplate template;
    
    @Autowired
    private Queue queue;
    
    int dots = 0;
    int count = 0;
    
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1;
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        
        builder.append(Integer.toString(++count));
        String message = builder.toString();
        template.convertAndSend(queue.getName(), message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

Receiver(接收者)

Our receiver, Tut2Receiver, simulates an arbitary length for a fake task in the doWork() method where the number of dots translates into the number of seconds the work will take. Again, we leverage a @RabbitListener on the "hello" queue and a @RabbitHandler to receive the message. The instance that is consuming the message is added to our monitor to show which instance, the message and the length of time to process the message.

我們的接收者類,Tut2Receiver,在doWork()方法里根據(jù)消息所帶點號的個數(shù),將其轉換成任務所需消耗的秒數(shù),以此模擬了任意任務的長度。同樣地,我們對“hello”隊列使用了@RabbitListener注解,并通過@RabbitHandler注解來接收消息。同時,用一個編號來標識正在消費消息的實例,并將它加入我們的監(jiān)控中,最終打印出實例編號,消息內容以及處理消息所耗費的時長。

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

@RabbitListener(queues = "hello")
public class Tut2Receiver {

    private final int instance;
    
    public Tut2Receiver(int i) {
        this.instance = i;
    }
    
    @RabbitHandler
    public void receive(String in) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + this.instance +
        " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + this.instance +
        " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }
        
        private void doWork(String in) throws InterruptedException {
            for (char ch : in.toCharArray()) {
                if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

Putting it all together(代碼整合)

Compile them using mvn package and run with the following options

使用mvn package來編譯上述代碼,并在運行時添加下面的命令行參數(shù)

mvn clean package

java -jar target/rabbitmq-amqp-tutorials-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,receiver
java -jar target/rabbitmq-amqp-tutorials-0.0.1-SNAPSHOT.jar --spring.profiles.active=work-queues,sender

The output of the sender should look something like:

發(fā)送者類的輸出看起來應該是類似于這樣的:

Ready ... running for 10000ms
[x] Sent 'Hello.1'
[x] Sent 'Hello..2'
[x] Sent 'Hello...3'
[x] Sent 'Hello.4'
[x] Sent 'Hello..5'
[x] Sent 'Hello...6'
[x] Sent 'Hello.7'
[x] Sent 'Hello..8'
[x] Sent 'Hello...9'
[x] Sent 'Hello.10'

And the output from the workers should look something like:

而工作者類的輸出看起來應該是類似于這樣的:

Ready ... running for 10000ms
instance 1 [x] Received 'Hello.1'
instance 2 [x] Received 'Hello..2'
instance 1 [x] Done in 1.001s
instance 1 [x] Received 'Hello...3'
instance 2 [x] Done in 2.004s
instance 2 [x] Received 'Hello.4'
instance 2 [x] Done in 1.0s
instance 2 [x] Received 'Hello..5'

Message acknowledgment(消息確認)

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. Spring AMQP by default takes a conservative approach to message acknowledgement. If the listener throws an exception the container calls:

完成一個任務需要耗費幾秒。對于一個開始了長任務的消費者,你可能會想知道,當它只完成了部分任務就掛掉時發(fā)生了什么。Spring AMQP默認采用保守的方式來進行消息確認。如果監(jiān)聽器拋出了一個異常,那么容器會調用:

channel.basicReject(deliveryTag, requeue)

Requeue is true by default unless you explicitly set:

requeue(重入隊列)默認為true,除非你顯示地將其設置為false:

defaultRequeueRejected=false

or the listener throws an AmqpRejectAndDontRequeueException. This is typically the bahavior you want from your listener. In this mode there is no need to worry about a forgotten acknowledgement. After processing the message the listener calls:

否則監(jiān)聽器拋出AmqpRejectAndDontRequeueException異常。這通常會是你想要監(jiān)聽器的行為。在這種模式下,無需擔心一個被忘記了的確認。在處理完消息后,監(jiān)聽器會調用:

channel.basicAck()

Acknowledgement must be sent on the same channel the delivery it is for was received on. Attempts to acknowledge using a different channel will result in a channel-level protocol exception. See the doc guide on confirmations to learn more. Spring AMQP generally takes care of this but when used in combination with code that uses RabbitMQ Java client directly, this is something to keep in mind.

消息確認必須在與接收信息相同的通道(channel)上進行發(fā)送。試圖使用不同的通道來進行確認將導致通道級別的協(xié)議異常。若想更詳細了解,可以參閱關于消息確認的文檔指南。Spring AMQP一般都會處理好這種問題,但與直接使用RabbitMQ的Java客戶端的代碼結合使用時,這點要小心。

Forgotten acknowledgment(被遺忘的確認)

It's a common mistake to miss the basicAck and spring-amqp helps to avoid this through its default configuraiton. The consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

忘記調用basciAck方法是常見的一個錯誤,spring-amqp通過它的默認配置來避免它。這個錯誤的后果是很嚴重的。當你的客戶端退出后,消息會被重復發(fā)送(看起來就像是隨機重發(fā)),但RabbitMQ將吃掉越來越多的內存,因為它無法釋放任何未確認的消息。

In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:

為了調試這種錯誤,你可以使用rabbitmqctl來打印messages_unacknowledged域:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo:

在Windows環(huán)境下,去掉sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message durability(消息持久性)

With spring-amqp there are reasonable default values in the MessageProperties that account for message durability. In particular you can check the table for common properties. You'll see two relevant to our discussion here on durability:

對于spring-amqp,消息屬性配置里有很多合理的默認值,這些默認值共同決定了消息的持久性。你可以查閱常用屬性表。你將會看到與我們正在討論的消息持久性相關的兩個屬性:

Property default Description
durable true When declareExchange is true the durable flag is set to this value
deliveryMode PERSISTENT PERSISTENT or NON_PERSISTENT to determine whether or not RabbitMQ should persist the messages

Note on message persistence(關于消息持久化要注意的地方)

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.

將消息標記為持久化并不能完全保證消息將不會丟失。雖然它告訴RabbitMQ要將消息保存到磁盤,但當RabbitMQ接收了某條消息并且還沒有保存該消息時,仍有一個小的時間窗口。而且,RabbitMQ不會為每條消息都進行fsync操作——它可能僅僅只是將其緩存起來,但并沒有真的將消息寫入磁盤。這么做雖然無法完全保證持久化,但對于我們簡單的任務隊列來說,這已經(jīng)很足夠了。如果你需要完全保證持久化,那么你可以使用發(fā)布者確認。

Fair dispatch vs Round-robin dispatching(公平調度vs循環(huán)調度)

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. In this mode dispatching doesn't necessarily work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

默認情況下,RabbitMQ將把每一條消息按順序逐一發(fā)送給下一個消費者。每個消費者都將被平均分到相同的個數(shù)的消息。這種消息分派方式被稱為循環(huán)調度。這種調度模式有時無法完全滿足我們的需求。例如,假設有兩個工作者,第奇數(shù)條消息是重任務,第偶數(shù)條消息是輕任務,那么其中一個工作者將會總是很繁忙,而另一個工作者則有可能沒什么事做。然而,RabbitMQ并不知道這個狀況,并且仍然會這么均勻地分發(fā)消息。

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

會發(fā)生這個情況是因為,當消息進入隊列時,RabbitMQ僅僅是將消息分派出去。它不會去看某個消費者未確認的消息的數(shù)量。它只是盲目地將消息均勻分派給各個消費者。

However, "Fair dispatch" is the default configuration for spring-amqp. The SimpleMessageListenerContainer defines the value for DEFAULT_PREFETCH_COUNT to be 1. If the DEFAULT_PREFECTH_COUNT were set to 0 the behavior would be round robin messaging as described above.

然而,“公平調度”是spring-amqp的默認配置。SimpleMessageListenerContainer類將DEFAULT_PREFETCH_COUNT的值定義為1。如果DEFAULT_PREFETCH_COUNT的值被設置為0,那么調度方式將變成循環(huán)調用。

image

However, with the prefetchCount set to 1 by default, this tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

然而,prefetchCount的值默認設為1,這告訴RabbitMQ不要同時將多個消息分派給一個工作者。換句話說,在某個工作者處理完一條消息并確認它之前,RabbitMQ不會給該工作者分派新的消息,而是將新的消息分派給下一個不是很繁忙的工作者。

Note about queue size(關于隊列大小需要注意的地方)

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

如果所有的工作者都繁忙,那么你的隊列會被填滿。你需要注意這種情況,要么添加多幾個工作者,要么就采用其它策略。

By using spring-amqp you get reasonable values configured for message acknowledgments and fair dispatching. The default durability for queues and persistence for messages provided by spring-amqp allow let the messages to survive even if RabbitMQ is restarted.

通過使用spring-amqp,你會發(fā)現(xiàn),它已經(jīng)為消息確認和公平調度配置好合理值。spring-amqp為隊列和消息持久化提供的默認持久屬性使得即使在RabbitMQ重啟的情況下,消息還能保存下來。

For more information on Channel methods and MessageProperties, you can browse the javadocs online. For understanding the underlying foundation for spring-amqp you can find the rabbitmq-java-client.

關于channel方法和MessageProperties的更多信息,可以瀏覽在線的javadocs。若要了解spring-amqp的底層機制,可以參閱rabbitmq-java-client文檔

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.

現(xiàn)在我們可以開始教程3,學習如何將相同的消息發(fā)送給多個消費者。

轉自我的博客:https://jiapengcai.github.io/posts/13955/

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容