nodejs amqplib 使用

生產(chǎn)者

class RabbitMq {
  constructor(options) {
    this.ex = 'XXX'
    this.exType = 'direct'
    this.durable = true
    this.routeKey = 'XX'
    this.autoDelete = true
    this.q = 'hello'
  }

  async send() {
    const conn = await amqp.connect(url)
    const msg = JSON.stringify({ a: "aa" })

    try {
      // const ch = await conn.createChannel()
      // 確認(rèn)消息發(fā)送 ok 猜測(cè)是開(kāi)啟 confirm 機(jī)制,對(duì)應(yīng)的監(jiān)聽(tīng)函數(shù)是什么呢?
      const ch = await conn.createConfirmChannel()
      const res = await ch.assertExchange(this.ex, this.exType, { durable: this.durable })

      var flag = 0
      while(flag < 4) {
// 實(shí)現(xiàn)消息持久化, 要exchange,queue,msg 三者同時(shí)持久化
/*
如果exchange根據(jù)自身類型和消息routeKey無(wú)法找到一個(gè)符合條件的queue,
那么會(huì)調(diào)用basic.return方法將消息返回給生產(chǎn)者(Basic.Return + Content-Header + Content-Body);
當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情形broker會(huì)直接將消息扔掉
*/
        ch.publish(this.ex, this.routeKey, Buffer.from(msg), {
          persistent: true, // 消息持久化
          mandatory: true
        })
  // 確認(rèn)消息已經(jīng)入隊(duì), 返回錯(cuò)誤 是啥樣? 錯(cuò)誤怎么處理?直接close?
        const res2 = await ch.waitForConfirms()
        console.log('==res2==', res2)

        console.log(" [x] Sent '%s'", msg);

        await timeout(2000)
        flag++
      }
      ch.close()
    } catch (e) {
      console.log('==e==', e)
      ch.close()
    }
  }
}

const rabbit = new RabbitMq({})

rabbit.send()

消費(fèi)端

class RabbitMq {
  constructor(options) {
    this.ex = 'XXX'
    this.exType = 'direct'
    this.durable = true
    this.routeKey = 'XX'
    this.autoDelete = true
    this.q = 'hello'
  }

  async send() {
    const conn = await amqp.connect(url)

    try {
      const ch = await conn.createChannel()
      // 確認(rèn)消息發(fā)送 ok
      const res = await ch.assertExchange(this.ex, this.exType, { durable: this.durable })
      // 此處 q 置空,用的是rabbitmq自動(dòng)生成的隊(duì)列名, exclusive 是生成排他隊(duì)列, 連接斷開(kāi)后就會(huì)自動(dòng)刪除
      const q = await ch.assertQueue('', { exclusive: false })

      console.log('==q=', q)
      // 隊(duì)列綁定 exchange
      ch.bindQueue(q.queue, this.ex, this.routeKey)

      ch.consume(q.queue, msg => {
        console.log('收到消息: ', msg)
         // 發(fā)送確認(rèn)消息
        ch.ack(msg)
      }, { noAck: false })

      // ch.close()
    } catch (e) {
      console.log('==e==', e)
      ch.close()
    }
  }
}

const rabbit = new RabbitMq({})

rabbit.send()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,694評(píng)論 19 139
  • 嘿,老朋友,又見(jiàn)面了! 算起來(lái)這是我們第六次相見(jiàn),是第五次我們共同度過(guò)一段難忘的時(shí)光,其中有一次是我乘飛機(jī)飛越滇藏...
    水竹清月閱讀 467評(píng)論 0 0
  • 1.今天女友終于休假,我們倆不約而同的準(zhǔn)備一起去看《金剛狼3》。劇情雖然讓中國(guó)電影局刪減的狗血至極,但也把我們感動(dòng)...
    盧不斯拍個(gè)人短片閱讀 671評(píng)論 3 5
  • 【從本視頻中學(xué)到最重要的概念】 Try to understand the structure of the ar...
    曉豬佩琪閱讀 281評(píng)論 3 0

友情鏈接更多精彩內(nèi)容