class Aws::SQS::QueuePoller
A utility class for long polling messages in a loop. **Messages are automatically deleted from the queue at the end of the given block.**
poller = Aws::SQS::QueuePoller.new(queue_url) poller.poll do |msg| puts msg.body end
## Long Polling
By default, messages are received using long polling. This method will force a default `:wait_time_seconds` of 20 seconds. If you prefer to use the queue default wait time, then pass a `nil` value for `:wait_time_seconds`.
# disables 20 second default, use queue ReceiveMessageWaitTimeSeconds poller.poll(wait_time_seconds:nil) do |msg| # ... end
When disabling `:wait_time_seconds` by passing `nil`, you must ensure the queue `ReceiveMessageWaitTimeSeconds` attribute is set to a non-zero value, or you will be short-polling. This will trigger significantly more API calls.
## Batch Receiving Messages
You can specify a maximum number of messages to receive with each polling attempt via `:max_number_of_messages`. When this is set to a positive value, greater than 1, the block will receive an array of messages, instead of a single message.
# receives and yields 1 message at a time poller.poll do |msg| # ... end # receives and yields up to 10 messages at a time poller.poll(max_number_of_messages:10) do |messages| messages.each do |msg| # ... end end
The maximum value for `:max_number_of_messages` is enforced by Amazon SQS
.
## Visibility Timeouts
When receiving messages, you have a fixed amount of time to process and delete the message before it is added back into the queue. This is the visibility timeout. By default, the queue's `VisibilityTimeout` attribute is used. You can provide an alternative visibility timeout when polling.
# queue default VisibilityTimeout poller.poll do |msg| end # custom visibility timeout poller.poll(visibility_timeout:10) do |msg| end
You can reset the visibility timeout of a single message by calling {#change_message_visibility_timeout}. This is useful when you need more time to finish processing the message.
poller.poll do |msg| # do work ... # need more time for processing poller.change_message_visibility_timeout(msg, 60) # finish work ... end
If you change the visibility timeout of a message to zero, it will return to the queue immediately.
## Deleting Messages
Messages are deleted from the queue when the block returns normally.
poller.poll do |msg| # do work end # messages deleted here
You can skip message deletion by passing `skip_delete: true`. This allows you to manually delete the messages using {#delete_message}, or {#delete_messages}.
# single message poller.poll(skip_delete: true) do |msg| poller.delete_message(msg) # if successful end # batch delete messages poller.poll(skip_delete: true, max_number_of_messages:10) do |messages| poller.delete_messages(messages) end
Another way to manage message deletion is to throw `:skip_delete` from the poll block. You can use this to choose when a message, or message batch is deleted on an individual basis. This can be very useful when you are capturing temporal errors and wish for the message to timeout.
poller.poll do |msg| begin # do work rescue # unexpected error occurred while processing messages, # log it, and skip delete so it can be re-processed later throw :skip_delete end end
## Terminating the Polling Loop
By default, polling will continue indefinitely. You can stop the poller by providing an idle timeout or by throwing `:stop_polling` from the {#before_request} callback.
### `:idle_timeout` Option
This is a configurable, maximum number of seconds to wait for a new message before the polling loop exists. By default, there is no idle timeout.
# stops polling after a minute of no received messages poller.poll(idle_timeout: 60) do |msg| # ... end
### Throw `:stop_polling`
If you want more fine grained control, you can configure a before request callback to trigger before each long poll. Throwing `:stop_polling` from this callback will cause the poller to exit normally without making the next request.
# stop after processing 100 messages poller.before_request do |stats| throw :stop_polling if stats.received_message_count >= 100 end poller.poll do |msg| # do work ... end
## Tracking Progress
The poller will automatically track a few statistics client-side in a {PollerStats} object. You can access the poller stats three ways:
-
The first block argument of {#before_request}
-
The second block argument of {#poll}.
-
The return value from {#poll}.
Here are examples of accessing the statistics.
-
Configure a {#before_request} callback.
“` poller.before_request do |stats|
logger.info("requests: #{stats.request_count}") logger.info("messages: #{stats.received_message_count}") logger.info("last-timestamp: #{stats.last_message_received_at}")
end “`
-
Accept a 2nd argument in the poll block, for example:
“` poller.poll do |msg, stats|
logger.info("requests: #{stats.request_count}") logger.info("messages: #{stats.received_message_count}") logger.info("last-timestamp: #{stats.last_message_received_at}")
end “`
-
Return value:
“` stats = poller.poll(idle_timeout:10) do |msg|
# do work ...
end logger.info(“requests: #{stats.request_count}”) logger.info(“messages: #{stats.received_message_count}”) logger.info(“last-timestamp: #{stats.last_message_received_at}”) “`
Attributes
@return [Client]
@return [PollerConfig]
@return [String]
Public Class Methods
Public Instance Methods
Registers a callback that is invoked once before every polling attempt.
poller.before_request do |stats| logger.info("requests: #{stats.request_count}") logger.info("messages: #{stats.received_message_count}") logger.info("last-timestamp: #{stats.last_message_received_at}") end poller.poll do |msg| # do work ... end
## `:stop_polling`
If you throw `:stop_polling` from the {#before_request} callback, then the poller will exit normally before making the next long poll request.
poller.before_request do |stats| throw :stop_polling if stats.received_messages >= 100 end # at most 100 messages will be yielded poller.poll do |msg| # do work ... end
@yieldparam [PollerStats] stats An object that tracks a few
client-side statistics about the queue polling.
@return [void]
# File lib/aws-sdk-sqs/queue_poller.rb, line 257 def before_request(&block) @default_config = @default_config.with(before_request: block) if block_given? end
@note This method should be called from inside a {#poll} block. @param [#receipt_handle] message An object that responds to
`#receipt_handle`.
@param [Integer] seconds
# File lib/aws-sdk-sqs/queue_poller.rb, line 350 def change_message_visibility_timeout(message, seconds) @client.change_message_visibility({ queue_url: @queue_url, receipt_handle: message.receipt_handle, visibility_timeout: seconds, }) end
@note This method should be called from inside a {#poll} block. @param [#receipt_handle] message An object that responds to
`#receipt_handle`.
# File lib/aws-sdk-sqs/queue_poller.rb, line 361 def delete_message(message) @client.delete_message({ queue_url: @queue_url, receipt_handle: message.receipt_handle, }) end
@note This method should be called from inside a {#poll} block. @param [Array<#message_id, receipt_handle>] messages An array of received
messages. Each object must respond to `#message_id` and `#receipt_handle`.
# File lib/aws-sdk-sqs/queue_poller.rb, line 372 def delete_messages(messages) @client.delete_message_batch( queue_url: @queue_url, entries: messages.map { |msg| { id: msg.message_id, receipt_handle: msg.receipt_handle } } ) end
Polls the queue, yielded a message, or an array of messages. Messages are automatically deleted from the queue at the end of the given block. See the class documentation on {QueuePoller} for more examples.
@example Basic example, loops indefinitely
poller.poll do |msg| # ... end
@example Receives and deletes messages as a batch
poller.poll(max_number_of_messages:10) do |messages| messages.each do |msg| # ... end end
@option options [Integer] :wait_time_seconds (20) The
long polling interval. Messages are yielded as soon as they are received. The `:wait_time_seconds` option specifies the max duration for each polling attempt before a new request is sent to receive messages.
@option options [Integer] :max_number_of_messages (1) The maximum
number of messages to yield from each polling attempt. Values can be from 1 to 10.
@option options [Integer] :visibility_timeout (nil)
The number of seconds you have to process a message before it is put back into the queue and can be received again. By default, the queue's visibility timeout is not set.
@option options [Array<String>] :attribute_names ([])
The list of attributes that need to be returned along with each message. Valid attribute names include: * `All` - All attributes. * `ApproximateFirstReceiveTimestamp` - The time when the message was first received from the queue (epoch time in milliseconds). * `ApproximateReceiveCount` - The number of times a message has been received from the queue but not deleted. * `SenderId` - The AWS account number (or the IP address, if anonymous access is allowed) of the sender. * `SentTimestamp` - The time when the message was sent to the queue (epoch time in milliseconds).
@option options [Array<String>] :message_attribute_names ([])
A list of message attributes to receive. You can receive all messages by using `All` or `.*`. You can also use `foo.*` to return all message attributes starting with the `foo` prefix.
@option options [Integer] :idle_timeout (nil) Polling terminates
gracefully when `:idle_timeout` seconds have passed without receiving any messages.
@option options [Boolean] :skip_delete (false) When `true`, messages
are not deleted after polling block. If you wish to delete received messages, you will need to call `#delete_message` or `#delete_messages` manually.
@option options [Proc] :before_request (nil) Called before each
polling attempt. This proc receives a single argument, an instance of {PollerStats}.
@return [PollerStats]
# File lib/aws-sdk-sqs/queue_poller.rb, line 329 def poll(options = {}, &block) config = @default_config.with(options) stats = PollerStats.new catch(:stop_polling) do loop do messages = get_messages(config, stats) if messages.empty? check_idle_timeout(config, stats, messages) else process_messages(config, stats, messages, &block) end end end stats.polling_stopped_at = Time.now stats end
Private Instance Methods
# File lib/aws-sdk-sqs/queue_poller.rb, line 395 def check_idle_timeout(config, stats, messages) if config.idle_timeout since = stats.last_message_received_at || stats.polling_started_at idle_time = Time.now - since throw :stop_polling if idle_time > config.idle_timeout end end
# File lib/aws-sdk-sqs/queue_poller.rb, line 383 def get_messages(config, stats) config.before_request.call(stats) if config.before_request messages = send_request(config).messages stats.request_count += 1 messages end
# File lib/aws-sdk-sqs/queue_poller.rb, line 403 def process_messages(config, stats, messages, &block) stats.received_message_count += messages.count stats.last_message_received_at = Time.now catch(:skip_delete) do yield_messages(config, messages, stats, &block) delete_messages(messages) unless config.skip_delete end end
# File lib/aws-sdk-sqs/queue_poller.rb, line 390 def send_request(config) params = config.request_params.merge(queue_url: @queue_url) @client.receive_message(params) end
# File lib/aws-sdk-sqs/queue_poller.rb, line 412 def yield_messages(config, messages, stats, &block) if config.request_params[:max_number_of_messages] == 1 messages.each do |msg| yield(msg, stats) end else yield(messages, stats) end end