From 2250bc86f6298d797f46dd6bed08ea85efd785eb Mon Sep 17 00:00:00 2001 From: Cecylia Bocovich Date: Wed, 19 Feb 2025 14:22:33 -0500 Subject: [PATCH] Process and read broker SQS messages more quickly We're losing a lot of messages from the broker SQS queue because they are exceeding their maximum lifetime before being read and processed by the broker. This change speeds up that process by increasing the size of messagesChn and processing the messages within a go routine. --- broker/sqs.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/broker/sqs.go b/broker/sqs.go index 614dafe..fb1164e 100644 --- a/broker/sqs.go +++ b/broker/sqs.go @@ -213,7 +213,7 @@ func newSQSHandler(context context.Context, client sqsclient.SQSClient, sqsQueue func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) { log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL) - messagesChn := make(chan *types.Message, 2) + messagesChn := make(chan *types.Message, 20) go r.pollMessages(ctx, messagesChn) go r.cleanupClientQueues(ctx) @@ -223,8 +223,10 @@ func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) { // if context is cancelled return default: - r.handleMessage(ctx, message) - r.deleteMessage(ctx, message) + go func(msg *types.Message) { + r.handleMessage(ctx, msg) + r.deleteMessage(ctx, msg) + }(message) } } }