SQS (Simple Queue Service) is a distributed message queuing service offered by Amazon as part of AWS (Amazon Web Services). It is a message queue that helps with asynchronous processing of messages (or tasks). The producer creates tasks by pushing messages that represent them into the queue; the consumer listens to the queue, fetches a message and executes the task. This decoupling removes the producer's dependency on the consumer.
Suppose processing a task is expensive. Without a queue, the producer would waste most of its time waiting for the consumer to finish. Message queues such as SQS let us decouple the two: the producer pushes a task into the queue and is immediately ready to produce the next one, without caring when the consumer will process it. This increases the availability and efficiency of the producer, and the end user is not stuck waiting for a producer to become free.
SQS is a managed service from AWS, so you don’t have to own a server and set it up. AWS does it all for you and charges you according to your usage. In this article we will implement a simple service for timed retries using AWS SQS in Golang, that is, a service that retries a task after a given time in a distributed environment.
Properties of SQS
Properties are attributes that define the behaviour of the queue, for example whether the queue is strictly FIFO or a standard queue with best-effort ordering. Here we discuss the properties of SQS that we need for this article.
- QueueName: A name for the queue.
- FifoQueue: If set to true, creates a FIFO queue. If you don’t specify this property, Amazon SQS creates a standard queue.
- VisibilityTimeout: The length of time for which a message is hidden from other consumers after it has been delivered from the queue. This stops other components from receiving the same message and gives the first component time to process the message and delete it from the queue.
- DelaySeconds: The time in seconds for which the delivery of all messages in the queue is delayed. Valid values range from 0 to 900 (15 minutes).
Please note that this is not an exhaustive list of properties; these are just the ones we need in this article. For the full list, see the Amazon SQS documentation.
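The properties above can be collected and validated before a queue is created. The following is a minimal sketch using only the standard library: `QueueConfig` and its `Attributes` method are hypothetical helpers, not part of any AWS SDK. The attribute names and limits are the ones SQS documents for queue creation, and the result is the string-to-string map that a create-queue call expects.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// QueueConfig is a hypothetical helper type, not part of any AWS SDK.
type QueueConfig struct {
	QueueName         string
	FifoQueue         bool
	VisibilityTimeout int // seconds, 0 to 43200 (12 hours)
	DelaySeconds      int // seconds, 0 to 900 (15 minutes)
}

// Attributes validates the config and returns the queue attributes as the
// string-to-string map that SQS expects when a queue is created.
func (q QueueConfig) Attributes() (map[string]string, error) {
	if q.QueueName == "" {
		return nil, errors.New("QueueName is required")
	}
	if q.FifoQueue && !strings.HasSuffix(q.QueueName, ".fifo") {
		return nil, errors.New("a FIFO queue name must end with .fifo")
	}
	if q.VisibilityTimeout < 0 || q.VisibilityTimeout > 43200 {
		return nil, errors.New("VisibilityTimeout must be between 0 and 43200 seconds")
	}
	if q.DelaySeconds < 0 || q.DelaySeconds > 900 {
		return nil, errors.New("DelaySeconds must be between 0 and 900 seconds")
	}
	attrs := map[string]string{
		"VisibilityTimeout": strconv.Itoa(q.VisibilityTimeout),
		"DelaySeconds":      strconv.Itoa(q.DelaySeconds),
	}
	// Leaving FifoQueue out entirely produces a standard queue.
	if q.FifoQueue {
		attrs["FifoQueue"] = "true"
	}
	return attrs, nil
}

func main() {
	cfg := QueueConfig{QueueName: "enquiry-retries", VisibilityTimeout: 30, DelaySeconds: 60}
	attrs, err := cfg.Attributes()
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(attrs)
}
```

Validating locally is only a convenience; SQS itself rejects out-of-range values when the queue is created.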
The property we are going to take advantage of to write this utility is DelaySeconds. Setting this property to t seconds means that a message pushed into the queue becomes visible to the consumer only after t seconds.
This property can be defined at the queue level as well as at the message level. If defined at the queue level, all messages get the same delay, whereas defining it at the message level gives you the freedom to choose different delays for different cases. Note that per-message delays are supported only on standard queues; FIFO queues accept a delay only at the queue level.
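To make the two levels concrete, here is a small sketch of how the delay for a single message could be resolved on a standard queue, where a delay set on the message takes precedence over the queue default. `effectiveDelay` is a hypothetical helper written for illustration; it does not call SQS.

```go
package main

import "fmt"

// SQS caps DelaySeconds at 900 seconds (15 minutes).
const maxDelaySeconds = 900

// effectiveDelay is a hypothetical helper: it returns the delay that applies
// to one message. A nil messageDelay means no per-message delay was set, so
// the queue-level default is used.
func effectiveDelay(queueDelay int64, messageDelay *int64) (int64, error) {
	d := queueDelay
	if messageDelay != nil {
		d = *messageDelay
	}
	if d < 0 || d > maxDelaySeconds {
		return 0, fmt.Errorf("delay %d is outside 0..%d seconds", d, maxDelaySeconds)
	}
	return d, nil
}

func main() {
	perMessage := int64(120)
	d1, _ := effectiveDelay(60, nil)         // falls back to the queue default
	d2, _ := effectiveDelay(60, &perMessage) // message-level value wins
	fmt.Println(d1, d2)                      // prints "60 120"
}
```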
Implementation
Suppose there is a third-party API that you hit for a particular transaction, but its response does not tell you the result of the transaction (whether it succeeded or not); it only returns an acknowledgement. The third party provides another API for enquiring about the status of the transaction, but restricts when and how many times you may retry. Suppose they support 3 retries, after 1, 2 and 3 minutes respectively. If you don’t get a success response after the 3 enquiries, you mark the transaction as a failure.
Here is how we approach the problem. As soon as we hit the third-party API, we push a message containing all the fields needed to enquire about the transaction into the queue, with DelaySeconds set to the first enquiry delay (1 min = 60 seconds). Once the delay has elapsed, the message becomes visible to the consumer, which consumes it and hits the enquiry API. If the enquiry response is Success, the consumer marks the transaction as successful and deletes the message. If the enquiry response is Failure, the consumer copies the message, sets DelaySeconds to the second enquiry delay (2 min = 120 seconds), pushes the new message into the queue and deletes the original message. The same is repeated for the third attempt. When the consumer gets a Failure on the third attempt, it marks the transaction as Failure and deletes the message.
    // Consume polls the queue forever. For every message it enquires about the
    // transaction and either finishes it or re-queues it with the next delay.
    // AttemptNo is assumed to start at 1 for the first enquiry.
    func (c *consumer) Consume() {
    	// delays[i] is the DelaySeconds used before enquiry attempt i+1.
    	delays := []int64{60, 120, 180}
    	for {
    		output, err := receiveSQSMessages(c.QueueURL)
    		if err != nil {
    			// Process the error here
    			continue
    		}
    		for _, m := range output.Messages {
    			resp, err := c.Enquire(m) // This function hits the enquiry URL
    			if err != nil {
    				// Process the error here. The message is not deleted, so it
    				// becomes visible again after the visibility timeout.
    				continue
    			}
    			switch {
    			case resp.Status == "Success":
    				markSuccess(m) // Mark the transaction as success
    			case m.AttemptNo < len(delays):
    				newMessage := copyMessage(m)           // Copy the message to a new message
    				newMessage.AttemptNo = m.AttemptNo + 1 // Increase the attempt number by 1
    				// Produce the message with the delay for the next attempt
    				produceSQSMessage(newMessage, delays[newMessage.AttemptNo-1])
    			default:
    				markFailure(m) // All retries exhausted, mark the transaction as failure
    			}
    			// Delete the original message once it has been handled. A defer
    			// would never run here because the outer loop never returns.
    			c.delete(m)
    		}
    	}
    }
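One detail the listing glosses over is where AttemptNo lives. An SQS message body is a string, so the attempt counter has to travel inside the body (or in a message attribute). A minimal sketch, assuming a JSON body: the `enquiryPayload` type, its field names and `nextAttempt` are hypothetical and only illustrate the idea.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// enquiryPayload is a hypothetical message body; the field names are
// illustrative and not prescribed by SQS.
type enquiryPayload struct {
	TransactionID string `json:"transaction_id"`
	AttemptNo     int    `json:"attempt_no"`
}

// nextAttempt decodes a message body and returns the body for the following
// attempt, with the attempt counter increased by one.
func nextAttempt(body string) (string, error) {
	var p enquiryPayload
	if err := json.Unmarshal([]byte(body), &p); err != nil {
		return "", fmt.Errorf("decode payload: %w", err)
	}
	p.AttemptNo++
	out, err := json.Marshal(p)
	if err != nil {
		return "", fmt.Errorf("encode payload: %w", err)
	}
	return string(out), nil
}

func main() {
	body, err := nextAttempt(`{"transaction_id":"txn-42","attempt_no":1}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body) // prints {"transaction_id":"txn-42","attempt_no":2}
}
```

A helper like copyMessage could be built on top of this: decode the body, bump the counter, and hand the re-encoded body to the producer.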
Conclusion
An interesting observation here is that this utility does not just consume from the queue, it also produces to it. However, to stay consistent with the terminology we have been using, we name the function Consume and the entity (struct) consumer. The function is not fully optimized, and various error checks have been left out to keep it simple. The helper functions called by Consume are left for the reader to implement according to the use case. The function just captures the gist of implementing timed retries using AWS SQS in Golang.
Please note that this function also works in a distributed environment. That is the beauty of asynchronous queues: they remove the interdependency between components. I would advise you to try running something of this sort to gain confidence in SQS. You can completely emulate SQS offline and carry on experimenting without worrying about the cost. Please follow this article to see how.
We will be back with more articles and use cases of message queues. Until then, get your hands dirty and your concepts clear.