class Qyu::Queue::SQS::Adapter

Constants

TYPE

Public Class Methods

new(config) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 11
def initialize(config)
  init_client(config)
end
valid_config?(config) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 15
def self.valid_config?(config)
 ConfigurationValidator.new(config).valid?
end

Public Instance Methods

acknowledge_message(queue_name, message_id) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 57
def acknowledge_message(queue_name, message_id)
  @sqs.delete_message({
    queue_url: get_or_create_queue_url(queue_name),
    receipt_handle: message_id
  })
end
enqueue_task(queue_name, task_id) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 19
def enqueue_task(queue_name, task_id)
  response = @sqs.send_message({
    queue_url: get_or_create_queue_url(queue_name),
    message_body: { task_id: task_id }.to_json.to_s
  })

  Qyu.logger.debug "SQS response: #{response}"
  Qyu.logger.info "Task enqueued with ID #{task_id} in queue #{queue_name}"

  response
end
enqueue_task_to_failed_queue(queue_name, task_id) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 31
def enqueue_task_to_failed_queue(queue_name, task_id)
  failed_queue_name = queue_name + '-failed'
  enqueue_task(failed_queue_name, task_id)
end
fetch_next_message(queue_name) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 36
def fetch_next_message(queue_name)
  Qyu.logger.debug "Listening on `#{queue_name}`"

  while (response = @sqs.receive_message({
    queue_url: get_or_create_queue_url(queue_name),
    max_number_of_messages: 1
  })).messages.count == 0

  sleep 1
  end

  message = response.messages[0]

  Qyu.logger.debug "Fetched message #{message}"

  {
    'id' => message.receipt_handle,
    'task_id' => JSON.parse(message.body)['task_id']
  }
end

Private Instance Methods

get_or_create_queue_url(queue_name) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 66
def get_or_create_queue_url(queue_name)
  full_queue_name = "#{@queue_prefix}-#{queue_name}"
  begin
    response = @sqs.get_queue_url({
      queue_name: full_queue_name
    })

    return response.queue_url
  rescue Aws::SQS::Errors::NonExistentQueue

    Qyu.logger.info "Could not find queue `#{full_queue_name}`, creating it"

    response = @sqs.create_queue({
      queue_name: full_queue_name,
      attributes: @queue_attributes
    })

    response.queue_url
  end
end
init_client(config) click to toggle source

noinspection RubyArgCount

# File lib/qyu/queue/sqs/adapter.rb, line 88
def init_client(config)

  Qyu.logger.debug "Initializing SQS client with configuration #{config}"

  @queue_prefix = config[:queue_prefix]
  @sqs = Aws::SQS::Client.new(
    region: config[:region],
    access_key_id: config[:access_key_id],
    secret_access_key: config[:secret_access_key]
  )
  @queue_attributes = queue_attributes(config)
end
queue_attributes(config) click to toggle source
# File lib/qyu/queue/sqs/adapter.rb, line 101
def queue_attributes(config)
  attrs = {}
  if config[:message_visibility_timeout]
    attrs['VisibilityTimeout'] = config[:message_visibility_timeout].to_s
  end

  if config[:message_retention_period]
    attrs['MessageRetentionPeriod'] = config[:message_retention_period].to_s
  end

  if config[:maximum_message_size]
    attrs['MaximumMessageSize'] = config[:maximum_message_size].to_s
  end

  attrs
end