nodejs amqplib 使用 · 物联网平台-威尼斯人最新

niubi · 2020年05月25日 · 最后由 回复于 2022年08月19日 · 330 次阅读

生产者

class rabbitmq {
  constructor(options) {
    this.ex = 'xxx'
    this.extype = 'direct'
    this.durable = true
    this.routekey = 'xx'
    this.autodelete = true
    this.q = 'hello'
  }
  async send() {
    const conn = await amqp.connect(url)
    const msg = json.stringify({ a: "aa" })
    try {
      // const ch = await conn.createchannel()
      // 确认消息发送 ok 猜测是开启 confirm 机制,对应的监听函数是什么呢?
      const ch = await conn.createconfirmchannel()
      const res = await ch.assertexchange(this.ex, this.extype, { durable: this.durable })
      var flag = 0
      while(flag < 4) {
// 实现消息持久化, 要exchange,queue,msg 三者同时持久化
/*
如果exchange根据自身类型和消息routekey无法找到一个符合条件的queue,
那么会调用basic.return方法将消息返回给生产者(basic.return   content-header   content-body);
当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
*/
        ch.publish(this.ex, this.routekey, buffer.from(msg), {
          persistent: true, // 消息持久化
          mandatory: true
        })
  // 确认消息已经入队, 返回错误 是啥样? 错误怎么处理?直接close?
        const res2 = await ch.waitforconfirms()
        console.log('==res2==', res2)
        console.log(" [x] sent '%s'", msg);
        await timeout(2000)
        flag  
      }
      ch.close()
    } catch (e) {
      console.log('==e==', e)
      ch.close()
    }
  }
}
const rabbit = new rabbitmq({})
rabbit.send()

消费端

class rabbitmq {
  constructor(options) {
    this.ex = 'xxx'
    this.extype = 'direct'
    this.durable = true
    this.routekey = 'xx'
    this.autodelete = true
    this.q = 'hello'
  }
  async send() {
    const conn = await amqp.connect(url)
    try {
      const ch = await conn.createchannel()
      // 确认消息发送 ok
      const res = await ch.assertexchange(this.ex, this.extype, { durable: this.durable })
      // 此处 q 置空,用的是rabbitmq自动生成的队列名, exclusive 是生成排他队列, 连接断开后就会自动删除
      const q = await ch.assertqueue('', { exclusive: false })
      console.log('==q=', q)
      // 队列绑定 exchange
      ch.bindqueue(q.queue, this.ex, this.routekey)
      ch.consume(q.queue, msg => {
        console.log('收到消息: ', msg)
         // 发送确认消息
        ch.ack(msg)
      }, { noack: false })
      // ch.close()
    } catch (e) {
      console.log('==e==', e)
      ch.close()
    }
  }
}
const rabbit = new rabbitmq({})
rabbit.send()
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册
网站地图