从rabbitMQ获取所有消息
问题描述:
我试图从rabbitMQ队列获取所有消息。从rabbitMQ获取所有消息
const messages = await rabbit.getMessages(outputQueue, false);
这里是getMessages方法的实现。问题是它只处理3-5条消息并调用'resolve'。一段时间后,它处理休息消息,但'解决'已被调用,并且不能再次执行。
const amqp = require('amqplib');
.
.
let amqpUrl;
let queueConf;
const init = (connection, queue) => {
amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`;
if (connection.vhost) {
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`;
}
queueConf = queue;
}
const getChannel =() => new Promise((resolve) => {
amqp.connect(amqpUrl).then((conn) => {
conn.createChannel().then((ch) => {
ch.prefetch(1000).then(() => resolve(ch))
})
})
})
module.exports = (connection, queue) => {
init(connection, queue);
return {
getMessages: (queueName, cleanQueue) => new Promise((resolve) => {
let messages = [];
let i = 1;
getChannel().then((ch) => {
ch.consume(queueName, (msg) => {
messages.push(msg);
console.log(msg.content.toString())
}, { noAck: cleanQueue }).then(() => {
logger.info(`Retreived ${messages.length} messages from ${queueName}`);
resolve(messages)
})
})
})
.
.
};
};
在此先感谢!
答
你可以这样做,但它会非常慢,并且如果消息被添加到队列中的速度比消耗速度快,它可能永远不会解决。从本质上讲,你一直在同一时间得到一个消息,直到channel.get()
解决了false
,而不是消息对象:
getMessages: (queueName, cleanQueue) => {
let messages = []
let i = 1
return getChannel().then(function getMessage (ch) {
return ch.get(queueName, { noAck: cleanQueue }).then((msg) => {
if (msg) {
messages.push(msg)
return getMessage(ch)
} else {
logger.info(`Retrieved ${messages.length} messages from ${queueName}`)
return messages
}
})
}).catch((err) => {
err.consumedMessages = messages
return Promise.reject(err)
})
}
证明,承诺不适用于所有的解决方案 - 并承诺绝对不适合这样的事情 –