Add unit tests for SQS rendezvous in client

Co-authored-by: Michael Pu <michael.pu@uwaterloo.ca>
This commit is contained in:
Anthony Chang 2024-01-12 23:23:33 -05:00 committed by Cecylia Bocovich
parent f3b062ddb2
commit 32e864b71d
No known key found for this signature in database
GPG key ID: 009DE379FD9B7B90
3 changed files with 137 additions and 38 deletions

View file

@ -23,6 +23,8 @@ type sqsRendezvous struct {
sqsClientID string
sqsClient sqsclient.SQSClient
sqsURL *url.URL
timeout time.Duration
numRetries int
}
func newSQSRendezvous(sqsQueue string, sqsAccessKeyId string, sqsSecretKey string, transport http.RoundTripper) (*sqsRendezvous, error) {
@ -66,6 +68,8 @@ func newSQSRendezvous(sqsQueue string, sqsAccessKeyId string, sqsSecretKey strin
sqsClientID: clientID,
sqsClient: client,
sqsURL: sqsURL,
timeout: time.Second,
numRetries: 5,
}, nil
}
@ -86,11 +90,10 @@ func (r *sqsRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
return nil, err
}
time.Sleep(time.Second) // wait for client queue to be created by the broker
time.Sleep(r.timeout) // wait for client queue to be created by the broker
numRetries := 5
var responseQueueURL *string
for i := 0; i < numRetries; i++ {
for i := 0; i < r.numRetries; i++ {
// The SQS queue corresponding to the client where the SDP Answer will be placed
// may not be created yet. We will retry up to 5 times before we error out.
var res *sqs.GetQueueUrlOutput
@ -99,8 +102,8 @@ func (r *sqsRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
})
if err != nil {
log.Println(err)
log.Printf("Attempt %d of %d to retrieve URL of response SQS queue failed.\n", i+1, numRetries)
time.Sleep(time.Second)
log.Printf("Attempt %d of %d to retrieve URL of response SQS queue failed.\n", i+1, r.numRetries)
time.Sleep(r.timeout)
} else {
responseQueueURL = res.QueueUrl
break
@ -111,7 +114,7 @@ func (r *sqsRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
}
var answer string
for i := 0; i < numRetries; i++ {
for i := 0; i < r.numRetries; i++ {
// Waiting for SDP Answer from proxy to be placed in SQS queue.
// We will retry upt to 5 times before we error out.
res, err := r.sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
@ -123,9 +126,9 @@ func (r *sqsRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
return nil, err
}
if len(res.Messages) == 0 {
log.Printf("Attempt %d of %d to receive message from response SQS queue failed. No message found in queue.\n", i+1, numRetries)
log.Printf("Attempt %d of %d to receive message from response SQS queue failed. No message found in queue.\n", i+1, r.numRetries)
delay := float64(i)/2.0 + 1
time.Sleep(time.Duration(delay*1000) * time.Millisecond)
time.Sleep(time.Duration(delay*1000) * (r.timeout / 1000))
} else {
answer = *res.Messages[0].Body
break