从rabbitMQ获取所有消息

问题描述:

我试图从rabbitMQ队列获取所有消息。从rabbitMQ获取所有消息

const messages = await rabbit.getMessages(outputQueue, false); 

这里是getMessages方法的实现。问题是它只处理3-5条消息并调用'resolve'。一段时间后,它处理休息消息,但'解决'已被调用,并且不能再次执行。

const amqp = require('amqplib'); 
. 
. 
let amqpUrl; 
let queueConf; 

const init = (connection, queue) => { 
    amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`; 
    if (connection.vhost) { 
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`; 
    } 
    queueConf = queue; 
} 

const getChannel =() => new Promise((resolve) => { 
    amqp.connect(amqpUrl).then((conn) => { 
    conn.createChannel().then((ch) => { 
     ch.prefetch(1000).then(() => resolve(ch)) 
    }) 
    }) 
}) 

module.exports = (connection, queue) => { 
    init(connection, queue); 
    return { 
    getMessages: (queueName, cleanQueue) => new Promise((resolve) => { 
     let messages = []; 
     let i = 1; 
     getChannel().then((ch) => { 
     ch.consume(queueName, (msg) => { 
      messages.push(msg); 
      console.log(msg.content.toString()) 
     }, { noAck: cleanQueue }).then(() => { 
      logger.info(`Retreived ${messages.length} messages from ${queueName}`); 
      resolve(messages) 
     }) 
     }) 
    }) 
    . 
    . 
    }; 
    }; 

在此先感谢!

+1

证明,承诺不适用于所有的解决方案 - 并承诺绝对不适合这样的事情 –

你可以这样做,但它会非常慢,并且如果消息被添加到队列中的速度比消耗速度快,它可能永远不会解决。从本质上讲,你一直在同一时间得到一个消息,直到channel.get()解决了false,而不是消息对象:

getMessages: (queueName, cleanQueue) => { 
    let messages = [] 
    let i = 1 
    return getChannel().then(function getMessage (ch) { 
    return ch.get(queueName, { noAck: cleanQueue }).then((msg) => { 
     if (msg) { 
     messages.push(msg) 
     return getMessage(ch) 
     } else { 
     logger.info(`Retrieved ${messages.length} messages from ${queueName}`) 
     return messages 
     } 
    }) 
    }).catch((err) => { 
    err.consumedMessages = messages 
    return Promise.reject(err) 
    }) 
}