解決MQ消費者消息堆積問題(MQ的消費者消息丟棄功能)

問題介紹

在開發(fā)中,遇到了這樣一個問題,我們使用ActivateMQ來接收處理消息,然后調(diào)用人工智能的算法去處理數(shù)據(jù),但是算法處理的速度太慢,跟不上消息的接收速度,限制于硬件的問題,算法也沒辦法增加更多的服務(wù)器來進行并發(fā)處理。所以導(dǎo)致消息堆積,處理的延遲越來越大,新的數(shù)據(jù)得不到處理。而我們待處理的消息,可以容忍丟棄一部分,所以想到了,如果處理不完的數(shù)據(jù),可以丟棄,保證最新的數(shù)據(jù)能夠得到處理。

生產(chǎn)者消費者模式介紹

生產(chǎn)者消費者模式,是一種常見的設(shè)計模式。該模式將消息的生成者和消費者分開,還有一個緩沖區(qū)處于生產(chǎn)者和消費者之間,作為一個中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費者從緩沖區(qū)取出數(shù)據(jù)。
使用mq隊列,也是一種服務(wù)之間的生產(chǎn)者消費者模式,其緩沖區(qū)即mq隊列。而在java單應(yīng)用中,生產(chǎn)者消費者模式,一般使用阻塞隊列來實現(xiàn),即BlockingQueue接口,來充當緩沖區(qū)。

BlockingQueue簡介

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。當隊列滿時,存儲元素的線程會等待隊列可用。生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。

BlockingQueue的接口方法:

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

上述這些方法不再詳細介紹了,大家可以查閱其他資料。

容忍丟棄的消費者實現(xiàn)(使用BlockingQueue實現(xiàn))

消費者,使用生產(chǎn)者消費者模式實現(xiàn)。消費時,不是直接進行處理,而是將消息添加到阻塞隊列中,如果指定的時間內(nèi),沒有添加進去,就跳過該條消息,這條消息可丟棄,或者保存數(shù)據(jù)庫等,以便后續(xù)處理。
將消息添加到阻塞隊列中時,通過指定的時間,可以計算出該消費者的最低并發(fā),如指定1s,則最低并發(fā)為1,即如果處理消息時,處理速度大于1個/s,則該消費者并發(fā)降到最低,變?yōu)?。所以為了保證mq中消息不堆積,可以設(shè)置添加阻塞隊列的超時時間,和mq中接收消息的間隔一致。
如果有多個mq的監(jiān)聽消費者,則根據(jù)多消費者來計算,確定阻塞隊列的超時時間:
阻塞隊列的超時時間 = mq中接收消息的間隔 * mq監(jiān)聽者數(shù)量
(當然系統(tǒng)運行時,消息推送處理等會有耗時,阻塞隊列的超時時間應(yīng)當適當小一些,保證mq消息不堆積)

阻塞隊列消費時,使用線程池,啟動多個線程,讀取阻塞隊列,處理消息。

代碼示例:

package com.pu.WebMagicPro;

import java.util.concurrent.*;

public class ConsumerTest {

    /**
     * 使用多線程處理消息
     */
    private Executor executor = new ThreadPoolExecutor(3, 5,
            10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * 隊列長度
     */
    private static final int QUEUE_SIZE = 3;

    /**
     * 消費者處理隊列
     */
    private BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

    /**
     * 啟動讀取隊列內(nèi)容的標記
     */
    private volatile boolean readQueue = false;


    /**
     * 消費方法
     * @param content 消息內(nèi)容
     */
    public void consumer(String content) {
        try {
            // 消費消息,1s未插入隊列,則消費不成功
            boolean success = queue.offer(content, 1, TimeUnit.SECONDS);
            if (!success) {
                System.out.println("消息丟棄:" + content);     // 消息消費不成功,可以直接丟棄,或者保存到數(shù)據(jù)庫中等,使用其他辦法處理
            }

            if (!readQueue) {
                readQueue = true;
                // 啟動多個線程消費隊列
                for (int i = 0; i < QUEUE_SIZE; i++) {
                    executor.execute(this::startRead);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void startRead() {
        while (readQueue) {
            String content = null;
            try {
                // 讀取消息
                content = queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (content != null) {
                // 處理消息
                dealContent(content);
            }
        }
    }

    /**
     * 模擬實際的處理方法
     * @param content 消息
     */
    private void dealContent(String content) {
        // 模擬處理過程
        System.out.println("處理: " + content);
        try {
            // 線程睡眠5秒,模擬處理用時
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("處理完成: " + content);
    }

    public static void main(String[] args) {
        ConsumerTest consumer = new ConsumerTest();
        int size = 5;
        // 開啟5個線程進行消息的推送,每個線程推送10次消息
        for (int i = 0; i < size; i++) {
            final int t = i;
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    consumer.consumer("線程:" + t + ",消息" + j);
                }
            });
            thread.start();
        }
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容