class Qyu::Queue::SQS::Adapter
Constants
- TYPE
Public Class Methods
new(config)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 11
# Sets the adapter up by handing +config+ straight to +init_client+.
#
# @param config [Hash] adapter settings, forwarded unchanged to +init_client+
def initialize(config)
  init_client(config)
end
valid_config?(config)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 15
# Checks +config+ by running it through ConfigurationValidator.
#
# @param config [Object] candidate configuration, passed to ConfigurationValidator.new
# @return the result of ConfigurationValidator#valid?
def self.valid_config?(config)
  validator = ConfigurationValidator.new(config)
  validator.valid?
end
Public Instance Methods
acknowledge_message(queue_name, message_id)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 57
# Deletes a message from the queue via the SQS client's +delete_message+.
#
# @param queue_name [String] unprefixed queue name, resolved through +get_or_create_queue_url+
# @param message_id [String] value sent as the +receipt_handle+ of the delete request
# @return the response of +@sqs.delete_message+
def acknowledge_message(queue_name, message_id)
  queue_url = get_or_create_queue_url(queue_name)
  deletion = { queue_url: queue_url, receipt_handle: message_id }
  @sqs.delete_message(deletion)
end
enqueue_task(queue_name, task_id)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 19
# Sends a message whose body is the JSON document <tt>{"task_id": task_id}</tt>
# to the given queue, logging the raw response (debug) and a summary (info).
#
# @param queue_name [String] unprefixed queue name, resolved through +get_or_create_queue_url+
# @param task_id [Object] identifier serialized into the message body
# @return the response of +@sqs.send_message+
def enqueue_task(queue_name, task_id)
  # Resolve the queue first so the call order matches the request layout.
  queue_url = get_or_create_queue_url(queue_name)
  payload = { task_id: task_id }.to_json
  response = @sqs.send_message({ queue_url: queue_url, message_body: payload })

  Qyu.logger.debug "SQS response: #{response}"
  Qyu.logger.info "Task enqueued with ID #{task_id} in queue #{queue_name}"
  response
end
enqueue_task_to_failed_queue(queue_name, task_id)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 31
# Enqueues +task_id+ on the companion queue named "<queue_name>-failed".
#
# The name is built by interpolation rather than String#+, so Symbol queue
# names work here just as they do in the methods that only interpolate it.
#
# @param queue_name [String, Symbol] name of the original queue
# @param task_id [Object] identifier forwarded to +enqueue_task+
# @return the value returned by +enqueue_task+
def enqueue_task_to_failed_queue(queue_name, task_id)
  enqueue_task("#{queue_name}-failed", task_id)
end
fetch_next_message(queue_name)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 36
# Blocks until the queue yields a message, then returns its receipt handle
# and the task id parsed from its JSON body.
#
# The queue is polled one message at a time, sleeping one second after every
# empty response. The queue URL is resolved once, before polling starts, so
# an idle queue costs one +receive_message+ call per poll instead of an extra
# +get_or_create_queue_url+ lookup on every iteration.
#
# @param queue_name [String] unprefixed queue name, resolved through +get_or_create_queue_url+
# @return [Hash] <tt>{ 'id' => receipt_handle, 'task_id' => task id from the body }</tt>
# @raise [JSON::ParserError] if the message body is not valid JSON
def fetch_next_message(queue_name)
  Qyu.logger.debug "Listening on `#{queue_name}`"
  queue_url = get_or_create_queue_url(queue_name)

  # `first` is nil for an empty batch, which keeps the loop polling.
  until (message = @sqs.receive_message({ queue_url: queue_url, max_number_of_messages: 1 }).messages.first)
    sleep 1
  end

  Qyu.logger.debug "Fetched message #{message}"
  { 'id' => message.receipt_handle, 'task_id' => JSON.parse(message.body)['task_id'] }
end
Private Instance Methods
get_or_create_queue_url(queue_name)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 66
# Returns the URL of the queue named "<@queue_prefix>-<queue_name>".
#
# If the lookup raises Aws::SQS::Errors::NonExistentQueue, the queue is
# created with +@queue_attributes+ and the new queue's URL is returned.
#
# @param queue_name [String] unprefixed queue name
# @return the +queue_url+ of the lookup or creation response
def get_or_create_queue_url(queue_name)
  prefixed_name = "#{@queue_prefix}-#{queue_name}"
  @sqs.get_queue_url({ queue_name: prefixed_name }).queue_url
rescue Aws::SQS::Errors::NonExistentQueue
  Qyu.logger.info "Could not find queue `#{prefixed_name}`, creating it"
  created = @sqs.create_queue({ queue_name: prefixed_name, attributes: @queue_attributes })
  created.queue_url
end
init_client(config)
click to toggle source
noinspection RubyArgCount
# File lib/qyu/queue/sqs/adapter.rb, line 88
# Stores the queue prefix, builds the SQS client and precomputes the
# attributes passed along when a queue is created.
#
# The debug line logs a copy of +config+ in which +access_key_id+ and
# +secret_access_key+ are replaced by a placeholder, so credentials never
# reach the log output. The client itself still receives the real values.
#
# @param config [Hash] settings; reads :queue_prefix, :region, :access_key_id
#   and :secret_access_key here and is also passed to +queue_attributes+
# @return the value returned by +queue_attributes+
def init_client(config)
  credential_keys = %w[access_key_id secret_access_key]
  loggable_config = config.to_h.each_with_object({}) do |(key, value), safe|
    safe[key] = credential_keys.include?(key.to_s) ? '[FILTERED]' : value
  end
  Qyu.logger.debug "Initializing SQS client with configuration #{loggable_config}"

  @queue_prefix = config[:queue_prefix]
  @sqs = Aws::SQS::Client.new(
    region: config[:region],
    access_key_id: config[:access_key_id],
    secret_access_key: config[:secret_access_key]
  )
  @queue_attributes = queue_attributes(config)
end
queue_attributes(config)
click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 101
# Translates the optional queue settings in +config+ into a hash of SQS
# attribute names mapped to string values.
#
# Only settings with a truthy value are included; each value is converted
# with +to_s+.
#
# @param config [Hash] may contain :message_visibility_timeout,
#   :message_retention_period and :maximum_message_size
# @return [Hash{String => String}] attributes keyed by 'VisibilityTimeout',
#   'MessageRetentionPeriod' and 'MaximumMessageSize'
def queue_attributes(config)
  option_for_attribute = {
    'VisibilityTimeout' => :message_visibility_timeout,
    'MessageRetentionPeriod' => :message_retention_period,
    'MaximumMessageSize' => :maximum_message_size
  }

  option_for_attribute.each_with_object({}) do |(attribute, option), attrs|
    setting = config[option]
    attrs[attribute] = setting.to_s if setting
  end
end