RabbitMQ for Node 官方文檔筆錄

Hello World

RabbitMQ 接收處理和轉(zhuǎn)發(fā)二進(jìn)制blob數(shù)據(jù)

一些術(shù)語

  • Producer:發(fā)送消息的程序
  • Producing:發(fā)送消息
  • Queue:消息緩沖器,暫存消息,在RabbitMQ中
  • Consuming:等待接收消息
  • Consumer:等待并接收消息的程序

使用amqp.node客戶端

任務(wù)需求

  1. 安裝RabbitMQ for Node模塊(amqp.node)
  2. 編寫消息發(fā)送程序
  3. 編寫消息接收程序
  4. 運(yùn)行測試代碼

step1 安裝客戶端模塊

npm install amqplib

發(fā)送

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
    // 創(chuàng)建一個頻道,是大多數(shù)API所在的位置
    conn.createChannel(function(err,ch){
        var q = 'hello'; // 對列名
        // 聲明一個隊列,該操作是冪等的,隊列不存在才會創(chuàng)建
        ch.assertQueue(q,{durable:false});
        // 向隊列發(fā)送消息,消息使用字節(jié)數(shù)組發(fā)送(Buffer)
        ch.sendToQueue(q,new Buffer('Hello World'));
        console.log('[x] Sent \'Hello World\'');
    });
});

接收

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
    // 創(chuàng)建頻道
    conn.createChannel(function(err, ch){
        var q = 'hello'; 
        ch.assertQueue(q,{durable:false});  // 聲明隊列
        // 注冊監(jiān)聽
        ch.consume(q, function(msg){
            console.log('[x] Received%s ', msg.content.toString());
        },{noAck:true});
    });
});

Work Queues 工作隊列

WorkQueue用于在多個Consumer之間分配耗時的任務(wù)。

實現(xiàn)思想

流程

將任務(wù)封裝成消息并將其發(fā)送到隊列,工作進(jìn)程彈出任務(wù)并最終執(zhí)行作業(yè)。

實現(xiàn)

MQ充當(dāng)任務(wù)隊列,Producer向MQ中發(fā)送任務(wù),多個Consumer領(lǐng)取任務(wù)并執(zhí)行。

Step 1 實現(xiàn)Producer 和 Consumer

Producer

Producer的代碼

// new_task.js 每次啟動后往隊列中發(fā)送一條task

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err,ch){
        var q = 'test_queue';
        var msg = 'New Task!';
        ch.assertQueue(q, {durable:true});
        // 發(fā)送消息
        ch.sendToQueue(q, new Buffer(msg), {persistent: true});
        console.log('[x]Sent \'%s\'',msg);
    }); 
});

Consumer

Consumer的代碼

// worker.js模擬一個消費(fèi)者,監(jiān)聽并接收MQ分配給自己的task。

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err, ch){
        var q = 'test_queue';
        ch.assertQueue(q, {durable:true});
        // 監(jiān)聽MQ發(fā)送的消息,并在calback中處理
        ch.consume(q, function(msg){
            var secs = msg.content.toString().split('.').length -1;
            console.log('[x] Received %s',msg.content.toString());
            // 模擬處理時間
            setTimeout(function(){
                console.log('[x] Done');
            },secs * 1000);
        },{noAck: true});
    });
});

啟動步驟

  1. 打開多個控制臺,啟動多個worker.js。
  2. 啟動多次new_task.js,向MQ中發(fā)送多條信息。

結(jié)果:每個worker依次接收到任務(wù)并打印。

默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個消費(fèi)者。
平均而言,每個消費(fèi)者將獲得相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)法。

問題:在實際應(yīng)用中,上述程序存在一些問題

  1. 每個任務(wù)都會消耗一定的時間,這個時間是不確定的。
  2. 上面的程序中,當(dāng)worker領(lǐng)取任務(wù)后,消息便會從MQ中彈出,不論是否執(zhí)行成功。若執(zhí)行失敗,則任務(wù)丟失。

解決方案:使用消息確認(rèn)解決。

Step 2 使用消息確認(rèn)機(jī)制

為了確保消息永不丟失,RabbitMQ支持消息確認(rèn)

消費(fèi)者發(fā)回ack(acknowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。

如果Consumer死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送ack,RabbitMQ會將該消息重新排隊。如果其他消費(fèi)者同時在線,則會迅速將其重新發(fā)送給其他消費(fèi)者。這樣就可以確保沒有消息丟失,即使Consumer偶爾會死亡。

實現(xiàn)原理

使用consume方法的noAck屬性,將其設(shè)置成false開啟消息確認(rèn)。

然后調(diào)用Chanel.ack(MSG)方法應(yīng)答MQ

代碼如下

ch.consume(q, function(msg){ 
    // ...do something sync
    setTimeout(function(){
        // ...do something async
        ch.ack(MSG) // 發(fā)送ack
    },1000);
 },{noAck: true}); // 開啟ack

注意:確認(rèn)必須在收到的交付的同一信道上發(fā)送。嘗試使用不同的通道進(jìn)行確認(rèn)將導(dǎo)致通道級協(xié)議異常。

Step3 消息持久化

當(dāng)RabbitMQ退出或崩潰時,它將丟失隊列和消息。要確保消息不會丟失,我們需要將隊列和消息都標(biāo)記為持久。

開啟隊列的持久化

// 聲明隊列時,設(shè)置durable屬性為true開啟隊列持久化
ch.assertQueue('hello', {durable: true}); 

開啟消息的持久化

// 發(fā)送消息時,設(shè)置persistent屬性為true,讓RabbitMQ持久化當(dāng)前消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});

有關(guān)消息持久化的注意事項

注意:將消息標(biāo)記為持久性并不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ接受消息并且尚未保存消息時,仍然有一個短時間窗口期。如果需要更強(qiáng)的保證,那么可以使用 發(fā)布者確認(rèn)。

Step4 公平派遣任務(wù)

問題

平均依次分配任務(wù)不能保證每個worker接收到相同分量的任務(wù)。當(dāng)出現(xiàn)一些任務(wù)很重,另一些很輕時,經(jīng)常會出現(xiàn)一個worker將經(jīng)常忙碌而另一個worker經(jīng)??臻e。

解決方案

使用prefetch方法控制worker同時最多接收的任務(wù)數(shù)量。當(dāng)worker待處理的任務(wù)達(dá)到最大數(shù)量時,MQ不會向其發(fā)送新任務(wù),而是會向空閑的worker發(fā)送新任務(wù)。使用這種機(jī)制可以達(dá)到負(fù)載均衡的效果。

Consumer(Worker)的代碼

// ...imports
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err,ch){
        var q = 'task_queue';
        ch.prefetch(1); // 當(dāng)有一個任務(wù)未完成時,不再接受新任務(wù)
        ch.consume(q,function(msg){
            // ...do something 
            setTimeout(()=>{
                // ...do something async
                 ch.ack(MSG); // 確認(rèn)執(zhí)行完畢
            },1000);
        },{noAck:false});
    });
});

Publish/Subscribe

Routing

Topics

RPC

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,692評論 19 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,514評論 2 34
  • Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,155評論 0 43
  • 背景介紹 Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O...
    高廣超閱讀 13,062評論 8 167
  • 最近會思考一個問題:python3 中 while 與 while true 有啥不區(qū)別 ? while True...
    liqun奮斗struggle閱讀 37,489評論 1 7

友情鏈接更多精彩內(nèi)容