rabbitMQ+thrift傳送消息

Apache thrift是一個(gè)開源的RPC框架,看到跟protocol buffer一樣也適用多種語(yǔ)言,就想著用rabbitMQ來處理thrift的消息,因?yàn)閠hrift字節(jié)比pb更少,可以適用于大量傳送數(shù)據(jù)的場(chǎng)景,例如,每個(gè)消息10K,傳送100條這樣的消息,就是10*100=1M,但用thrift可以壓縮40%,這里數(shù)據(jù)就少得很可觀了。
這里就記錄一下,我的使用過程:
首先定義一個(gè)Message.thrift

struct Message {
    1: i32 messageid,
    2: string message
}

然后用thrift生成相應(yīng)的js文件

thrift --gen js:node Message.thrift

這里生成了一個(gè)Message_types.js文件
這就是我們用來序列化數(shù)據(jù)的文件
好了,我們?cè)儆胣ode寫一個(gè)rabbitMQ的發(fā)送文件
這里我們就叫send.js

var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');

var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);

var AMPQ_URI = 'amqp://localhost:5672';

amqp.connect(AMPQ_URI, function(err, conn){
    conn.createChannel(function(err, ch){
        var q = 'hello';

        var buf = obj2buf({messageid:1,message:'{message:"1234"}'});

        ch.assertQueue(q, {durable: false});
        ch.sendToQueue(q, buf);
        console.log(" [x] Send Data Finish");
    });
    setTimeout(function(){
        conn.close();
        process.exit(0);
    }, 500);
})

/**
 * 將對(duì)象轉(zhuǎn)換成buffer
 * @param  {[type]} obj [description]
 * @return {[type]}     [description]
 */
var obj2buf = function(obj){
    var message = new Message(obj);
    message.write(protocol);
    var outBuffers = transport.outBuffers;
    var outCount = transport.outCount;
    var result = new Buffer(outCount);
    var pos = 0;
    outBuffers.forEach(function(buf) {
      buf.copy(result, pos, 0);
      pos += buf.length;
    });
    return result;
}

其中obj2buf就是thrift將數(shù)據(jù)轉(zhuǎn)換成buffer的方法,別問我怎么得來的,我也是從網(wǎng)上找的,但這個(gè)方法能用,自己親測(cè)
我們?cè)賹懸粋€(gè)receiver.js,這個(gè)方法是用來處理rabbitMQ消息的

var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');

var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);

var AMQP_URI = 'amqp://localhost:5672';

amqp.connect(AMQP_URI, function(err, conn){
    conn.createChannel(function(err, ch){
        var q = 'hello';

        ch.assertQueue(q, {durable: false});
        console.log('[*] Waiting for message in %s. To exit press CTRL+C', q);
        ch.consume(q, function(msg){
            // console.log(msg);

            var message = buf2obj(msg.content);
            
            console.log(message);
            console.log('[x] Received Data Finish');
        }, {noAck: true});
    })
})

/**
 * 將buffer轉(zhuǎn)換成對(duì)象
 * @param  {[type]} buffer [description]
 * @return {[type]}     [description]
 */
var buf2obj = function(buffer){
    var data = buffer;
    data.copy(transport.inBuf, transport.writeCursor, 0);
    transport.writeCursor += data.length;
    var message = new Message();
    message.read(protocol);
    return message;
}

這里的buf2obj就是將buffer轉(zhuǎn)換成對(duì)象,
rabbitMQ里面?zhèn)魉拖⒍际且詁uffer類型。
好了,我們可以先跑

node receiver.js
屏幕快照 2017-05-18 下午3.10.07.png

再開一個(gè)窗口運(yùn)行

node send.js
屏幕快照 2017-05-18 下午3.10.18.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,507評(píng)論 2 34
  • 轉(zhuǎn)自:http://blog.csdn.net/kesonyk/article/details/50924489 ...
    晴天哥_王志閱讀 25,330評(píng)論 2 38
  • 問題導(dǎo)讀: 1.如何構(gòu)建高并發(fā)電商平臺(tái)架構(gòu) 2.哈希、B樹、倒排、bitmap的作用是什么? 3.作為軟件工程師,...
    MaLiang閱讀 5,257評(píng)論 1 70
  • 曾幻想幾許時(shí)跟未來家公家婆見面的場(chǎng)景和以后相處的樣子,現(xiàn)在還沒看到未來家婆,她已經(jīng)很熱情跟我聊天,各種關(guān)心,前幾天...
    星月菩提子閱讀 466評(píng)論 0 0

友情鏈接更多精彩內(nèi)容