I'm pretty new to Node.js and ES6, and this is confusing me a bit. I am trying to leave a process running that consumes messages from a RabbitMQ queue. It needs to finish processing each message (which takes about 30-60 seconds) before it grabs the next one. Currently, my code grabs all the messages it can and then tries to fork processes for all of them at once. With 3-5 messages in the queue this is fine, but with 20, 50 or 100 messages the server runs out of memory.
I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.
#!/usr/bin/env node
const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    // Connection string built from environment variables
    const conn_str = 'amqp://' + process.env.RABBITMQ_USERNAME + ':' + process.env.RABBITMQ_PASSWORD + '@' + process.env.RABBITMQ_HOST + '/development?heartbeat=60';
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue, { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`);
    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                // processMessage is started but not waited for; the ack follows immediately
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                // amqplib passes null here when the consumer is cancelled, so there is nothing to reject
                console.log('Queue is empty!');
            }
        }, { noAck: isNoAck });
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!');
        cluster.close();
    }
};

exports.consumeFromQueue = consumeFromQueue;
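In the consumer above, channel.ack runs as soon as processMessage has been called, not when it has finished, and prefetch defaults to null, so no delivery limit is set. A minimal sketch of the alternative, under some assumptions: processMessage returns a promise, the channel is consumed with noAck: false, and channel.prefetch(1) has been called so the broker holds back the next delivery until the current one is acked. makeSerialHandler is a hypothetical helper name, not part of amqplib, and dropping failed messages instead of requeueing them is just one possible choice.

```javascript
// Hypothetical helper: builds a consume callback that acks a message only
// after processMessage has finished. Assumes processMessage returns a promise.
const makeSerialHandler = (channel, processMessage) => async (message) => {
  if (message === null) {
    // amqplib invokes the callback with null when the broker cancels the consumer.
    return;
  }
  try {
    await processMessage(message.content.toString());
    // Acking here, after the await, is what lets prefetch(1) release the next message.
    channel.ack(message);
  } catch (error) {
    // Assumption: a failed message is rejected without requeueing (third argument false).
    channel.nack(message, false, false);
  }
};

// Intended wiring (needs a live broker, so left as a comment):
//   await channel.prefetch(1);
//   await channel.consume(queue, makeSerialHandler(channel, processMessage), { noAck: false });
```

With noAck: true the broker treats every delivery as acknowledged on send, so a prefetch limit would have no effect in that mode.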
As a side note, if I create an array of strings and loop through them, adding await to the processMessage line makes it wait for each call to finish (30-60 seconds) before processing the next string.
(async () => {
    for (let i = 0; i < urls.length; i++) {
        await processMessage(urls[i]);
    }
})();
So I basically need something that works like this, but listening to the RabbitMQ queue instead of looping over an array.
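For illustration, the loop shape above could be carried over almost literally by pulling messages with channel.get instead of subscribing with channel.consume. This is only a sketch under assumptions: pollQueue, sleep, idleMs and shouldStop are hypothetical names introduced here, processMessage is assumed to return a promise, and polling is generally less efficient than a push consumer, so it may not suit a busy queue.

```javascript
// Hypothetical helper: resolves after the given number of milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: pulls one message at a time and processes it to
// completion before asking the broker for the next one.
const pollQueue = async (channel, queue, processMessage, { idleMs = 1000, shouldStop = () => false } = {}) => {
  while (!shouldStop()) {
    // channel.get resolves to a message, or to false when the queue is empty.
    const message = await channel.get(queue, { noAck: false });
    if (message === false) {
      // Nothing waiting: pause briefly before polling again.
      await sleep(idleMs);
      continue;
    }
    await processMessage(message.content.toString());
    channel.ack(message);
  }
};

// Intended use (needs a live broker, so left as a comment):
//   await pollQueue(channel, queue, processMessage);
```

Because only one message is ever fetched and unacknowledged at a time, at most one processMessage call is in flight, which mirrors the behavior of the array loop.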