問題介紹
在開發(fā)中,遇到了這樣一個問題,我們使用ActivateMQ來接收處理消息,然后調(diào)用人工智能的算法去處理數(shù)據(jù),但是算法處理的速度太慢,跟不上消息的接收速度,限制于硬件的問題,算法也沒辦法增加更多的服務(wù)器來進行并發(fā)處理。所以導(dǎo)致消息堆積,處理的延遲越來越大,新的數(shù)據(jù)得不到處理。而我們待處理的消息,可以容忍丟棄一部分,所以想到了,如果處理不完的數(shù)據(jù),可以丟棄,保證最新的數(shù)據(jù)能夠得到處理。
生產(chǎn)者消費者模式介紹
生產(chǎn)者消費者模式,是一種常見的設(shè)計模式。該模式將消息的生成者和消費者分開,還有一個緩沖區(qū)處于生產(chǎn)者和消費者之間,作為一個中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費者從緩沖區(qū)取出數(shù)據(jù)。
使用mq隊列,也是一種服務(wù)之間的生產(chǎn)者消費者模式,其緩沖區(qū)即mq隊列。而在java單應(yīng)用中,生產(chǎn)者消費者模式,一般使用阻塞隊列來實現(xiàn),即BlockingQueue接口,來充當緩沖區(qū)。
BlockingQueue簡介
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。當隊列滿時,存儲元素的線程會等待隊列可用。生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。
BlockingQueue的接口方法:
| 方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 檢查方法 | element() | peek() | 不可用 | 不可用 |
上述這些方法不再詳細介紹了,大家可以查閱其他資料。
容忍丟棄的消費者實現(xiàn)(使用BlockingQueue實現(xiàn))
消費者,使用生產(chǎn)者消費者模式實現(xiàn)。消費時,不是直接進行處理,而是將消息添加到阻塞隊列中,如果指定的時間內(nèi),沒有添加進去,就跳過該條消息,這條消息可丟棄,或者保存數(shù)據(jù)庫等,以便后續(xù)處理。
將消息添加到阻塞隊列中時,通過指定的時間,可以計算出該消費者的最低并發(fā),如指定1s,則最低并發(fā)為1,即如果處理消息時,處理速度大于1個/s,則該消費者并發(fā)降到最低,變?yōu)?。所以為了保證mq中消息不堆積,可以設(shè)置添加阻塞隊列的超時時間,和mq中接收消息的間隔一致。
如果有多個mq的監(jiān)聽消費者,則根據(jù)多消費者來計算,確定阻塞隊列的超時時間:
阻塞隊列的超時時間 = mq中接收消息的間隔 * mq監(jiān)聽者數(shù)量
(當然系統(tǒng)運行時,消息推送處理等會有耗時,阻塞隊列的超時時間應(yīng)當適當小一些,保證mq消息不堆積)
阻塞隊列消費時,使用線程池,啟動多個線程,讀取阻塞隊列,處理消息。
代碼示例:
package com.pu.WebMagicPro;
import java.util.concurrent.*;
public class ConsumerTest {
/**
* 使用多線程處理消息
*/
private Executor executor = new ThreadPoolExecutor(3, 5,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
/**
* 隊列長度
*/
private static final int QUEUE_SIZE = 3;
/**
* 消費者處理隊列
*/
private BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
/**
* 啟動讀取隊列內(nèi)容的標記
*/
private volatile boolean readQueue = false;
/**
* 消費方法
* @param content 消息內(nèi)容
*/
public void consumer(String content) {
try {
// 消費消息,1s未插入隊列,則消費不成功
boolean success = queue.offer(content, 1, TimeUnit.SECONDS);
if (!success) {
System.out.println("消息丟棄:" + content); // 消息消費不成功,可以直接丟棄,或者保存到數(shù)據(jù)庫中等,使用其他辦法處理
}
if (!readQueue) {
readQueue = true;
// 啟動多個線程消費隊列
for (int i = 0; i < QUEUE_SIZE; i++) {
executor.execute(this::startRead);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void startRead() {
while (readQueue) {
String content = null;
try {
// 讀取消息
content = queue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (content != null) {
// 處理消息
dealContent(content);
}
}
}
/**
* 模擬實際的處理方法
* @param content 消息
*/
private void dealContent(String content) {
// 模擬處理過程
System.out.println("處理: " + content);
try {
// 線程睡眠5秒,模擬處理用時
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("處理完成: " + content);
}
public static void main(String[] args) {
ConsumerTest consumer = new ConsumerTest();
int size = 5;
// 開啟5個線程進行消息的推送,每個線程推送10次消息
for (int i = 0; i < size; i++) {
final int t = i;
Thread thread = new Thread(() -> {
for (int j = 0; j < 10; j++) {
consumer.consumer("線程:" + t + ",消息" + j);
}
});
thread.start();
}
}
}