Hello World
RabbitMQ 接收處理和轉(zhuǎn)發(fā)二進(jìn)制blob數(shù)據(jù)
一些術(shù)語
- Producer:發(fā)送消息的程序
- Producing:發(fā)送消息
- Queue:消息緩沖器,暫存消息,在RabbitMQ中
- Consuming:等待接收消息
- Consumer:等待并接收消息的程序
使用amqp.node客戶端
任務(wù)需求
- 安裝RabbitMQ for Node模塊(amqp.node)
- 編寫消息發(fā)送程序
- 編寫消息接收程序
- 運(yùn)行測試代碼
step1 安裝客戶端模塊
npm install amqplib
發(fā)送
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
// 創(chuàng)建一個頻道,是大多數(shù)API所在的位置
conn.createChannel(function(err,ch){
var q = 'hello'; // 對列名
// 聲明一個隊列,該操作是冪等的,隊列不存在才會創(chuàng)建
ch.assertQueue(q,{durable:false});
// 向隊列發(fā)送消息,消息使用字節(jié)數(shù)組發(fā)送(Buffer)
ch.sendToQueue(q,new Buffer('Hello World'));
console.log('[x] Sent \'Hello World\'');
});
});
接收
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
// 創(chuàng)建頻道
conn.createChannel(function(err, ch){
var q = 'hello';
ch.assertQueue(q,{durable:false}); // 聲明隊列
// 注冊監(jiān)聽
ch.consume(q, function(msg){
console.log('[x] Received%s ', msg.content.toString());
},{noAck:true});
});
});
Work Queues 工作隊列
WorkQueue用于在多個Consumer之間分配耗時的任務(wù)。
實現(xiàn)思想
流程
將任務(wù)封裝成消息并將其發(fā)送到隊列,工作進(jìn)程彈出任務(wù)并最終執(zhí)行作業(yè)。
實現(xiàn)
MQ充當(dāng)任務(wù)隊列,Producer向MQ中發(fā)送任務(wù),多個Consumer領(lǐng)取任務(wù)并執(zhí)行。
Step 1 實現(xiàn)Producer 和 Consumer
Producer
Producer的代碼
// new_task.js 每次啟動后往隊列中發(fā)送一條task
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err,ch){
var q = 'test_queue';
var msg = 'New Task!';
ch.assertQueue(q, {durable:true});
// 發(fā)送消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log('[x]Sent \'%s\'',msg);
});
});
Consumer
Consumer的代碼
// worker.js模擬一個消費(fèi)者,監(jiān)聽并接收MQ分配給自己的task。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err, ch){
var q = 'test_queue';
ch.assertQueue(q, {durable:true});
// 監(jiān)聽MQ發(fā)送的消息,并在calback中處理
ch.consume(q, function(msg){
var secs = msg.content.toString().split('.').length -1;
console.log('[x] Received %s',msg.content.toString());
// 模擬處理時間
setTimeout(function(){
console.log('[x] Done');
},secs * 1000);
},{noAck: true});
});
});
啟動步驟
- 打開多個控制臺,啟動多個worker.js。
- 啟動多次new_task.js,向MQ中發(fā)送多條信息。
結(jié)果:每個worker依次接收到任務(wù)并打印。
默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個消費(fèi)者。
平均而言,每個消費(fèi)者將獲得相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)法。
問題:在實際應(yīng)用中,上述程序存在一些問題
- 每個任務(wù)都會消耗一定的時間,這個時間是不確定的。
- 上面的程序中,當(dāng)worker領(lǐng)取任務(wù)后,消息便會從MQ中彈出,不論是否執(zhí)行成功。若執(zhí)行失敗,則任務(wù)丟失。
解決方案:使用消息確認(rèn)解決。
Step 2 使用消息確認(rèn)機(jī)制
為了確保消息永不丟失,RabbitMQ支持消息確認(rèn)
消費(fèi)者發(fā)回ack(acknowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果Consumer死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送ack,RabbitMQ會將該消息重新排隊。如果其他消費(fèi)者同時在線,則會迅速將其重新發(fā)送給其他消費(fèi)者。這樣就可以確保沒有消息丟失,即使Consumer偶爾會死亡。
實現(xiàn)原理
使用consume方法的noAck屬性,將其設(shè)置成false開啟消息確認(rèn)。
然后調(diào)用Chanel.ack(MSG)方法應(yīng)答MQ
代碼如下
ch.consume(q, function(msg){
// ...do something sync
setTimeout(function(){
// ...do something async
ch.ack(MSG) // 發(fā)送ack
},1000);
},{noAck: true}); // 開啟ack
注意:確認(rèn)必須在收到的交付的同一信道上發(fā)送。嘗試使用不同的通道進(jìn)行確認(rèn)將導(dǎo)致通道級協(xié)議異常。
Step3 消息持久化
當(dāng)RabbitMQ退出或崩潰時,它將丟失隊列和消息。要確保消息不會丟失,我們需要將隊列和消息都標(biāo)記為持久。
開啟隊列的持久化
// 聲明隊列時,設(shè)置durable屬性為true開啟隊列持久化
ch.assertQueue('hello', {durable: true});
開啟消息的持久化
// 發(fā)送消息時,設(shè)置persistent屬性為true,讓RabbitMQ持久化當(dāng)前消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
有關(guān)消息持久化的注意事項
注意:將消息標(biāo)記為持久性并不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ接受消息并且尚未保存消息時,仍然有一個短時間窗口期。如果需要更強(qiáng)的保證,那么可以使用 發(fā)布者確認(rèn)。
Step4 公平派遣任務(wù)
問題
平均依次分配任務(wù)不能保證每個worker接收到相同分量的任務(wù)。當(dāng)出現(xiàn)一些任務(wù)很重,另一些很輕時,經(jīng)常會出現(xiàn)一個worker將經(jīng)常忙碌而另一個worker經(jīng)??臻e。
解決方案
使用prefetch方法控制worker同時最多接收的任務(wù)數(shù)量。當(dāng)worker待處理的任務(wù)達(dá)到最大數(shù)量時,MQ不會向其發(fā)送新任務(wù),而是會向空閑的worker發(fā)送新任務(wù)。使用這種機(jī)制可以達(dá)到負(fù)載均衡的效果。
Consumer(Worker)的代碼
// ...imports
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err,ch){
var q = 'task_queue';
ch.prefetch(1); // 當(dāng)有一個任務(wù)未完成時,不再接受新任務(wù)
ch.consume(q,function(msg){
// ...do something
setTimeout(()=>{
// ...do something async
ch.ack(MSG); // 確認(rèn)執(zhí)行完畢
},1000);
},{noAck:false});
});
});