Friday, May 10, 2024
 Popular · Latest · Hot · Upcoming
57
rated 0 times [  64] [ 7]  / answers: 1 / hits: 5337  / 4 Years ago, mon, march 9, 2020, 12:00:00

I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.



I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.



#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = amqp:// + process.env.RABBITMQ_USERNAME + : + process.env.RABBITMQ_PASSWORD + @ + process.env.RABBITMQ_HOST + /development?heartbeat=60
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}

exports.consumeFromQueue = consumeFromQueue;


As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage line, it waits to execute process (30-60 seconds) before processing the next string.



(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();


So I basically need something that functions like this, but with listening to the queue in RabbitMQ.


More From » node.js

 Answers
10

If you want to limit the number of messages being processed by a consumer at any given time, use channel.prefetch():




The count given is the maximum number of messages sent over the
channel that can be awaiting acknowledgement; once there are count
messages outstanding, the server will not send more messages on this
channel until one or more have been acknowledged.




That is, if you only want to be able process a single message at a time before moving on to the next, set channel.prefetch(1)


[#4532] Friday, March 6, 2020, 4 Years  [reply] [flag answer]
Only authorized users can answer the question. Please sign in first, or register a free account.
austynp

Total Points: 505
Total Questions: 118
Total Answers: 106

Location: Tajikistan
Member since Sun, Aug 29, 2021
3 Years ago
austynp questions
Sun, Apr 11, 21, 00:00, 3 Years ago
Sat, Feb 6, 21, 00:00, 3 Years ago
Thu, Mar 19, 20, 00:00, 4 Years ago
;