class Propono::QueueListener

Attributes

aws_client[R]
message_processor[R]
propono_config[R]
topic_name[R]
visibility_timeout[R]

Public Class Methods

drain(*args, &message_processor) click to toggle source
# File lib/propono/services/queue_listener.rb, line 8
def self.drain(*args, &message_processor)
  new(*args, &message_processor).drain
end
listen(*args, &message_processor) click to toggle source
# File lib/propono/services/queue_listener.rb, line 4
def self.listen(*args, &message_processor)
  new(*args, &message_processor).listen
end
new(aws_client, propono_config, topic_name, options = {}, &message_processor) click to toggle source
# File lib/propono/services/queue_listener.rb, line 13
def initialize(aws_client, propono_config, topic_name, options = {}, &message_processor)
  @aws_client = aws_client
  @propono_config = propono_config
  @topic_name = topic_name
  @message_processor = message_processor
  @visibility_timeout = options[:visibility_timeout] || nil
end

Public Instance Methods

drain() click to toggle source
# File lib/propono/services/queue_listener.rb, line 28
def drain
  raise ProponoError.new("topic_name is nil") unless topic_name
  true while read_messages_from_queue(main_queue, 10, long_poll: false)
  true while read_messages_from_queue(slow_queue, 10, long_poll: false) if propono_config.slow_queue_enabled
end
listen() click to toggle source
# File lib/propono/services/queue_listener.rb, line 21
def listen
  raise ProponoError.new("topic_name is nil") unless topic_name
  loop do
    read_messages
  end
end

Private Instance Methods

corrupt_queue() click to toggle source
# File lib/propono/services/queue_listener.rb, line 113
def corrupt_queue
  @corrupt_queue ||= subscription.corrupt_queue
end
delete_message(raw_sqs_message, queue) click to toggle source
# File lib/propono/services/queue_listener.rb, line 101
def delete_message(raw_sqs_message, queue)
  aws_client.delete_from_sqs(queue, raw_sqs_message.receipt_handle)
end
failed_queue() click to toggle source
# File lib/propono/services/queue_listener.rb, line 109
def failed_queue
  @failed_queue ||= subscription.failed_queue
end
handle(sqs_message) click to toggle source
# File lib/propono/services/queue_listener.rb, line 79
def handle(sqs_message)
  process_message(sqs_message)
rescue => e
  propono_config.logger.error("Failed to handle message #{e.message} #{e.backtrace}")
  requeue_message_on_failure(sqs_message, e)
end
main_queue() click to toggle source
# File lib/propono/services/queue_listener.rb, line 105
def main_queue
  @main_queue ||= subscription.queue
end
move_to_corrupt_queue(raw_sqs_message) click to toggle source
# File lib/propono/services/queue_listener.rb, line 91
def move_to_corrupt_queue(raw_sqs_message)
  aws_client.send_to_sqs(corrupt_queue, raw_sqs_message.body)
end
parse(raw_sqs_message, queue) click to toggle source
# File lib/propono/services/queue_listener.rb, line 70
def parse(raw_sqs_message, queue)
  SqsMessage.new(raw_sqs_message)
rescue
  propono_config.logger.error "Error parsing message, moving to corrupt queue", $!, $!.backtrace
  move_to_corrupt_queue(raw_sqs_message)
  aws_client.delete_from_sqs(queue, raw_sqs_message.receipt_handle)
  nil
end
process_message(sqs_message) click to toggle source
# File lib/propono/services/queue_listener.rb, line 86
def process_message(sqs_message)
  return false unless message_processor
  message_processor.call(sqs_message.message, sqs_message.context)
end
process_raw_message(raw_sqs_message, queue) click to toggle source

The calls to delete_message are deliberately duplicated so as to ensure the message is only deleted if the preceeding line has completed succesfully. We do not want to ensure that the message is deleted regardless of what happens in this method.

# File lib/propono/services/queue_listener.rb, line 61
def process_raw_message(raw_sqs_message, queue)
  sqs_message = parse(raw_sqs_message, queue)
  return unless sqs_message

  propono_config.logger.info "Propono [#{sqs_message.context[:id]}]: Received from sqs."
  handle(sqs_message)
  aws_client.delete_from_sqs(queue, sqs_message.receipt_handle)
end
read_messages() click to toggle source
# File lib/propono/services/queue_listener.rb, line 36
def read_messages
  read_messages_from_queue(main_queue, propono_config.num_messages_per_poll) ||
    (propono_config.slow_queue_enabled ?  read_messages_from_queue(slow_queue, 1) : nil)
end
read_messages_from_queue(queue, num_messages, long_poll: true) click to toggle source
# File lib/propono/services/queue_listener.rb, line 41
def read_messages_from_queue(queue, num_messages, long_poll: true)
  messages = aws_client.read_from_sqs(queue, num_messages, long_poll: long_poll, visibility_timeout: visibility_timeout)
  if messages.empty?
    false
  else
    messages.each { |msg| process_raw_message(msg, queue) }
    true
  end
rescue => e #Aws::Errors => e
  propono_config.logger.error "Unexpected error reading from queue #{queue.url}"
  propono_config.logger.error e.class.name
  propono_config.logger.error e.message
  propono_config.logger.error e.backtrace
  false
end
requeue_message_on_failure(sqs_message, exception) click to toggle source
# File lib/propono/services/queue_listener.rb, line 95
def requeue_message_on_failure(sqs_message, exception)
  next_queue = (sqs_message.failure_count < propono_config.max_retries) ? main_queue : failed_queue
  propono_config.logger.error "Error processing message, moving to queue: #{next_queue}"
  aws_client.send_to_sqs(next_queue, sqs_message.to_json_with_exception(exception))
end
slow_queue() click to toggle source
# File lib/propono/services/queue_listener.rb, line 117
def slow_queue
  @slow_queue ||= subscription.slow_queue
end
subscription() click to toggle source
# File lib/propono/services/queue_listener.rb, line 121
def subscription
  QueueSubscription.create(aws_client, propono_config, topic_name)
end