public class MyMq {
//隊列最大長度
private static final int MAX_LENG = 10;
//隊列
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(MAX_LENG);
//生成者
class Producter extends Thread{
@Override
public void run() {
synchronized (queue){
if(queue.size()>MAX_LENG){
try {
//最大長度 生產者等待
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("當前隊列已滿....");
queue.notify();
}
}else {
//生產消息
queue.add(queue.size()+1);
System.out.println("往隊列里添加一條消息....");
queue.notify();
//為了測試睡眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
//消費者
class Consumer extends Thread{
@Override
public void run() {
//消費者持續(xù)工作
while (true){
//加鎖保證安全
synchronized (queue){
//隊列為空
if(queue.isEmpty()){
System.out.println("當前隊列為空....");
try {
//等待
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
//出現(xiàn)異常手動喚醒
queue.notify();
}
}else {
//消費者消費
System.out.println("消費者開始消費...");
queue.poll();
//喚醒生產者
queue.notify();
}
}
}
}
}
public static void main(String[] args) {
MyMq mq = new MyMq();
Consumer consumer = mq.new Consumer();
consumer.start();
for (int i = 0; i < 10; i++) {
Producter producter = mq.new Producter();
producter.start();
}
}
}
JAVA實現(xiàn)一個簡單的消息隊列
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
相關閱讀更多精彩內容
- 延時消息隊列可以使用redis的zset來實現(xiàn),將消息序列化為一個字符串作為zset的value,消息到期時間作為...
- 在沒有搭建消息中間件(比如rabbitMQ、rocketMQ、kafka)的情況下,可以使用Redis來實現(xiàn)消息隊...
- 本例是Bartosz Milewski C++11 Concurrency課程中第9課的代碼,使用std::con...
- 生產者使用 LPUSH 發(fā)布消息: 消費者這一側,使用 RPOP 拉取消息: 這個模型非常簡單,也很容易理解。 它...