工人阻止消费 - RabbitMQ - Node.js
问题描述:
我在Node.js中遇到了RabbitMQ问题。 我试图实现一个Pub/Sub连接器,其中每个用户都有自己的队列轮询以获取消息。 当我通过Postma发布邮件并且用户使用邮件时,我没有任何问题(我正确地获取邮件),而如果用户试图使用队列中的邮件(但不存在邮件),它会一直卡住,直到发布了新的发布,但我无法获得该消息。 我想要做的是放弃消耗,然后重试。 你能帮我解决吗?工人阻止消费 - RabbitMQ - Node.js
get_queue工作正常,并获取用户的个人队列。
路线
app.post('/consume', (req, res) => {
project.get_queue(req, res)
.then(result1 => {return project.consume(req, res, result1.message);})
.then(result2 => {res.status(result2.status).json({ message: result2.message });})
.catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message}))
});
控制器
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.assertQueue(queue, {durable: true},function(err, _ok){
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });
console.log("#Msg: ", _ok.messageCount)
if(_ok.messageCount === 0) {
reject({ status: 400, message: 'No consuming' });
}
else {
var json = [];
var topic = ch.consume(queue, function(msg, err) {
work(msg, function(ok) {
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
resolve({ status: 200, message: json});
}
else {
ch.reject(msg, true);
resolve({ status: 200, message: json});
}
} catch (e) {
reject({ status: 404, message: "niente da consumare"});
closeOnErr(e);
}});
}, {noAck: false});
})
})
};
这是我做的那么远。
exports.consume = function(req, res, user) {
return new Promise((resolve,reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
iterate(ch, queue, json);
})
})
};
function iterate(ch, queue, json) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
reject({
status: 404,
message: "niente da consumare"
});
closeOnErr(e);
}
});
})}
答
使用ch.get于迭代:
exports.consume = function(req, res, user) {
return new Promise((resolve, reject) => {
const queue = user.queueName;
amqpConn.createChannel(function(err, ch) {
if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });;
ch.on("error", function(err){
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
var json = [];
return iterate(ch, queue, json, resolve, reject);
});
});
};
function iterate(ch, queue, json, resolve, reject) {
ch.get(queue, {
noAck: false
}, function (err, msg) {
if (!msg) return resolve({
status: 200,
message: json
});
work(msg, function (ok) {
console.log("Errore?");
console.log("MSG consumed: ", msg.content.toString());
try {
if (ok) {
ch.ack(msg);
json.push(JSON.parse(msg.content.toString()));
return iterate(ch, queue, json, resolve, reject);
}
ch.reject(msg, true);
return resolve({
status: 200,
message: json
});
} catch (e) {
closeOnErr(e);
return reject({
status: 404,
message: "niente da consumare"
});
}
});
});
}
它将运行功能iterate()
,如果msg
是false
,它会解决,否则将处理消息,如果从work
结果不好,它会拒绝并停止迭代,如果结果没问题,它会再次迭代;
你忘了通过决定并拒绝迭代。因为,与我的代码相比,你从Promise的范围中删除了迭代,但忘记了通过决定并拒绝。这就是为什么解决未定义。 –
新承诺:你能提供一个例子吗? – JackLametta
当然,Promise中的解析函数基本上说明了它的成功。我将用您的代码的更正版本更新我的答案。 –